conf.py 11 KB


  1. import sys
  2. import logging
  3. import warnings
  4. from datetime import timedelta
  5. from celery import routes
  6. from celery.loaders import load_settings
  7. DEFAULT_PROCESS_LOG_FMT = """
  8. [%(asctime)s: %(levelname)s/%(processName)s] %(message)s
  9. """.strip()
  10. DEFAULT_LOG_FMT = '[%(asctime)s: %(levelname)s] %(message)s'
  11. DEFAULT_TASK_LOG_FMT = " ".join("""
  12. [%(asctime)s: %(levelname)s/%(processName)s]
  13. [%(task_name)s(%(task_id)s)] %(message)s
  14. """.strip().split())
  15. LOG_LEVELS = dict(logging._levelNames)
  16. LOG_LEVELS["FATAL"] = logging.FATAL
  17. LOG_LEVELS[logging.FATAL] = "FATAL"
# Object returned by ``celery.loaders.load_settings()``; `_get` reads the
# individual settings from it as attributes.
settings = load_settings()

# Built-in fallback values, keyed by setting name.  `_get` consults this
# table when it is called without an explicit default; names missing from
# the table fall back to None.
_DEFAULTS = {
    # Broker connection.
    "BROKER_CONNECTION_TIMEOUT": 4,
    "BROKER_CONNECTION_RETRY": True,
    "BROKER_CONNECTION_MAX_RETRIES": 100,
    "BROKER_HOST": "localhost",
    "BROKER_PORT": None,
    "BROKER_USER": "guest",
    "BROKER_PASSWORD": "guest",
    "BROKER_VHOST": "/",
    "CELERY_RESULT_BACKEND": "database",
    "CELERY_ALWAYS_EAGER": False,
    "CELERY_EAGER_PROPAGATES_EXCEPTIONS": False,
    "CELERY_TASK_RESULT_EXPIRES": timedelta(days=1),
    "CELERY_SEND_EVENTS": False,
    "CELERY_IGNORE_RESULT": False,
    "CELERY_STORE_ERRORS_EVEN_IF_IGNORED": False,
    "CELERY_TASK_SERIALIZER": "pickle",
    "CELERY_DISABLE_RATE_LIMITS": False,
    "CELERYD_TASK_TIME_LIMIT": None,
    "CELERYD_TASK_SOFT_TIME_LIMIT": None,
    "CELERYD_MAX_TASKS_PER_CHILD": None,
    # Default queue / exchange / routing.
    "CELERY_ROUTES": None,
    "CELERY_CREATE_MISSING_QUEUES": True,
    "CELERY_DEFAULT_ROUTING_KEY": "celery",
    "CELERY_DEFAULT_QUEUE": "celery",
    "CELERY_DEFAULT_EXCHANGE": "celery",
    "CELERY_DEFAULT_EXCHANGE_TYPE": "direct",
    "CELERY_DEFAULT_DELIVERY_MODE": 2, # persistent
    "CELERY_ACKS_LATE": False,
    # Component classes are given as dotted-path strings.
    "CELERYD_POOL_PUTLOCKS": True,
    "CELERYD_POOL": "celery.concurrency.processes.TaskPool",
    "CELERYD_MEDIATOR": "celery.worker.controllers.Mediator",
    "CELERYD_ETA_SCHEDULER": "timer2.Timer",
    "CELERYD_LISTENER": "celery.worker.listener.CarrotListener",
    "CELERYD_CONCURRENCY": 0, # defaults to cpu count
    "CELERYD_PREFETCH_MULTIPLIER": 4,
    "CELERYD_LOG_FORMAT": DEFAULT_PROCESS_LOG_FMT,
    "CELERYD_TASK_LOG_FORMAT": DEFAULT_TASK_LOG_FMT,
    "CELERYD_LOG_COLOR": False,
    "CELERYD_LOG_LEVEL": "WARN",
    "CELERYD_LOG_FILE": None, # stderr
    "CELERYBEAT_SCHEDULE": {},
    "CELERYD_STATE_DB": None,
    "CELERYD_ETA_SCHEDULER_PRECISION": 1,
    "CELERYBEAT_SCHEDULE_FILENAME": "celerybeat-schedule",
    "CELERYBEAT_MAX_LOOP_INTERVAL": 5 * 60, # five minutes.
    "CELERYBEAT_LOG_LEVEL": "INFO",
    "CELERYBEAT_LOG_FILE": None, # stderr
    "CELERYMON_LOG_LEVEL": "INFO",
    "CELERYMON_LOG_FILE": None, # stderr
    "CELERYMON_LOG_FORMAT": DEFAULT_LOG_FMT,
    # Broadcast queue.
    "CELERY_BROADCAST_QUEUE": "celeryctl",
    "CELERY_BROADCAST_EXCHANGE": "celeryctl",
    "CELERY_BROADCAST_EXCHANGE_TYPE": "fanout",
    # Event queue.
    "CELERY_EVENT_QUEUE": "celeryevent",
    "CELERY_EVENT_EXCHANGE": "celeryevent",
    "CELERY_EVENT_EXCHANGE_TYPE": "direct",
    "CELERY_EVENT_ROUTING_KEY": "celeryevent",
    "CELERY_EVENT_SERIALIZER": "json",
    # Result exchange.
    "CELERY_RESULT_EXCHANGE": "celeryresults",
    "CELERY_RESULT_EXCHANGE_TYPE": "direct",
    "CELERY_RESULT_SERIALIZER": "pickle",
    "CELERY_RESULT_PERSISTENT": False,
    "CELERY_MAX_CACHED_RESULTS": 5000,
    "CELERY_TRACK_STARTED": False,
    # Default e-mail settings.
    "SERVER_EMAIL": "celery@localhost",
    "EMAIL_HOST": "localhost",
    "EMAIL_PORT": 25,
    "ADMINS": (),
}
  90. def isatty(fh):
  91. # Fixes bug with mod_wsgi:
  92. # mod_wsgi.Log object has no attribute isatty.
  93. return getattr(fh, "isatty", None) and fh.isatty()
  94. _DEPRECATION_FMT = """
  95. %s is deprecated in favor of %s and is scheduled for removal in celery v1.4.
  96. """.strip()
  97. def _get(name, default=None, compat=None):
  98. compat = compat or []
  99. if default is None:
  100. default = _DEFAULTS.get(name)
  101. compat = [name] + compat
  102. for i, alias in enumerate(compat):
  103. try:
  104. value = getattr(settings, alias)
  105. i > 0 and warnings.warn(DeprecationWarning(_DEPRECATION_FMT % (
  106. alias, name)))
  107. return value
  108. except AttributeError:
  109. pass
  110. return default
  111. # <--- Task <- -- --- - ----- -- #
  112. ALWAYS_EAGER = _get("CELERY_ALWAYS_EAGER")
  113. EAGER_PROPAGATES_EXCEPTIONS = _get("CELERY_EAGER_PROPAGATES_EXCEPTIONS")
  114. RESULT_BACKEND = _get("CELERY_RESULT_BACKEND", compat=["CELERY_BACKEND"])
  115. CELERY_BACKEND = RESULT_BACKEND # FIXME Remove in 1.4
  116. CACHE_BACKEND = _get("CELERY_CACHE_BACKEND") or _get("CACHE_BACKEND")
  117. CACHE_BACKEND_OPTIONS = _get("CELERY_CACHE_BACKEND_OPTIONS") or {}
  118. TASK_SERIALIZER = _get("CELERY_TASK_SERIALIZER")
  119. TASK_RESULT_EXPIRES = _get("CELERY_TASK_RESULT_EXPIRES")
  120. IGNORE_RESULT = _get("CELERY_IGNORE_RESULT")
  121. TRACK_STARTED = _get("CELERY_TRACK_STARTED")
  122. ACKS_LATE = _get("CELERY_ACKS_LATE")
  123. # Make sure TASK_RESULT_EXPIRES is a timedelta.
  124. if isinstance(TASK_RESULT_EXPIRES, int):
  125. TASK_RESULT_EXPIRES = timedelta(seconds=TASK_RESULT_EXPIRES)
# <--- SQLAlchemy <- -- --- - ----- -- #
# Neither name has an entry in _DEFAULTS, so both are None unless configured.
RESULT_DBURI = _get("CELERY_RESULT_DBURI")
RESULT_ENGINE_OPTIONS = _get("CELERY_RESULT_ENGINE_OPTIONS")

# <--- Client <- -- --- - ----- -- #
MAX_CACHED_RESULTS = _get("CELERY_MAX_CACHED_RESULTS")
  131. # <--- Worker <- -- --- - ----- -- #
  132. SEND_EVENTS = _get("CELERY_SEND_EVENTS")
  133. DEFAULT_RATE_LIMIT = _get("CELERY_DEFAULT_RATE_LIMIT")
  134. DISABLE_RATE_LIMITS = _get("CELERY_DISABLE_RATE_LIMITS")
  135. CELERYD_TASK_TIME_LIMIT = _get("CELERYD_TASK_TIME_LIMIT")
  136. CELERYD_TASK_SOFT_TIME_LIMIT = _get("CELERYD_TASK_SOFT_TIME_LIMIT")
  137. CELERYD_MAX_TASKS_PER_CHILD = _get("CELERYD_MAX_TASKS_PER_CHILD")
  138. STORE_ERRORS_EVEN_IF_IGNORED = _get("CELERY_STORE_ERRORS_EVEN_IF_IGNORED")
  139. CELERY_SEND_TASK_ERROR_EMAILS = _get("CELERY_SEND_TASK_ERROR_EMAILS", False,
  140. compat=["SEND_CELERY_TASK_ERROR_EMAILS"])
  141. CELERY_TASK_ERROR_WHITELIST = _get("CELERY_TASK_ERROR_WHITELIST")
  142. CELERYD_LOG_FORMAT = _get("CELERYD_LOG_FORMAT",
  143. compat=["CELERYD_DAEMON_LOG_FORMAT"])
  144. CELERYD_TASK_LOG_FORMAT = _get("CELERYD_TASK_LOG_FORMAT")
  145. CELERYD_LOG_FILE = _get("CELERYD_LOG_FILE")
  146. CELERYD_LOG_COLOR = _get("CELERYD_LOG_COLOR",
  147. CELERYD_LOG_FILE is None and isatty(sys.stderr))
  148. CELERYD_LOG_LEVEL = _get("CELERYD_LOG_LEVEL",
  149. compat=["CELERYD_DAEMON_LOG_LEVEL"])
  150. CELERYD_LOG_LEVEL = LOG_LEVELS[CELERYD_LOG_LEVEL.upper()]
  151. CELERYD_STATE_DB = _get("CELERYD_STATE_DB")
  152. CELERYD_CONCURRENCY = _get("CELERYD_CONCURRENCY")
  153. CELERYD_PREFETCH_MULTIPLIER = _get("CELERYD_PREFETCH_MULTIPLIER")
  154. CELERYD_POOL_PUTLOCKS = _get("CELERYD_POOL_PUTLOCKS")
  155. CELERYD_POOL = _get("CELERYD_POOL")
  156. CELERYD_LISTENER = _get("CELERYD_LISTENER")
  157. CELERYD_MEDIATOR = _get("CELERYD_MEDIATOR")
  158. CELERYD_ETA_SCHEDULER = _get("CELERYD_ETA_SCHEDULER")
  159. CELERYD_ETA_SCHEDULER_PRECISION = _get("CELERYD_ETA_SCHEDULER_PRECISION")
# :--- Email settings <- -- --- - ----- -- #
ADMINS = _get("ADMINS")
SERVER_EMAIL = _get("SERVER_EMAIL")
EMAIL_HOST = _get("EMAIL_HOST")
# No _DEFAULTS entry for user/password: both are None unless configured.
EMAIL_HOST_USER = _get("EMAIL_HOST_USER")
EMAIL_HOST_PASSWORD = _get("EMAIL_HOST_PASSWORD")
EMAIL_PORT = _get("EMAIL_PORT")

# :--- Broker connections <- -- --- - ----- -- #
BROKER_HOST = _get("BROKER_HOST")
BROKER_PORT = _get("BROKER_PORT")
BROKER_USER = _get("BROKER_USER")
BROKER_PASSWORD = _get("BROKER_PASSWORD")
BROKER_VHOST = _get("BROKER_VHOST")
BROKER_USE_SSL = _get("BROKER_USE_SSL")
BROKER_INSIST = _get("BROKER_INSIST")
# The CELERY_-prefixed names below are deprecated aliases; reading a value
# through one of them makes _get emit a DeprecationWarning.
BROKER_CONNECTION_TIMEOUT = _get("BROKER_CONNECTION_TIMEOUT",
                                 compat=["CELERY_BROKER_CONNECTION_TIMEOUT"])
BROKER_CONNECTION_RETRY = _get("BROKER_CONNECTION_RETRY",
                               compat=["CELERY_BROKER_CONNECTION_RETRY"])
BROKER_CONNECTION_MAX_RETRIES = _get("BROKER_CONNECTION_MAX_RETRIES",
                                compat=["CELERY_BROKER_CONNECTION_MAX_RETRIES"])
# First truthy value among BROKER_TRANSPORT, BROKER_BACKEND and
# CARROT_BACKEND wins; None when none of them is set.
BROKER_BACKEND = _get("BROKER_TRANSPORT") or \
                    _get("BROKER_BACKEND") or \
                    _get("CARROT_BACKEND")
# <--- Message routing <- -- --- - ----- -- #
DEFAULT_QUEUE = _get("CELERY_DEFAULT_QUEUE")
DEFAULT_ROUTING_KEY = _get("CELERY_DEFAULT_ROUTING_KEY")
DEFAULT_EXCHANGE = _get("CELERY_DEFAULT_EXCHANGE")
DEFAULT_EXCHANGE_TYPE = _get("CELERY_DEFAULT_EXCHANGE_TYPE")
DEFAULT_DELIVERY_MODE = _get("CELERY_DEFAULT_DELIVERY_MODE")
# When CELERY_QUEUES is unset or empty, fall back to a single queue built
# from the default queue / exchange / routing key settings read above.
QUEUES = _get("CELERY_QUEUES") or {DEFAULT_QUEUE: {
                                    "exchange": DEFAULT_EXCHANGE,
                                    "exchange_type": DEFAULT_EXCHANGE_TYPE,
                                    "binding_key": DEFAULT_ROUTING_KEY}}
CREATE_MISSING_QUEUES = _get("CELERY_CREATE_MISSING_QUEUES")
# A missing or empty CELERY_ROUTES is handed to routes.prepare as an
# empty list.
ROUTES = routes.prepare(_get("CELERY_ROUTES") or [])
# :--- Broadcast queue settings <- -- --- - ----- -- #
BROADCAST_QUEUE = _get("CELERY_BROADCAST_QUEUE")
BROADCAST_EXCHANGE = _get("CELERY_BROADCAST_EXCHANGE")
BROADCAST_EXCHANGE_TYPE = _get("CELERY_BROADCAST_EXCHANGE_TYPE")

# :--- Event queue settings <- -- --- - ----- -- #
EVENT_QUEUE = _get("CELERY_EVENT_QUEUE")
EVENT_EXCHANGE = _get("CELERY_EVENT_EXCHANGE")
EVENT_EXCHANGE_TYPE = _get("CELERY_EVENT_EXCHANGE_TYPE")
EVENT_ROUTING_KEY = _get("CELERY_EVENT_ROUTING_KEY")
EVENT_SERIALIZER = _get("CELERY_EVENT_SERIALIZER")

# :--- AMQP Backend settings <- -- --- - ----- -- #
RESULT_EXCHANGE = _get("CELERY_RESULT_EXCHANGE")
RESULT_EXCHANGE_TYPE = _get("CELERY_RESULT_EXCHANGE_TYPE")
RESULT_SERIALIZER = _get("CELERY_RESULT_SERIALIZER")
RESULT_PERSISTENT = _get("CELERY_RESULT_PERSISTENT")

# :--- Celery Beat <- -- --- - ----- -- #
# NOTE(review): unlike CELERYD_LOG_LEVEL, this level is kept exactly as
# configured (by default the string "INFO"); it is not translated through
# LOG_LEVELS in this module.
CELERYBEAT_LOG_LEVEL = _get("CELERYBEAT_LOG_LEVEL")
CELERYBEAT_LOG_FILE = _get("CELERYBEAT_LOG_FILE")
CELERYBEAT_SCHEDULE = _get("CELERYBEAT_SCHEDULE")
CELERYBEAT_SCHEDULE_FILENAME = _get("CELERYBEAT_SCHEDULE_FILENAME")
CELERYBEAT_MAX_LOOP_INTERVAL = _get("CELERYBEAT_MAX_LOOP_INTERVAL")
  217. # :--- Celery Monitor <- -- --- - ----- -- #
  218. CELERYMON_LOG_LEVEL = _get("CELERYMON_LOG_LEVEL")
  219. CELERYMON_LOG_FILE = _get("CELERYMON_LOG_FILE")
  220. def _init_queues(queues):
  221. """Convert configuration mapping to a table of queues digestible
  222. by a :class:`carrot.messaging.ConsumerSet`."""
  223. def _defaults(opts):
  224. opts.setdefault("exchange", DEFAULT_EXCHANGE),
  225. opts.setdefault("exchange_type", DEFAULT_EXCHANGE_TYPE)
  226. opts.setdefault("binding_key", DEFAULT_EXCHANGE)
  227. opts.setdefault("routing_key", opts.get("binding_key"))
  228. return opts
  229. return dict((queue, _defaults(opts)) for queue, opts in queues.items())
  230. def get_queues():
  231. return _init_queues(QUEUES)