defaults.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app.defaults
  4. ~~~~~~~~~~~~~~~~~~~
  5. Configuration introspection and defaults.
  6. """
  7. from __future__ import absolute_import
  8. import sys
  9. from collections import deque, namedtuple
  10. from datetime import timedelta
  11. from celery.five import items, keys, values
  12. from celery.utils import strtobool
  13. from celery.utils.functional import memoize
  14. __all__ = ['Option', 'NAMESPACES', 'flatten', 'find']
  15. is_jython = sys.platform.startswith('java')
  16. is_pypy = hasattr(sys, 'pypy_version_info')
  17. DEFAULT_POOL = 'prefork'
  18. if is_jython:
  19. DEFAULT_POOL = 'threads'
  20. elif is_pypy:
  21. if sys.pypy_version_info[0:3] < (1, 5, 0):
  22. DEFAULT_POOL = 'solo'
  23. else:
  24. DEFAULT_POOL = 'prefork'
  25. DEFAULT_ACCEPT_CONTENT = ['json', 'pickle', 'msgpack', 'yaml']
  26. DEFAULT_PROCESS_LOG_FMT = """
  27. [%(asctime)s: %(levelname)s/%(processName)s] %(message)s
  28. """.strip()
  29. DEFAULT_LOG_FMT = '[%(asctime)s: %(levelname)s] %(message)s'
  30. DEFAULT_TASK_LOG_FMT = """[%(asctime)s: %(levelname)s/%(processName)s] \
  31. %(task_name)s[%(task_id)s]: %(message)s"""
  32. OLD_NS = {'celery_{0}'}
  33. OLD_NS_BEAT = {'celerybeat_{0}'}
  34. OLD_NS_WORKER = {'celeryd_{0}'}
  35. searchresult = namedtuple('searchresult', ('namespace', 'key', 'type'))
  36. def Namespace(__old__=None, **options):
  37. if __old__ is not None:
  38. for opt in values(options):
  39. if not opt.old:
  40. opt.old = __old__
  41. return options
  42. def old_ns(ns):
  43. return {'{0}_{{0}}'.format(ns)}
  44. class Option(object):
  45. alt = None
  46. deprecate_by = None
  47. remove_by = None
  48. old = set()
  49. typemap = dict(string=str, int=int, float=float, any=lambda v: v,
  50. bool=strtobool, dict=dict, tuple=tuple)
  51. def __init__(self, default=None, *args, **kwargs):
  52. self.default = default
  53. self.type = kwargs.get('type') or 'string'
  54. for attr, value in items(kwargs):
  55. setattr(self, attr, value)
  56. def to_python(self, value):
  57. return self.typemap[self.type](value)
  58. def __repr__(self):
  59. return '<Option: type->{0} default->{1!r}>'.format(self.type,
  60. self.default)
# Declarative table of all settings.  Top-level entries are either Option
# instances or nested Namespace dicts (one per setting group); ``flatten``
# joins a group name and an option name with '_' to form the setting key.
# A group's ``__old__`` is the old-key template set applied to every
# option in the group that does not declare its own ``old``.
#
# NOTE(review): check that every ``type=`` name used below has an entry in
# Option.typemap ('list' and 'str' are used here) -- ``Option.to_python``
# raises KeyError for a type name that is missing from the map.
NAMESPACES = Namespace(
    accept_content=Option(DEFAULT_ACCEPT_CONTENT, type='list', old=OLD_NS),
    admins=Option((), type='tuple'),
    enable_utc=Option(True, type='bool'),
    imports=Option((), type='tuple', old=OLD_NS),
    include=Option((), type='tuple', old=OLD_NS),
    server_email=Option('celery@localhost'),
    timezone=Option(type='string', old=OLD_NS),

    beat=Namespace(
        __old__=OLD_NS_BEAT,

        max_loop_interval=Option(0, type='float'),
        schedule=Option({}, type='dict'),
        scheduler=Option('celery.beat:PersistentScheduler'),
        schedule_filename=Option('celerybeat-schedule'),
        sync_every=Option(0, type='int'),
    ),
    broker=Namespace(
        url=Option(None, type='string'),
        transport=Option(type='string'),
        transport_options=Option({}, type='dict'),
        connection_timeout=Option(4, type='float'),
        connection_retry=Option(True, type='bool'),
        connection_max_retries=Option(100, type='int'),
        failover_strategy=Option(None, type='string'),
        heartbeat=Option(None, type='int'),
        # NOTE(review): float default declared as type 'int' -- confirm.
        heartbeat_checkrate=Option(3.0, type='int'),
        login_method=Option(None, type='string'),
        pool_limit=Option(10, type='int'),
        use_ssl=Option(False, type='bool'),

        host=Option(type='string'),
        port=Option(type='int'),
        user=Option(type='string'),
        password=Option(type='string'),
        vhost=Option(type='string'),
    ),
    cache=Namespace(
        __old__=old_ns('celery_cache'),

        backend=Option(),
        backend_options=Option({}, type='dict'),
    ),
    cassandra=Namespace(
        entry_ttl=Option(type="float"),
        keyspace=Option(type='string'),
        # NOTE(review): port is declared 'string' here but 'int' in the
        # broker, email and redis groups -- confirm this is intended.
        port=Option(type="string"),
        read_consistency=Option(type='string'),
        servers=Option(type='list'),
        table=Option(type='string'),
        write_consistency=Option(type='string'),
    ),
    couchbase=Namespace(
        __old__=old_ns('celery_couchbase'),

        backend_settings=Option(None, type='dict'),
    ),
    email=Namespace(
        charset=Option('us-ascii'),
        host=Option('localhost'),
        host_user=Option(),
        host_password=Option(),
        port=Option(25, type='int'),
        timeout=Option(2, type='float'),
        use_ssl=Option(False, type='bool'),
        use_tls=Option(False, type='bool'),
    ),
    mongodb=Namespace(
        __old__=old_ns('celery_mongodb'),

        backend_settings=Option(type='dict'),
    ),
    event=Namespace(
        __old__=old_ns('celery_event'),

        queue_expires=Option(60.0, type='float'),
        queue_ttl=Option(5.0, type='float'),
        serializer=Option('json'),
    ),
    redis=Namespace(
        __old__=old_ns('celery_redis'),

        db=Option(type='int'),
        host=Option(type='string'),
        max_connections=Option(type='int'),
        password=Option(type='string'),
        port=Option(type='int'),
        socket_timeout=Option(5.0, type='float'),
    ),
    result=Namespace(
        __old__=old_ns('celery_result'),

        backend=Option(type='string'),
        cache_max=Option(
            100,
            type='int', old={'celery_max_cached_results'},
        ),
        # NOTE(review): 'str' here, 'string' everywhere else -- confirm.
        compression=Option(type='str'),
        exchange=Option('celeryresults'),
        exchange_type=Option('direct'),
        # NOTE(review): timedelta default declared as type 'float'.
        expires=Option(
            timedelta(days=1),
            type='float', old={'celery_task_result_expires'},
        ),
        persistent=Option(None, type='bool'),
        serializer=Option('json'),
    ),
    riak=Namespace(
        __old__=old_ns('celery_riak'),

        backend_settings=Option(type='dict'),
    ),
    security=Namespace(
        __old__=old_ns('celery_security'),

        certificate=Option(type='string'),
        cert_store=Option(type='string'),
        key=Option(type='string'),
    ),
    sqlalchemy=Namespace(
        dburi=Option(old={'celery_result_dburi'}),
        engine_options=Option(
            type='dict', old={'celery_result_engine_options'},
        ),
        short_lived_sessions=Option(
            False, type='bool', old={'celery_result_db_short_lived_sessions'},
        ),
        table_names=Option(type='dict', old={'celery_result_db_tablenames'}),
    ),
    task=Namespace(
        __old__=OLD_NS,
        acks_late=Option(False, type='bool'),
        always_eager=Option(False, type='bool'),
        annotations=Option(type='any'),
        compression=Option(type='string', old={'celery_message_compression'}),
        create_missing_queues=Option(True, type='bool'),
        # NOTE(review): int default declared as type 'string' -- confirm.
        default_delivery_mode=Option(2, type='string'),
        default_exchange=Option('celery'),
        default_exchange_type=Option('direct'),
        default_queue=Option('celery'),
        default_rate_limit=Option(type='string'),
        default_routing_key=Option('celery'),
        eager_propagates=Option(
            False, type='bool', old={'celery_eager_propagates_exceptions'},
        ),
        ignore_result=Option(False, type='bool'),
        protocol=Option(1, type='int', old={'celery_task_protocol'}),
        publish_retry=Option(
            True, type='bool', old={'celery_task_publish_retry'},
        ),
        publish_retry_policy=Option(
            {'max_retries': 3,
             'interval_start': 0,
             'interval_max': 1,
             'interval_step': 0.2},
            type='dict', old={'celery_task_publish_retry_policy'},
        ),
        queues=Option(type='dict'),
        queue_ha_policy=Option(None, type='string'),
        queue_max_priority=Option(None, type='int'),
        reject_on_worker_lost=Option(type='bool'),
        routes=Option(type='any'),
        send_error_emails=Option(
            False, type='bool', old={'celery_send_task_error_emails'},
        ),
        send_sent_event=Option(
            False, type='bool', old={'celery_send_task_sent_event'},
        ),
        serializer=Option('json', old={'celery_task_serializer'}),
        soft_time_limit=Option(
            type='float', old={'celeryd_task_soft_time_limit'},
        ),
        time_limit=Option(
            type='float', old={'celeryd_task_time_limit'},
        ),
        store_errors_even_if_ignored=Option(False, type='bool'),
        track_started=Option(False, type='bool'),
    ),
    worker=Namespace(
        __old__=OLD_NS_WORKER,
        agent=Option(None, type='string'),
        autoscaler=Option('celery.worker.autoscale:Autoscaler'),
        autoreloader=Option('celery.worker.autoreload:Autoreloader'),
        concurrency=Option(0, type='int'),
        consumer=Option('celery.worker.consumer:Consumer', type='string'),
        direct=Option(False, type='bool', old={'celery_worker_direct'}),
        disable_rate_limits=Option(
            False, type='bool', old={'celery_disable_rate_limits'},
        ),
        enable_remote_control=Option(
            True, type='bool', old={'celery_enable_remote_control'},
        ),
        force_execv=Option(False, type='bool'),
        hijack_root_logger=Option(True, type='bool'),
        log_color=Option(type='bool'),
        log_format=Option(DEFAULT_PROCESS_LOG_FMT),
        lost_wait=Option(10.0, type='float', old={'celeryd_worker_lost_wait'}),
        max_memory_per_child=Option(type='int'),
        max_tasks_per_child=Option(type='int'),
        pool=Option(DEFAULT_POOL),
        pool_putlocks=Option(True, type='bool'),
        pool_restarts=Option(False, type='bool'),
        prefetch_multiplier=Option(4, type='int'),
        redirect_stdouts=Option(
            True, type='bool', old={'celery_redirect_stdouts'},
        ),
        redirect_stdouts_level=Option(
            'WARNING', old={'celery_redirect_stdouts_level'},
        ),
        send_task_events=Option(
            False, type='bool', old={'celeryd_send_events'},
        ),
        state_db=Option(),
        task_log_format=Option(DEFAULT_TASK_LOG_FMT),
        timer=Option(type='string'),
        timer_precision=Option(1.0, type='float'),
    ),
)
  269. def _flatten_keys(ns, key, opt):
  270. return [(ns + key, opt)]
  271. def _to_compat(ns, key, opt):
  272. if opt.old:
  273. return [
  274. (oldkey.format(key).upper(), ns + key, opt)
  275. for oldkey in opt.old
  276. ]
  277. return [((ns + key).upper(), ns + key, opt)]
  278. def flatten(d, root='', keyfilter=_flatten_keys):
  279. stack = deque([(root, d)])
  280. while stack:
  281. ns, options = stack.popleft()
  282. for key, opt in items(options):
  283. if isinstance(opt, dict):
  284. stack.append((ns + key + '_', opt))
  285. else:
  286. for ret in keyfilter(ns, key, opt):
  287. yield ret
  288. DEFAULTS = {
  289. key: opt.default for key, opt in flatten(NAMESPACES)
  290. }
  291. __compat = list(flatten(NAMESPACES, keyfilter=_to_compat))
  292. _OLD_DEFAULTS = {old_key: opt.default for old_key, _, opt in __compat}
  293. _TO_OLD_KEY = {new_key: old_key for old_key, new_key, _ in __compat}
  294. _TO_NEW_KEY = {old_key: new_key for old_key, new_key, _ in __compat}
  295. __compat = None
  296. SETTING_KEYS = set(keys(DEFAULTS))
  297. _OLD_SETTING_KEYS = set(keys(_TO_NEW_KEY))
  298. def find_deprecated_settings(source): # pragma: no cover
  299. from celery.utils import warn_deprecated
  300. for name, opt in flatten(NAMESPACES):
  301. if (opt.deprecate_by or opt.remove_by) and getattr(source, name, None):
  302. warn_deprecated(description='The {0!r} setting'.format(name),
  303. deprecation=opt.deprecate_by,
  304. removal=opt.remove_by,
  305. alternative='Use the {0.alt} instead'.format(opt))
  306. return source
  307. @memoize(maxsize=None)
  308. def find(name, namespace='celery'):
  309. # - Try specified namespace first.
  310. namespace = namespace.lower()
  311. try:
  312. return searchresult(
  313. namespace, name.lower(), NAMESPACES[namespace][name.lower()],
  314. )
  315. except KeyError:
  316. # - Try all the other namespaces.
  317. for ns, opts in items(NAMESPACES):
  318. if ns.lower() == name.lower():
  319. return searchresult(None, ns, opts)
  320. elif isinstance(opts, dict):
  321. try:
  322. return searchresult(ns, name.lower(), opts[name.lower()])
  323. except KeyError:
  324. pass
  325. # - See if name is a qualname last.
  326. return searchresult(None, name.lower(), DEFAULTS[name.lower()])