defaults.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360
  1. # -*- coding: utf-8 -*-
  2. """Configuration introspection and defaults."""
  3. from __future__ import absolute_import, unicode_literals
  4. import sys
  5. from collections import deque, namedtuple
  6. from datetime import timedelta
  7. from celery.five import items, keys, python_2_unicode_compatible
  8. from celery.utils.functional import memoize
  9. from celery.utils.serialization import strtobool
  10. __all__ = ['Option', 'NAMESPACES', 'flatten', 'find']
  11. is_jython = sys.platform.startswith('java')
  12. is_pypy = hasattr(sys, 'pypy_version_info')
  13. DEFAULT_POOL = 'prefork'
  14. if is_jython:
  15. DEFAULT_POOL = 'solo'
  16. elif is_pypy:
  17. if sys.pypy_version_info[0:3] < (1, 5, 0):
  18. DEFAULT_POOL = 'solo'
  19. else:
  20. DEFAULT_POOL = 'prefork'
  21. DEFAULT_ACCEPT_CONTENT = ['json', 'pickle', 'msgpack', 'yaml']
  22. DEFAULT_PROCESS_LOG_FMT = """
  23. [%(asctime)s: %(levelname)s/%(processName)s] %(message)s
  24. """.strip()
  25. DEFAULT_TASK_LOG_FMT = """[%(asctime)s: %(levelname)s/%(processName)s] \
  26. %(task_name)s[%(task_id)s]: %(message)s"""
  27. OLD_NS = {'celery_{0}'}
  28. OLD_NS_BEAT = {'celerybeat_{0}'}
  29. OLD_NS_WORKER = {'celeryd_{0}'}
  30. searchresult = namedtuple('searchresult', ('namespace', 'key', 'type'))
  31. def Namespace(__old__=None, **options):
  32. if __old__ is not None:
  33. for key, opt in items(options):
  34. if not opt.old:
  35. opt.old = {o.format(key) for o in __old__}
  36. return options
  37. def old_ns(ns):
  38. return {'{0}_{{0}}'.format(ns)}
  39. @python_2_unicode_compatible
  40. class Option(object):
  41. """Decribes a Celery configuration option."""
  42. alt = None
  43. deprecate_by = None
  44. remove_by = None
  45. old = set()
  46. typemap = dict(string=str, int=int, float=float, any=lambda v: v,
  47. bool=strtobool, dict=dict, tuple=tuple)
  48. def __init__(self, default=None, *args, **kwargs):
  49. self.default = default
  50. self.type = kwargs.get('type') or 'string'
  51. for attr, value in items(kwargs):
  52. setattr(self, attr, value)
  53. def to_python(self, value):
  54. return self.typemap[self.type](value)
  55. def __repr__(self):
  56. return '<Option: type->{0} default->{1!r}>'.format(self.type,
  57. self.default)
  58. NAMESPACES = Namespace(
  59. accept_content=Option(DEFAULT_ACCEPT_CONTENT, type='list', old=OLD_NS),
  60. enable_utc=Option(True, type='bool'),
  61. imports=Option((), type='tuple', old=OLD_NS),
  62. include=Option((), type='tuple', old=OLD_NS),
  63. timezone=Option(type='string', old=OLD_NS),
  64. beat=Namespace(
  65. __old__=OLD_NS_BEAT,
  66. max_loop_interval=Option(0, type='float'),
  67. schedule=Option({}, type='dict'),
  68. scheduler=Option('celery.beat:PersistentScheduler'),
  69. schedule_filename=Option('celerybeat-schedule'),
  70. sync_every=Option(0, type='int'),
  71. ),
  72. broker=Namespace(
  73. url=Option(None, type='string'),
  74. read_url=Option(None, type='string'),
  75. write_url=Option(None, type='string'),
  76. transport=Option(type='string'),
  77. transport_options=Option({}, type='dict'),
  78. connection_timeout=Option(4, type='float'),
  79. connection_retry=Option(True, type='bool'),
  80. connection_max_retries=Option(100, type='int'),
  81. failover_strategy=Option(None, type='string'),
  82. heartbeat=Option(120, type='int'),
  83. heartbeat_checkrate=Option(3.0, type='int'),
  84. login_method=Option(None, type='string'),
  85. pool_limit=Option(10, type='int'),
  86. use_ssl=Option(False, type='bool'),
  87. host=Option(type='string'),
  88. port=Option(type='int'),
  89. user=Option(type='string'),
  90. password=Option(type='string'),
  91. vhost=Option(type='string'),
  92. ),
  93. cache=Namespace(
  94. __old__=old_ns('celery_cache'),
  95. backend=Option(),
  96. backend_options=Option({}, type='dict'),
  97. ),
  98. cassandra=Namespace(
  99. entry_ttl=Option(type='float'),
  100. keyspace=Option(type='string'),
  101. port=Option(type='string'),
  102. read_consistency=Option(type='string'),
  103. servers=Option(type='list'),
  104. table=Option(type='string'),
  105. write_consistency=Option(type='string'),
  106. auth_provider=Option(type='string'),
  107. auth_kwargs=Option(type='string'),
  108. ),
  109. control=Namespace(
  110. queue_ttl=Option(300.0, type='float'),
  111. queue_expires=Option(10.0, type='float'),
  112. ),
  113. couchbase=Namespace(
  114. __old__=old_ns('celery_couchbase'),
  115. backend_settings=Option(None, type='dict'),
  116. ),
  117. mongodb=Namespace(
  118. __old__=old_ns('celery_mongodb'),
  119. backend_settings=Option(type='dict'),
  120. ),
  121. event=Namespace(
  122. __old__=old_ns('celery_event'),
  123. queue_expires=Option(60.0, type='float'),
  124. queue_ttl=Option(5.0, type='float'),
  125. queue_prefix=Option('celeryev'),
  126. serializer=Option('json'),
  127. ),
  128. redis=Namespace(
  129. __old__=old_ns('celery_redis'),
  130. db=Option(type='int'),
  131. host=Option(type='string'),
  132. max_connections=Option(type='int'),
  133. password=Option(type='string'),
  134. port=Option(type='int'),
  135. socket_timeout=Option(120.0, type='float'),
  136. socket_connect_timeout=Option(None, type='float'),
  137. ),
  138. result=Namespace(
  139. __old__=old_ns('celery_result'),
  140. backend=Option(type='string'),
  141. cache_max=Option(
  142. -1,
  143. type='int', old={'celery_max_cached_results'},
  144. ),
  145. compression=Option(type='str'),
  146. exchange=Option('celeryresults'),
  147. exchange_type=Option('direct'),
  148. expires=Option(
  149. timedelta(days=1),
  150. type='float', old={'celery_task_result_expires'},
  151. ),
  152. persistent=Option(None, type='bool'),
  153. serializer=Option('json'),
  154. ),
  155. riak=Namespace(
  156. __old__=old_ns('celery_riak'),
  157. backend_settings=Option(type='dict'),
  158. ),
  159. security=Namespace(
  160. __old__=old_ns('celery_security'),
  161. certificate=Option(type='string'),
  162. cert_store=Option(type='string'),
  163. key=Option(type='string'),
  164. ),
  165. database=Namespace(
  166. url=Option(old={'celery_result_dburi'}),
  167. engine_options=Option(
  168. type='dict', old={'celery_result_engine_options'},
  169. ),
  170. short_lived_sessions=Option(
  171. False, type='bool', old={'celery_result_db_short_lived_sessions'},
  172. ),
  173. table_names=Option(type='dict', old={'celery_result_db_tablenames'}),
  174. ),
  175. task=Namespace(
  176. __old__=OLD_NS,
  177. acks_late=Option(False, type='bool'),
  178. always_eager=Option(False, type='bool'),
  179. annotations=Option(type='any'),
  180. compression=Option(type='string', old={'celery_message_compression'}),
  181. create_missing_queues=Option(True, type='bool'),
  182. default_delivery_mode=Option(2, type='string'),
  183. default_queue=Option('celery'),
  184. default_exchange=Option(None, type='string'), # taken from queue
  185. default_exchange_type=Option('direct'),
  186. default_routing_key=Option(None, type='string'), # taken from queue
  187. default_rate_limit=Option(type='string'),
  188. eager_propagates=Option(
  189. False, type='bool', old={'celery_eager_propagates_exceptions'},
  190. ),
  191. ignore_result=Option(False, type='bool'),
  192. protocol=Option(2, type='int', old={'celery_task_protocol'}),
  193. publish_retry=Option(
  194. True, type='bool', old={'celery_task_publish_retry'},
  195. ),
  196. publish_retry_policy=Option(
  197. {'max_retries': 3,
  198. 'interval_start': 0,
  199. 'interval_max': 1,
  200. 'interval_step': 0.2},
  201. type='dict', old={'celery_task_publish_retry_policy'},
  202. ),
  203. queues=Option(type='dict'),
  204. queue_ha_policy=Option(None, type='string'),
  205. queue_max_priority=Option(None, type='int'),
  206. reject_on_worker_lost=Option(type='bool'),
  207. remote_tracebacks=Option(False, type='bool'),
  208. routes=Option(type='any'),
  209. send_sent_event=Option(
  210. False, type='bool', old={'celery_send_task_sent_event'},
  211. ),
  212. serializer=Option('json', old={'celery_task_serializer'}),
  213. soft_time_limit=Option(
  214. type='float', old={'celeryd_task_soft_time_limit'},
  215. ),
  216. time_limit=Option(
  217. type='float', old={'celeryd_task_time_limit'},
  218. ),
  219. store_errors_even_if_ignored=Option(False, type='bool'),
  220. track_started=Option(False, type='bool'),
  221. ),
  222. worker=Namespace(
  223. __old__=OLD_NS_WORKER,
  224. agent=Option(None, type='string'),
  225. autoscaler=Option('celery.worker.autoscale:Autoscaler'),
  226. concurrency=Option(0, type='int'),
  227. consumer=Option('celery.worker.consumer:Consumer', type='string'),
  228. direct=Option(False, type='bool', old={'celery_worker_direct'}),
  229. disable_rate_limits=Option(
  230. False, type='bool', old={'celery_disable_rate_limits'},
  231. ),
  232. enable_remote_control=Option(
  233. True, type='bool', old={'celery_enable_remote_control'},
  234. ),
  235. hijack_root_logger=Option(True, type='bool'),
  236. log_color=Option(type='bool'),
  237. log_format=Option(DEFAULT_PROCESS_LOG_FMT),
  238. lost_wait=Option(10.0, type='float', old={'celeryd_worker_lost_wait'}),
  239. max_memory_per_child=Option(type='int'),
  240. max_tasks_per_child=Option(type='int'),
  241. pool=Option(DEFAULT_POOL),
  242. pool_putlocks=Option(True, type='bool'),
  243. pool_restarts=Option(False, type='bool'),
  244. prefetch_multiplier=Option(4, type='int'),
  245. redirect_stdouts=Option(
  246. True, type='bool', old={'celery_redirect_stdouts'},
  247. ),
  248. redirect_stdouts_level=Option(
  249. 'WARNING', old={'celery_redirect_stdouts_level'},
  250. ),
  251. send_task_events=Option(
  252. False, type='bool', old={'celeryd_send_events'},
  253. ),
  254. state_db=Option(),
  255. task_log_format=Option(DEFAULT_TASK_LOG_FMT),
  256. timer=Option(type='string'),
  257. timer_precision=Option(1.0, type='float'),
  258. ),
  259. )
  260. def _flatten_keys(ns, key, opt):
  261. return [(ns + key, opt)]
  262. def _to_compat(ns, key, opt):
  263. if opt.old:
  264. return [
  265. (oldkey.format(key).upper(), ns + key, opt)
  266. for oldkey in opt.old
  267. ]
  268. return [((ns + key).upper(), ns + key, opt)]
  269. def flatten(d, root='', keyfilter=_flatten_keys):
  270. """Flatten settings."""
  271. stack = deque([(root, d)])
  272. while stack:
  273. ns, options = stack.popleft()
  274. for key, opt in items(options):
  275. if isinstance(opt, dict):
  276. stack.append((ns + key + '_', opt))
  277. else:
  278. for ret in keyfilter(ns, key, opt):
  279. yield ret
  280. DEFAULTS = {
  281. key: opt.default for key, opt in flatten(NAMESPACES)
  282. }
  283. __compat = list(flatten(NAMESPACES, keyfilter=_to_compat))
  284. _OLD_DEFAULTS = {old_key: opt.default for old_key, _, opt in __compat}
  285. _TO_OLD_KEY = {new_key: old_key for old_key, new_key, _ in __compat}
  286. _TO_NEW_KEY = {old_key: new_key for old_key, new_key, _ in __compat}
  287. __compat = None
  288. SETTING_KEYS = set(keys(DEFAULTS))
  289. _OLD_SETTING_KEYS = set(keys(_TO_NEW_KEY))
  290. def find_deprecated_settings(source): # pragma: no cover
  291. from celery.utils import deprecated
  292. for name, opt in flatten(NAMESPACES):
  293. if (opt.deprecate_by or opt.remove_by) and getattr(source, name, None):
  294. deprecated.warn(description='The {0!r} setting'.format(name),
  295. deprecation=opt.deprecate_by,
  296. removal=opt.remove_by,
  297. alternative='Use the {0.alt} instead'.format(opt))
  298. return source
  299. @memoize(maxsize=None)
  300. def find(name, namespace='celery'):
  301. """Find setting by name."""
  302. # - Try specified name-space first.
  303. namespace = namespace.lower()
  304. try:
  305. return searchresult(
  306. namespace, name.lower(), NAMESPACES[namespace][name.lower()],
  307. )
  308. except KeyError:
  309. # - Try all the other namespaces.
  310. for ns, opts in items(NAMESPACES):
  311. if ns.lower() == name.lower():
  312. return searchresult(None, ns, opts)
  313. elif isinstance(opts, dict):
  314. try:
  315. return searchresult(ns, name.lower(), opts[name.lower()])
  316. except KeyError:
  317. pass
  318. # - See if name is a qualname last.
  319. return searchresult(None, name.lower(), DEFAULTS[name.lower()])