conf.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. import logging
  2. import warnings
  3. from datetime import timedelta
  4. from celery.loaders import load_settings
  5. DEFAULT_P_LOG_FMT = '[%(asctime)s: %(levelname)s/%(processName)s] %(message)s'
  6. DEFAULT_LOG_FMT = '[%(asctime)s: %(levelname)s] %(message)s'
  7. LOG_LEVELS = dict(logging._levelNames)
  8. LOG_LEVELS["FATAL"] = logging.FATAL
  9. LOG_LEVELS[logging.FATAL] = "FATAL"
  10. settings = load_settings()
  11. _DEFAULTS = {
  12. "CELERY_RESULT_BACKEND": "database",
  13. "CELERY_ALWAYS_EAGER": False,
  14. "CELERY_TASK_RESULT_EXPIRES": timedelta(days=5),
  15. "CELERY_SEND_EVENTS": False,
  16. "CELERY_IGNORE_RESULT": False,
  17. "CELERY_STORE_ERRORS_EVEN_IF_IGNORED": False,
  18. "CELERY_TASK_SERIALIZER": "pickle",
  19. "CELERY_DISABLE_RATE_LIMITS": False,
  20. "CELERY_DEFAULT_ROUTING_KEY": "celery",
  21. "CELERY_DEFAULT_QUEUE": "celery",
  22. "CELERY_DEFAULT_EXCHANGE": "celery",
  23. "CELERY_DEFAULT_EXCHANGE_TYPE": "direct",
  24. "CELERY_DEFAULT_DELIVERY_MODE": 2, # persistent
  25. "CELERY_BROKER_CONNECTION_TIMEOUT": 4,
  26. "CELERY_BROKER_CONNECTION_RETRY": True,
  27. "CELERY_BROKER_CONNECTION_MAX_RETRIES": 100,
  28. "CELERYD_CONCURRENCY": 0, # defaults to cpu count
  29. "CELERYD_PREFETCH_MULTIPLIER": 4,
  30. "CELERYD_LOG_FORMAT": DEFAULT_P_LOG_FMT,
  31. "CELERYD_LOG_LEVEL": "WARN",
  32. "CELERYD_LOG_FILE": None, # stderr
  33. "CELERYBEAT_SCHEDULE_FILENAME": "celerybeat-schedule",
  34. "CELERYBEAT_MAX_LOOP_INTERVAL": 5 * 60, # five minutes.
  35. "CELERYBEAT_LOG_LEVEL": "INFO",
  36. "CELERYBEAT_LOG_FILE": None, # stderr
  37. "CELERYMON_LOG_LEVEL": "INFO",
  38. "CELERYMON_LOG_FILE": None, # stderr
  39. "CELERYMON_LOG_FORMAT": DEFAULT_LOG_FMT,
  40. "CELERY_BROADCAST_QUEUE": "celeryctl",
  41. "CELERY_BROADCAST_EXCHANGE": "celeryctl",
  42. "CELERY_BROADCAST_EXCHANGE_TYPE": "fanout",
  43. "CELERY_EVENT_QUEUE": "celeryevent",
  44. "CELERY_EVENT_EXCHANGE": "celeryevent",
  45. "CELERY_EVENT_EXCHANGE_TYPE": "direct",
  46. "CELERY_EVENT_ROUTING_KEY": "celeryevent",
  47. "CELERY_RESULT_EXCHANGE": "celeryresults",
  48. "CELERY_MAX_CACHED_RESULTS": 5000,
  49. }
  50. _DEPRECATION_FMT = """
  51. %s is deprecated in favor of %s and is schedule for removal in celery v1.2.
  52. """.strip()
  53. def _get(name, default=None, compat=None):
  54. compat = compat or []
  55. if default is None:
  56. default = _DEFAULTS.get(name)
  57. compat = [name] + compat
  58. for i, alias in enumerate(compat):
  59. try:
  60. value = getattr(settings, name)
  61. i > 0 and warnings.warn(DeprecationWarning(_DEPRECATION_FMT % (
  62. alias, name)))
  63. return value
  64. except AttributeError:
  65. pass
  66. return default
  67. # <--- Task <- -- --- - ----- -- #
  68. ALWAYS_EAGER = _get("CELERY_ALWAYS_EAGER")
  69. RESULT_BACKEND = _get("CELERY_RESULT_BACKEND", compat=["CELERY_BACKEND"])
  70. CELERY_BACKEND = RESULT_BACKEND # FIXME Remove in 1.4
  71. CELERY_CACHE_BACKEND = _get("CELERY_CACHE_BACKEND")
  72. TASK_SERIALIZER = _get("CELERY_TASK_SERIALIZER")
  73. TASK_RESULT_EXPIRES = _get("CELERY_TASK_RESULT_EXPIRES")
  74. IGNORE_RESULT = _get("CELERY_IGNORE_RESULT")
  75. # Make sure TASK_RESULT_EXPIRES is a timedelta.
  76. if isinstance(TASK_RESULT_EXPIRES, int):
  77. TASK_RESULT_EXPIRES = timedelta(seconds=TASK_RESULT_EXPIRES)
  78. # <--- Client <- -- --- - ----- -- #
  79. MAX_CACHED_RESULTS = _get("CELERY_MAX_CACHED_RESULTS")
  80. # <--- Worker <- -- --- - ----- -- #
  81. SEND_EVENTS = _get("CELERY_SEND_EVENTS")
  82. DEFAULT_RATE_LIMIT = _get("CELERY_DEFAULT_RATE_LIMIT")
  83. DISABLE_RATE_LIMITS = _get("CELERY_DISABLE_RATE_LIMITS")
  84. STORE_ERRORS_EVEN_IF_IGNORED = _get("CELERY_STORE_ERRORS_EVEN_IF_IGNORED")
  85. CELERY_SEND_TASK_ERROR_EMAILS = _get("CELERY_SEND_TASK_ERROR_EMAILS",
  86. not settings.DEBUG,
  87. compat=["SEND_CELERY_TASK_ERROR_EMAILS"])
  88. CELERYD_LOG_FORMAT = _get("CELERYD_LOG_FORMAT",
  89. compat=["CELERYD_DAEMON_LOG_FORMAT"])
  90. CELERYD_LOG_FILE = _get("CELERYD_LOG_FILE")
  91. CELERYD_LOG_LEVEL = _get("CELERYD_LOG_LEVEL",
  92. compat=["CELERYD_DAEMON_LOG_LEVEL"])
  93. CELERYD_LOG_LEVEL = LOG_LEVELS[CELERYD_LOG_LEVEL.upper()]
  94. CELERYD_CONCURRENCY = _get("CELERYD_CONCURRENCY")
  95. CELERYD_PREFETCH_MULTIPLIER = _get("CELERYD_PREFETCH_MULTIPLIER")
  96. # <--- Message routing <- -- --- - ----- -- #
  97. QUEUES = _get("CELERY_QUEUES")
  98. DEFAULT_QUEUE = _get("CELERY_DEFAULT_QUEUE")
  99. DEFAULT_ROUTING_KEY = _get("CELERY_DEFAULT_ROUTING_KEY")
  100. DEFAULT_EXCHANGE = _get("CELERY_DEFAULT_EXCHANGE")
  101. DEFAULT_EXCHANGE_TYPE = _get("CELERY_DEFAULT_EXCHANGE_TYPE")
  102. DEFAULT_DELIVERY_MODE = _get("CELERY_DEFAULT_DELIVERY_MODE")
  103. _DEPRECATIONS = {"CELERY_AMQP_CONSUMER_QUEUES": "CELERY_QUEUES",
  104. "CELERY_AMQP_CONSUMER_QUEUE": "CELERY_QUEUES",
  105. "CELERY_AMQP_EXCHANGE": "CELERY_DEFAULT_EXCHANGE",
  106. "CELERY_AMQP_EXCHANGE_TYPE": "CELERY_DEFAULT_EXCHANGE_TYPE",
  107. "CELERY_AMQP_CONSUMER_ROUTING_KEY": "CELERY_QUEUES",
  108. "CELERY_AMQP_PUBLISHER_ROUTING_KEY":
  109. "CELERY_DEFAULT_ROUTING_KEY"}
  110. _DEPRECATED_QUEUE_SETTING_FMT = """
  111. %s is deprecated in favor of %s and scheduled for removal in celery v1.0.
  112. Please visit http://bit.ly/5DsSuX for more information.
  113. We're sorry for the inconvenience.
  114. """.strip()
  115. def _find_deprecated_queue_settings():
  116. global DEFAULT_QUEUE, DEFAULT_ROUTING_KEY
  117. global DEFAULT_EXCHANGE, DEFAULT_EXCHANGE_TYPE
  118. binding_key = None
  119. multi = _get("CELERY_AMQP_CONSUMER_QUEUES")
  120. if multi:
  121. return multi
  122. single = _get("CELERY_AMQP_CONSUMER_QUEUE")
  123. if single:
  124. DEFAULT_QUEUE = single
  125. DEFAULT_EXCHANGE = _get("CELERY_AMQP_EXCHANGE", DEFAULT_EXCHANGE)
  126. DEFAULT_EXCHANGE_TYPE = _get("CELERY_AMQP_EXCHANGE_TYPE",
  127. DEFAULT_EXCHANGE_TYPE)
  128. binding_key = _get("CELERY_AMQP_CONSUMER_ROUTING_KEY",
  129. DEFAULT_ROUTING_KEY)
  130. DEFAULT_ROUTING_KEY = _get("CELERY_AMQP_PUBLISHER_ROUTING_KEY",
  131. DEFAULT_ROUTING_KEY)
  132. binding_key = binding_key or DEFAULT_ROUTING_KEY
  133. return {DEFAULT_QUEUE: {"exchange": DEFAULT_EXCHANGE,
  134. "exchange_type": DEFAULT_EXCHANGE_TYPE,
  135. "binding_key": binding_key}}
  136. def _warn_if_deprecated_queue_settings():
  137. for setting, new_setting in _DEPRECATIONS.items():
  138. if _get(setting):
  139. warnings.warn(DeprecationWarning(_DEPRECATED_QUEUE_SETTING_FMT % (
  140. setting, _DEPRECATIONS[setting])))
  141. break
  142. _warn_if_deprecated_queue_settings()
  143. if not QUEUES:
  144. QUEUES = _find_deprecated_queue_settings()
  145. # :--- Broadcast queue settings <- -- --- - ----- -- #
  146. BROADCAST_QUEUE = _get("CELERY_BROADCAST_QUEUE")
  147. BROADCAST_EXCHANGE = _get("CELERY_BROADCAST_EXCHANGE")
  148. BROADCAST_EXCHANGE_TYPE = _get("CELERY_BROADCAST_EXCHANGE_TYPE")
  149. # :--- Event queue settings <- -- --- - ----- -- #
  150. EVENT_QUEUE = _get("CELERY_EVENT_QUEUE")
  151. EVENT_EXCHANGE = _get("CELERY_EVENT_EXCHANGE")
  152. EVENT_EXCHANGE_TYPE = _get("CELERY_EVENT_EXCHANGE_TYPE")
  153. EVENT_ROUTING_KEY = _get("CELERY_EVENT_ROUTING_KEY")
  154. # :--- Broker connections <- -- --- - ----- -- #
  155. BROKER_CONNECTION_TIMEOUT = _get("CELERY_BROKER_CONNECTION_TIMEOUT",
  156. compat=["CELERY_AMQP_CONNECTION_TIMEOUT"])
  157. BROKER_CONNECTION_RETRY = _get("CELERY_BROKER_CONNECTION_RETRY",
  158. compat=["CELERY_AMQP_CONNECTION_RETRY"])
  159. BROKER_CONNECTION_MAX_RETRIES = _get("CELERY_BROKER_CONNECTION_MAX_RETRIES",
  160. compat=["CELERY_AMQP_CONNECTION_MAX_RETRIES"])
  161. # :--- Backend settings <- -- --- - ----- -- #
  162. RESULT_EXCHANGE = _get("CELERY_RESULT_EXCHANGE")
  163. # :--- Celery Beat <- -- --- - ----- -- #
  164. CELERYBEAT_LOG_LEVEL = _get("CELERYBEAT_LOG_LEVEL")
  165. CELERYBEAT_LOG_FILE = _get("CELERYBEAT_LOG_FILE")
  166. CELERYBEAT_SCHEDULE_FILENAME = _get("CELERYBEAT_SCHEDULE_FILENAME")
  167. CELERYBEAT_MAX_LOOP_INTERVAL = _get("CELERYBEAT_MAX_LOOP_INTERVAL")
  168. # :--- Celery Monitor <- -- --- - ----- -- #
  169. CELERYMON_LOG_LEVEL = _get("CELERYMON_LOG_LEVEL")
  170. CELERYMON_LOG_FILE = _get("CELERYMON_LOG_FILE")
  171. def _init_routing_table(queues):
  172. """Convert configuration mapping to a table of queues digestible
  173. by a :class:`carrot.messaging.ConsumerSet`."""
  174. def _defaults(opts):
  175. opts.setdefault("exchange", DEFAULT_EXCHANGE),
  176. opts.setdefault("exchange_type", DEFAULT_EXCHANGE_TYPE)
  177. opts.setdefault("binding_key", DEFAULT_EXCHANGE)
  178. return opts
  179. return dict((queue, _defaults(opts)) for queue, opts in queues.items())
  180. routing_table = _init_routing_table(QUEUES)