conf.py 8.5 KB


  1. import logging
  2. import warnings
  3. from datetime import timedelta
  4. from celery.loaders import load_settings
  5. DEFAULT_PROCESS_LOG_FMT = """
  6. [%(asctime)s: %(levelname)s/%(processName)s] %(message)s
  7. """.strip()
  8. DEFAULT_LOG_FMT = '[%(asctime)s: %(levelname)s] %(message)s'
  9. DEFAULT_TASK_LOG_FMT = " ".join("""
  10. [%(asctime)s: %(levelname)s/%(processName)s]
  11. [%(task_name)s(%(task_id)s)] %(message)s
  12. """.strip().split())
  13. LOG_LEVELS = dict(logging._levelNames)
  14. LOG_LEVELS["FATAL"] = logging.FATAL
  15. LOG_LEVELS[logging.FATAL] = "FATAL"
  16. settings = load_settings()
  17. _DEFAULTS = {
  18. "CELERY_RESULT_BACKEND": "database",
  19. "CELERY_ALWAYS_EAGER": False,
  20. "CELERY_TASK_RESULT_EXPIRES": timedelta(days=5),
  21. "CELERY_SEND_EVENTS": False,
  22. "CELERY_IGNORE_RESULT": False,
  23. "CELERY_STORE_ERRORS_EVEN_IF_IGNORED": False,
  24. "CELERY_TASK_SERIALIZER": "pickle",
  25. "CELERY_DISABLE_RATE_LIMITS": False,
  26. "CELERY_DEFAULT_ROUTING_KEY": "celery",
  27. "CELERY_DEFAULT_QUEUE": "celery",
  28. "CELERY_DEFAULT_EXCHANGE": "celery",
  29. "CELERY_DEFAULT_EXCHANGE_TYPE": "direct",
  30. "CELERY_DEFAULT_DELIVERY_MODE": 2, # persistent
  31. "CELERY_BROKER_CONNECTION_TIMEOUT": 4,
  32. "CELERY_BROKER_CONNECTION_RETRY": True,
  33. "CELERY_BROKER_CONNECTION_MAX_RETRIES": 100,
  34. "CELERY_ACKS_LATE": False,
  35. "CELERYD_POOL": "celery.worker.pool.TaskPool",
  36. "CELERYD_MEDIATOR": "celery.worker.controllers.Mediator",
  37. "CELERYD_ETA_SCHEDULER": "celery.worker.controllers.ScheduleController",
  38. "CELERYD_LISTENER": "celery.worker.listener.CarrotListener",
  39. "CELERYD_CONCURRENCY": 0, # defaults to cpu count
  40. "CELERYD_PREFETCH_MULTIPLIER": 4,
  41. "CELERYD_LOG_FORMAT": DEFAULT_PROCESS_LOG_FMT,
  42. "CELERYD_TASK_LOG_FORMAT": DEFAULT_TASK_LOG_FMT,
  43. "CELERYD_LOG_LEVEL": "WARN",
  44. "CELERYD_LOG_FILE": None, # stderr
  45. "CELERYBEAT_SCHEDULE_FILENAME": "celerybeat-schedule",
  46. "CELERYBEAT_MAX_LOOP_INTERVAL": 5 * 60, # five minutes.
  47. "CELERYBEAT_LOG_LEVEL": "INFO",
  48. "CELERYBEAT_LOG_FILE": None, # stderr
  49. "CELERYMON_LOG_LEVEL": "INFO",
  50. "CELERYMON_LOG_FILE": None, # stderr
  51. "CELERYMON_LOG_FORMAT": DEFAULT_LOG_FMT,
  52. "CELERY_BROADCAST_QUEUE": "celeryctl",
  53. "CELERY_BROADCAST_EXCHANGE": "celeryctl",
  54. "CELERY_BROADCAST_EXCHANGE_TYPE": "fanout",
  55. "CELERY_EVENT_QUEUE": "celeryevent",
  56. "CELERY_EVENT_EXCHANGE": "celeryevent",
  57. "CELERY_EVENT_EXCHANGE_TYPE": "direct",
  58. "CELERY_EVENT_ROUTING_KEY": "celeryevent",
  59. "CELERY_EVENT_SERIALIZER": "json",
  60. "CELERY_RESULT_EXCHANGE": "celeryresults",
  61. "CELERY_RESULT_EXCHANGE_TYPE": "direct",
  62. "CELERY_RESULT_SERIALIZER": "pickle",
  63. "CELERY_RESULT_PERSISTENT": False,
  64. "CELERY_MAX_CACHED_RESULTS": 5000,
  65. "CELERY_TRACK_STARTED": False,
  66. }
  67. _DEPRECATION_FMT = """
  68. %s is deprecated in favor of %s and is scheduled for removal in celery v1.4.
  69. """.strip()
  70. def _get(name, default=None, compat=None):
  71. compat = compat or []
  72. if default is None:
  73. default = _DEFAULTS.get(name)
  74. compat = [name] + compat
  75. for i, alias in enumerate(compat):
  76. try:
  77. value = getattr(settings, alias)
  78. i > 0 and warnings.warn(DeprecationWarning(_DEPRECATION_FMT % (
  79. alias, name)))
  80. return value
  81. except AttributeError:
  82. pass
  83. return default
  84. # <--- Task <- -- --- - ----- -- #
  85. ALWAYS_EAGER = _get("CELERY_ALWAYS_EAGER")
  86. RESULT_BACKEND = _get("CELERY_RESULT_BACKEND", compat=["CELERY_BACKEND"])
  87. CELERY_BACKEND = RESULT_BACKEND # FIXME Remove in 1.4
  88. CELERY_CACHE_BACKEND = _get("CELERY_CACHE_BACKEND")
  89. TASK_SERIALIZER = _get("CELERY_TASK_SERIALIZER")
  90. TASK_RESULT_EXPIRES = _get("CELERY_TASK_RESULT_EXPIRES")
  91. IGNORE_RESULT = _get("CELERY_IGNORE_RESULT")
  92. TRACK_STARTED = _get("CELERY_TRACK_STARTED")
  93. ACKS_LATE = _get("CELERY_ACKS_LATE")
  94. # Make sure TASK_RESULT_EXPIRES is a timedelta.
  95. if isinstance(TASK_RESULT_EXPIRES, int):
  96. TASK_RESULT_EXPIRES = timedelta(seconds=TASK_RESULT_EXPIRES)
  97. # <--- SQLAlchemy <- -- --- - ----- -- #
  98. RESULT_DBURI = _get("CELERY_RESULT_DBURI")
  99. RESULT_ENGINE_OPTIONS = _get("CELERY_RESULT_ENGINE_OPTIONS")
  100. # <--- Client <- -- --- - ----- -- #
  101. MAX_CACHED_RESULTS = _get("CELERY_MAX_CACHED_RESULTS")
  102. # <--- Worker <- -- --- - ----- -- #
  103. SEND_EVENTS = _get("CELERY_SEND_EVENTS")
  104. DEFAULT_RATE_LIMIT = _get("CELERY_DEFAULT_RATE_LIMIT")
  105. DISABLE_RATE_LIMITS = _get("CELERY_DISABLE_RATE_LIMITS")
  106. CELERYD_TASK_TIME_LIMIT = _get("CELERYD_TASK_TIME_LIMIT")
  107. CELERYD_TASK_SOFT_TIME_LIMIT = _get("CELERYD_TASK_SOFT_TIME_LIMIT")
  108. STORE_ERRORS_EVEN_IF_IGNORED = _get("CELERY_STORE_ERRORS_EVEN_IF_IGNORED")
  109. CELERY_SEND_TASK_ERROR_EMAILS = _get("CELERY_SEND_TASK_ERROR_EMAILS",
  110. not settings.DEBUG,
  111. compat=["SEND_CELERY_TASK_ERROR_EMAILS"])
  112. CELERYD_LOG_FORMAT = _get("CELERYD_LOG_FORMAT",
  113. compat=["CELERYD_DAEMON_LOG_FORMAT"])
  114. CELERYD_TASK_LOG_FORMAT = _get("CELERYD_TASK_LOG_FORMAT")
  115. CELERYD_LOG_FILE = _get("CELERYD_LOG_FILE")
  116. CELERYD_LOG_LEVEL = _get("CELERYD_LOG_LEVEL",
  117. compat=["CELERYD_DAEMON_LOG_LEVEL"])
  118. CELERYD_LOG_LEVEL = LOG_LEVELS[CELERYD_LOG_LEVEL.upper()]
  119. CELERYD_CONCURRENCY = _get("CELERYD_CONCURRENCY")
  120. CELERYD_PREFETCH_MULTIPLIER = _get("CELERYD_PREFETCH_MULTIPLIER")
  121. CELERYD_POOL = _get("CELERYD_POOL")
  122. CELERYD_LISTENER = _get("CELERYD_LISTENER")
  123. CELERYD_MEDIATOR = _get("CELERYD_MEDIATOR")
  124. CELERYD_ETA_SCHEDULER = _get("CELERYD_ETA_SCHEDULER")
  125. # <--- Message routing <- -- --- - ----- -- #
  126. DEFAULT_QUEUE = _get("CELERY_DEFAULT_QUEUE")
  127. DEFAULT_ROUTING_KEY = _get("CELERY_DEFAULT_ROUTING_KEY")
  128. DEFAULT_EXCHANGE = _get("CELERY_DEFAULT_EXCHANGE")
  129. DEFAULT_EXCHANGE_TYPE = _get("CELERY_DEFAULT_EXCHANGE_TYPE")
  130. DEFAULT_DELIVERY_MODE = _get("CELERY_DEFAULT_DELIVERY_MODE")
  131. QUEUES = _get("CELERY_QUEUES") or {DEFAULT_QUEUE: {
  132. "exchange": DEFAULT_EXCHANGE,
  133. "exchange_type": DEFAULT_EXCHANGE_TYPE,
  134. "binding_key": DEFAULT_ROUTING_KEY}}
  135. # :--- Broadcast queue settings <- -- --- - ----- -- #
  136. BROADCAST_QUEUE = _get("CELERY_BROADCAST_QUEUE")
  137. BROADCAST_EXCHANGE = _get("CELERY_BROADCAST_EXCHANGE")
  138. BROADCAST_EXCHANGE_TYPE = _get("CELERY_BROADCAST_EXCHANGE_TYPE")
  139. # :--- Event queue settings <- -- --- - ----- -- #
  140. EVENT_QUEUE = _get("CELERY_EVENT_QUEUE")
  141. EVENT_EXCHANGE = _get("CELERY_EVENT_EXCHANGE")
  142. EVENT_EXCHANGE_TYPE = _get("CELERY_EVENT_EXCHANGE_TYPE")
  143. EVENT_ROUTING_KEY = _get("CELERY_EVENT_ROUTING_KEY")
  144. EVENT_SERIALIZER = _get("CELERY_EVENT_SERIALIZER")
  145. # :--- Broker connections <- -- --- - ----- -- #
  146. BROKER_CONNECTION_TIMEOUT = _get("CELERY_BROKER_CONNECTION_TIMEOUT",
  147. compat=["CELERY_AMQP_CONNECTION_TIMEOUT"])
  148. BROKER_CONNECTION_RETRY = _get("CELERY_BROKER_CONNECTION_RETRY",
  149. compat=["CELERY_AMQP_CONNECTION_RETRY"])
  150. BROKER_CONNECTION_MAX_RETRIES = _get("CELERY_BROKER_CONNECTION_MAX_RETRIES",
  151. compat=["CELERY_AMQP_CONNECTION_MAX_RETRIES"])
  152. # :--- AMQP Backend settings <- -- --- - ----- -- #
  153. RESULT_EXCHANGE = _get("CELERY_RESULT_EXCHANGE")
  154. RESULT_EXCHANGE_TYPE = _get("CELERY_RESULT_EXCHANGE_TYPE")
  155. RESULT_SERIALIZER = _get("CELERY_RESULT_SERIALIZER")
  156. RESULT_PERSISTENT = _get("CELERY_RESULT_PERSISTENT")
  157. # :--- Celery Beat <- -- --- - ----- -- #
  158. CELERYBEAT_LOG_LEVEL = _get("CELERYBEAT_LOG_LEVEL")
  159. CELERYBEAT_LOG_FILE = _get("CELERYBEAT_LOG_FILE")
  160. CELERYBEAT_SCHEDULE_FILENAME = _get("CELERYBEAT_SCHEDULE_FILENAME")
  161. CELERYBEAT_MAX_LOOP_INTERVAL = _get("CELERYBEAT_MAX_LOOP_INTERVAL")
  162. # :--- Celery Monitor <- -- --- - ----- -- #
  163. CELERYMON_LOG_LEVEL = _get("CELERYMON_LOG_LEVEL")
  164. CELERYMON_LOG_FILE = _get("CELERYMON_LOG_FILE")
  165. def _init_routing_table(queues):
  166. """Convert configuration mapping to a table of queues digestible
  167. by a :class:`carrot.messaging.ConsumerSet`."""
  168. def _defaults(opts):
  169. opts.setdefault("exchange", DEFAULT_EXCHANGE),
  170. opts.setdefault("exchange_type", DEFAULT_EXCHANGE_TYPE)
  171. opts.setdefault("binding_key", DEFAULT_EXCHANGE)
  172. return opts
  173. return dict((queue, _defaults(opts)) for queue, opts in queues.items())
  174. routing_table = _init_routing_table(QUEUES)