base.py 45 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267
  1. # -*- coding: utf-8 -*-
  2. """Actual App instance implementation."""
  3. from __future__ import absolute_import, unicode_literals
  4. from datetime import datetime
  5. import os
  6. import threading
  7. import warnings
  8. from collections import defaultdict, deque
  9. from operator import attrgetter
  10. from kombu import pools
  11. from kombu.clocks import LamportClock
  12. from kombu.common import oid_from
  13. from kombu.utils.compat import register_after_fork
  14. from kombu.utils.objects import cached_property
  15. from kombu.utils.uuid import uuid
  16. from vine import starpromise
  17. from vine.utils import wraps
  18. from celery import platforms
  19. from celery import signals
  20. from celery._state import (
  21. _task_stack, get_current_app, _set_current_app, set_default_app,
  22. _register_app, _deregister_app,
  23. get_current_worker_task, connect_on_app_finalize,
  24. _announce_app_finalized,
  25. )
  26. from celery.exceptions import AlwaysEagerIgnored, ImproperlyConfigured
  27. from celery.five import (
  28. UserDict, bytes_if_py2, python_2_unicode_compatible, values,
  29. )
  30. from celery.loaders import get_loader_cls
  31. from celery.local import PromiseProxy, maybe_evaluate
  32. from celery.utils import abstract
  33. from celery.utils.collections import AttributeDictMixin
  34. from celery.utils.dispatch import Signal
  35. from celery.utils.functional import first, maybe_list, head_from_fun
  36. from celery.utils.time import timezone, \
  37. get_exponential_backoff_interval, to_utc
  38. from celery.utils.imports import gen_task_name, instantiate, symbol_by_name
  39. from celery.utils.log import get_logger
  40. from celery.utils.objects import FallbackContext, mro_lookup
  41. from .annotations import prepare as prepare_annotations
  42. from . import backends
  43. from .defaults import find_deprecated_settings
  44. from .registry import TaskRegistry
  45. from .utils import (
  46. AppPickler, Settings,
  47. bugreport, _unpickle_app, _unpickle_app_v2,
  48. _old_key_to_new, _new_key_to_old,
  49. appstr, detect_settings,
  50. )
  51. # Load all builtin tasks
  52. from . import builtins # noqa
  # Public API of this module: only the application class is exported.
  __all__ = ['Celery']
  # Module-level logger, named after this module.
  logger = get_logger(__name__)
  # Fix-up plug-ins used as the default for ``Celery.builtin_fixups``.
  BUILTIN_FIXUPS = {
      'celery.fixups.django:fixup',
  }
  # Value of the FORKED_BY_MULTIPROCESSING environment variable (None when
  # unset); ``Celery.task`` consults it to fall back to ``shared_task``.
  USING_EXECV = os.environ.get('FORKED_BY_MULTIPROCESSING')
  # Error text raised by ``Celery.config_from_envvar``; ``{0}`` is replaced
  # with the name of the environment variable that was not set.
  ERR_ENVVAR_NOT_SET = """
  The environment variable {0!r} is not set,
  and as such the configuration could not be loaded.
  Please set this variable and make sure it points to
  a valid configuration module.
  Example:
  {0}="proj.celeryconfig"
  """
  def app_has_custom(app, attr):
      """Check whether the class of ``app`` customizes ``attr``.

      Note:
          This exists as an optimization aid: callers that know how the
          default implementation behaves can take a shortcut, but must
          still respect a method/property overridden through inheritance.
      """
      app_type = app.__class__
      return mro_lookup(
          app_type, attr,
          stop={Celery, object},
          monkey_patched=[__name__],
      )
  def _unpickle_appattr(reverse_name, args):
      """Unpickle helper: call an attribute of the current app.

      Resolves ``reverse_name`` on the current app through ``_rgetattr``
      and calls the resulting object with the positional ``args``.
      """
      current_app = get_current_app()
      target = current_app._rgetattr(reverse_name)
      return target(*args)
  81. def _after_fork_cleanup_app(app):
  82. # This is used with multiprocessing.register_after_fork,
  83. # so need to be at module level.
  84. try:
  85. app._after_fork()
  86. except Exception as exc: # pylint: disable=broad-except
  87. logger.info('after forker raised exception: %r', exc, exc_info=1)
  class PendingConfiguration(UserDict, AttributeDictMixin):
      """Stand-in used as ``app.conf`` before the app is configured.

      Settings assigned directly on ``app.conf`` ahead of the
      ``app.config_from_object`` call are kept in the pending dict given
      to the constructor.  Accessing any key finalizes the configuration,
      replacing ``app.conf`` with a concrete settings object.
      """

      callback = None
      _data = None

      def __init__(self, conf, callback):
          # Stored through ``object.__setattr__`` so both values land on the
          # instance itself -- presumably to bypass the attribute handling
          # of ``AttributeDictMixin`` (TODO confirm).
          for attr, value in (('_data', conf), ('callback', callback)):
              object.__setattr__(self, attr, value)

      @cached_property
      def data(self):
          # Reading ``data`` runs the callback that yields the real config.
          return self.callback()

      # -- operations that only touch the pending dict (stay lazy) --

      def __setitem__(self, key, value):
          self._data[key] = value

      def __contains__(self, key):
          # XXX will not show finalized configuration
          # setdefault will cause `key in d` to happen,
          # so for setdefault to be lazy, so does contains.
          return key in self._data

      def setdefault(self, *args, **kwargs):
          return self._data.setdefault(*args, **kwargs)

      def update(self, *args, **kwargs):
          self._data.update(*args, **kwargs)

      def clear(self):
          self._data.clear()

      # -- operations that read ``data`` (and so run the callback) --

      def __len__(self):
          return len(self.data)

      def __repr__(self):
          return repr(self.data)
  @python_2_unicode_compatible
  class Celery(object):
      """Celery application.

      Arguments:
          main (str): Name of the main module if running as `__main__`.
              This is used as the prefix for auto-generated task names.

      Keyword Arguments:
          broker (str): URL of the default broker used.
          backend (Union[str, type]): The result store backend class,
              or the name of the backend class to use.
              Default is the value of the :setting:`result_backend` setting.
          autofinalize (bool): If set to False a :exc:`RuntimeError`
              will be raised if the task registry or tasks are used before
              the app is finalized.
          set_as_current (bool): Make this the global current app.
          include (List[str]): List of modules every worker should import.
          amqp (Union[str, type]): AMQP object or class name.
          events (Union[str, type]): Events object or class name.
          log (Union[str, type]): Log object or class name.
          control (Union[str, type]): Control object or class name.
          tasks (Union[str, type]): A task registry, or the name of
              a registry class.
          fixups (List[str]): List of fix-up plug-ins (e.g., see
              :mod:`celery.fixups.django`).
          config_source (Union[str, type]): Take configuration from a class,
              or object.  Attributes may include any settings described in
              the documentation.
      """

      #: This is deprecated, use :meth:`reduce_keys` instead
      Pickler = AppPickler

      # Platform information re-exported from :mod:`celery.platforms`.
      SYSTEM = platforms.SYSTEM
      IS_macOS, IS_WINDOWS = platforms.IS_macOS, platforms.IS_WINDOWS

      #: Name of the `__main__` module.  Required for standalone scripts.
      #:
      #: If set this will be used instead of `__main__` when automatically
      #: generating task names.
      main = None

      #: Custom options for command-line programs.
      #: See :ref:`extending-commandoptions`
      user_options = None

      #: Custom bootsteps to extend and modify the worker.
      #: See :ref:`extending-bootsteps`.
      steps = None

      # Default fix-ups; used by ``__init__`` when ``fixups`` is not given.
      builtin_fixups = BUILTIN_FIXUPS

      # Default component classes (objects or ``module:attr`` names); the
      # matching ``__init__`` arguments take precedence when provided.
      amqp_cls = 'celery.app.amqp:AMQP'
      backend_cls = None
      events_cls = 'celery.app.events:Events'
      loader_cls = None
      log_cls = 'celery.app.log:Logging'
      control_cls = 'celery.app.control:Control'
      task_cls = 'celery.app.task:Task'
      registry_cls = TaskRegistry

      # Class-level defaults for per-instance state.
      _fixups = None
      _pool = None
      _conf = None
      _after_fork_registered = False

      #: Signal sent when app is loading configuration.
      on_configure = None

      #: Signal sent after app has prepared the configuration.
      on_after_configure = None

      #: Signal sent after app has been finalized.
      on_after_finalize = None

      #: Signal sent by every new process after fork.
      on_after_fork = None
  def __init__(self, main=None, loader=None, backend=None,
               amqp=None, events=None, log=None, control=None,
               set_as_current=True, tasks=None, broker=None, include=None,
               changes=None, config_source=None, fixups=None, task_cls=None,
               autofinalize=True, namespace=None, strict_typing=True,
               **kwargs):
      """Initialize app state without loading the configuration.

      Most arguments are described in the class docstring.  Additionally:

      Arguments:
          loader: Loader to use; falls back to ``_get_default_loader()``.
          changes (Dict): Initial configuration values; used as the
              pending configuration dict (``_preconf``).
          task_cls: Overrides the class-level ``task_cls`` when truthy.
          namespace (str): Passed to ``Settings`` as ``prefix``.
          strict_typing (bool): Stored as ``self.strict_typing``.
          **kwargs: Accepted but not used by this method.
      """
      self.clock = LamportClock()
      self.main = main
      # Explicit arguments win over the class-level defaults.
      self.amqp_cls = amqp or self.amqp_cls
      self.events_cls = events or self.events_cls
      self.loader_cls = loader or self._get_default_loader()
      self.log_cls = log or self.log_cls
      self.control_cls = control or self.control_cls
      self.task_cls = task_cls or self.task_cls
      self.set_as_current = set_as_current
      self.registry_cls = symbol_by_name(self.registry_cls)
      self.user_options = defaultdict(set)
      self.steps = defaultdict(set)
      self.autofinalize = autofinalize
      self.namespace = namespace
      self.strict_typing = strict_typing

      self.configured = False
      self._config_source = config_source
      self._pending_defaults = deque()
      self._pending_periodic_tasks = deque()

      self.finalized = False
      self._finalize_mutex = threading.Lock()
      # Lazy task proxies created by ``task()``; evaluated in ``finalize()``.
      self._pending = deque()
      self._tasks = tasks
      # Anything that is not already a registry is wrapped in one.
      if not isinstance(self._tasks, TaskRegistry):
          self._tasks = self.registry_cls(self._tasks or {})

      # If the class defines a custom __reduce_args__ we need to use
      # the old way of pickling apps: pickling a list of
      # args instead of the new way that pickles a dict of keywords.
      self._using_v1_reduce = app_has_custom(self, '__reduce_args__')

      # these options are moved to the config to
      # simplify pickling of the app object.
      self._preconf = changes or {}
      self._preconf_set_by_auto = set()
      self.__autoset('broker_url', broker)
      self.__autoset('result_backend', backend)
      self.__autoset('include', include)
      # Configuration stays pending until first read; the callback
      # ``_finalize_pending_conf`` produces the concrete settings.
      self._conf = Settings(
          PendingConfiguration(
              self._preconf, self._finalize_pending_conf),
          prefix=self.namespace,
          keys=(_old_key_to_new, _new_key_to_old),
      )

      # - Apply fix-ups.
      self.fixups = set(self.builtin_fixups) if fixups is None else fixups
      # ...store fixup instances in _fixups to keep weakrefs alive.
      self._fixups = [symbol_by_name(fixup)(self) for fixup in self.fixups]

      if self.set_as_current:
          self.set_current()

      # Signals
      if self.on_configure is None:
          # used to be a method pre 4.0
          self.on_configure = Signal(name='app.on_configure')
      self.on_after_configure = Signal(
          name='app.on_after_configure',
          providing_args={'source'},
      )
      self.on_after_finalize = Signal(name='app.on_after_finalize')
      self.on_after_fork = Signal(name='app.on_after_fork')

      # Subclass hook first, then add the app to the global registry.
      self.on_init()
      _register_app(self)
  250. def _get_default_loader(self):
  251. # the --loader command-line argument sets the environment variable.
  252. return (
  253. os.environ.get('CELERY_LOADER') or
  254. self.loader_cls or
  255. 'celery.loaders.app:AppLoader'
  256. )
  def on_init(self):
      """Hook called by ``__init__``; the default does nothing."""
  260. def __autoset(self, key, value):
  261. if value:
  262. self._preconf[key] = value
  263. self._preconf_set_by_auto.add(key)
  def set_current(self):
      """Install this app as the current app of the calling thread."""
      _set_current_app(self)
  def set_default(self):
      """Install this app as the default app shared by all threads."""
      set_default_app(self)
  270. def _ensure_after_fork(self):
  271. if not self._after_fork_registered:
  272. self._after_fork_registered = True
  273. if register_after_fork is not None:
  274. register_after_fork(self, _after_fork_cleanup_app)
  def close(self):
      """Release what the application holds on to.

      Drops the reference to the pool and removes the app from the
      registry of apps.  This is only needed for apps created
      dynamically, and the :keyword:`with` statement is usually the
      better choice.

      Example:
          >>> with Celery(set_as_current=False) as app:
          ...     with app.connection_for_write() as conn:
          ...         pass
      """
      self._pool = None
      _deregister_app(self)
  def start(self, argv=None):
      """Run the :program:`celery` umbrella command with ``argv``.

      When ``argv`` is not given :data:`sys.argv` is used.
      """
      command = instantiate('celery.bin.celery:CeleryCommand', app=self)
      return command.execute_from_commandline(argv)
  def worker_main(self, argv=None):
      """Run the :program:`celery worker` command with ``argv``.

      When ``argv`` is not given :data:`sys.argv` is used.
      """
      command = instantiate('celery.bin.worker:worker', app=self)
      return command.execute_from_commandline(argv)
  def task(self, *args, **opts):
      """Decorator to create a task class out of any callable.

      Can be used bare (``@app.task``) or called with keyword options
      (``@app.task(...)``).

      Examples:
          .. code-block:: python

              @app.task
              def refresh_feed(url):
                  store_feed(feedparser.parse(url))

          with setting extra options:

          .. code-block:: python

              @app.task(exchange='feeds')
              def refresh_feed(url):
                  return store_feed(feedparser.parse(url))

      Keyword Arguments:
          shared (bool): When true (default) a constructor for the task
              is also connected via ``connect_on_app_finalize``.
          filter (Callable): If given, the created task (or proxy) is
              passed through it and its return value is returned instead.
          lazy (bool): When true (default) and the app is not finalized,
              a proxy is returned and the task is created on first use.
          **opts: Remaining options are forwarded to ``_task_from_fun``.

      Raises:
          TypeError: If the single positional argument is not callable,
              or if more than one positional argument is given.

      Note:
          App Binding: For custom apps the task decorator will return
          a proxy object, so that the act of creating the task is not
          performed until the task is used or the task registry is accessed.

          If you're depending on binding to be deferred, then you must
          not access any attributes on the returned object until the
          application is fully set up (finalized).
      """
      if USING_EXECV and opts.get('lazy', True):
          # When using execv the task in the original module will point to a
          # different app, so doing things like 'add.request' will point to
          # a different task instance.  This makes sure it will always use
          # the task instance from the current app.
          # Really need a better solution for this :(
          from . import shared_task
          return shared_task(*args, lazy=False, **opts)

      def inner_create_task_cls(shared=True, filter=None, lazy=True, **opts):
          # ``opts`` here shadows the outer ``opts``: it holds only what is
          # left after ``shared``/``filter``/``lazy`` have been taken out.
          _filt = filter  # stupid 2to3

          def _create_task_cls(fun):
              if shared:
                  # Constructor run with an app as argument; registered
                  # through ``connect_on_app_finalize``.
                  def cons(app):
                      return app._task_from_fun(fun, **opts)
                  cons.__name__ = fun.__name__
                  connect_on_app_finalize(cons)
              if not lazy or self.finalized:
                  ret = self._task_from_fun(fun, **opts)
              else:
                  # return a proxy object that evaluates on first use
                  ret = PromiseProxy(self._task_from_fun, (fun,), opts,
                                     __doc__=fun.__doc__)
                  # Remembered so ``finalize()`` can evaluate it.
                  self._pending.append(ret)
              if _filt:
                  return _filt(ret)
              return ret

          return _create_task_cls

      # Bare usage: ``@app.task`` passes the function as the only argument.
      if len(args) == 1:
          if callable(args[0]):
              return inner_create_task_cls(**opts)(*args)
          raise TypeError('argument 1 to @task() must be a callable')
      if args:
          raise TypeError(
              '@task() takes exactly 1 argument ({0} given)'.format(
                  sum([len(args), len(opts)])))
      # Called with options only: return the actual decorator.
      return inner_create_task_cls(**opts)
  def _task_from_fun(self, fun, name=None, base=None, bind=False, **options):
      """Create (or look up) the task instance wrapping ``fun``.

      Arguments:
          fun (Callable): Function that becomes the task's ``run``.
          name (str): Task name; generated with ``gen_task_name`` from the
              function's name and module when omitted.
          base (type): Base class of the new task class; defaults to
              ``self.Task``.
          bind (bool): If true ``fun`` is used as-is for ``run``,
              otherwise it is wrapped in ``staticmethod``.
          **options: Added as attributes of the generated class.  The keys
              ``autoretry_for``, ``retry_kwargs``, ``retry_backoff``,
              ``retry_backoff_max`` and ``retry_jitter`` are also read
              here to set up automatic retries.

      Returns:
          The task registered under ``name``; an already registered task
          is returned unchanged.

      Raises:
          RuntimeError: If the app is neither finalized nor allowed to
              autofinalize.
      """
      if not self.finalized and not self.autofinalize:
          raise RuntimeError('Contract breach: app not finalized')
      name = name or self.gen_task_name(fun.__name__, fun.__module__)
      base = base or self.Task

      if name not in self._tasks:
          run = fun if bind else staticmethod(fun)
          # Build a new class named after the function and instantiate it.
          task = type(fun.__name__, (base,), dict({
              'app': self,
              'name': name,
              'run': run,
              '_decorated': True,
              '__doc__': fun.__doc__,
              '__module__': fun.__module__,
              '__header__': staticmethod(head_from_fun(fun, bound=bind)),
              '__wrapped__': run}, **options))()
          # for some reason __qualname__ cannot be set in type()
          # so we have to set it here.
          try:
              task.__qualname__ = fun.__qualname__
          except AttributeError:
              pass
          self._tasks[task.name] = task
          task.bind(self)  # connects task to this app

          autoretry_for = tuple(options.get('autoretry_for', ()))
          retry_kwargs = options.get('retry_kwargs', {})
          retry_backoff = int(options.get('retry_backoff', False))
          retry_backoff_max = int(options.get('retry_backoff_max', 600))
          retry_jitter = options.get('retry_jitter', True)

          # The ``_orig_run`` check avoids wrapping ``run`` a second time.
          if autoretry_for and not hasattr(task, '_orig_run'):

              @wraps(task.run)
              def run(*args, **kwargs):
                  try:
                      return task._orig_run(*args, **kwargs)
                  except autoretry_for as exc:
                      if retry_backoff:
                          # NOTE(review): this writes into the
                          # ``retry_kwargs`` dict obtained from ``options``,
                          # so the countdown persists between calls.
                          retry_kwargs['countdown'] = \
                              get_exponential_backoff_interval(
                                  factor=retry_backoff,
                                  retries=task.request.retries,
                                  maximum=retry_backoff_max,
                                  full_jitter=retry_jitter)
                      raise task.retry(exc=exc, **retry_kwargs)

              # Keep the original ``run`` reachable, install the wrapper.
              task._orig_run, task.run = task.run, run
      else:
          task = self._tasks[name]
      return task
  def register_task(self, task):
      """Register a class-based task instance with this app.

      A task without a name gets one generated from its class name and
      module.  The task is stored in the registry, pointed at this app
      and bound to it, then returned.

      Note:
          This is here for compatibility with old Celery 1.0
          style task classes, you should not need to use this for
          new projects.
      """
      if not task.name:
          kind = type(task)
          task.name = self.gen_task_name(kind.__name__, kind.__module__)
      registry = self.tasks
      registry[task.name] = task
      task._app = self
      task.bind(self)
      return task
  def gen_task_name(self, name, module):
      """Return the task name for ``name`` as defined in ``module``.

      Thin wrapper around the module-level ``gen_task_name`` helper,
      which additionally receives this app.
      """
      return gen_task_name(self, name, module)
  def finalize(self, auto=False):
      """Finalize the app (a no-op when it is already finalized).

      Announces the app as finalized, evaluates the pending task
      decorators, binds every registered task to this app and sends
      the ``on_after_finalize`` signal.

      Arguments:
          auto (bool): Signals that finalization was triggered
              implicitly.

      Raises:
          RuntimeError: If ``auto`` is true while ``autofinalize`` is
              disabled and the app is not finalized yet.
      """
      with self._finalize_mutex:
          if self.finalized:
              return
          if auto and not self.autofinalize:
              raise RuntimeError('Contract breach: app not finalized')
          self.finalized = True
          _announce_app_finalized(self)

          # Drain the queue of lazy task proxies.
          queue = self._pending
          while queue:
              maybe_evaluate(queue.popleft())

          for registered in values(self._tasks):
              registered.bind(self)

          self.on_after_finalize.send(sender=self)
  def add_defaults(self, fun):
      """Add default configuration from a dict, or a callable returning one.

      A callable argument is treated as a promise: it is not called
      until the configuration is actually needed.  If the app is already
      configured the defaults are applied right away.

      This method can be compared to:

      .. code-block:: pycon

          >>> celery.conf.update(d)

      with a difference that 1) no copy will be made and 2) the dict will
      not be transferred when the worker spawns child processes, so
      it's important that the same configuration happens at import time
      when pickle restores the object on the other side.
      """
      if not callable(fun):
          defaults = fun

          # Wrap the plain value so both input kinds are handled alike.
          def fun():
              return defaults

      if not self.configured:
          self._pending_defaults.append(fun)
          return None
      return self._conf.add_defaults(fun())
  def config_from_object(self, obj,
                         silent=False, force=False, namespace=None):
      """Use ``obj`` as the configuration source.

      ``obj`` is either an actual object or the name of a module to
      import.

      Example:
          >>> celery.config_from_object('myapp.celeryconfig')

          >>> from myapp import celeryconfig
          >>> celery.config_from_object(celeryconfig)

      Arguments:
          silent (bool): If true then import errors will be ignored.
          force (bool): Force reading configuration immediately.
              By default the configuration will be read only when required.
          namespace (str): Replaces ``self.namespace`` when truthy.

      Returns:
          The configuration when it was (re)loaded right away and the
          loader reported success, otherwise ``None``.
      """
      self._config_source = obj
      self.namespace = namespace or self.namespace
      reload_now = force or self.configured
      if not reload_now:
          return None
      # Discard the current settings before asking the loader to read.
      self._conf = None
      loaded = self.loader.config_from_object(obj, silent=silent)
      if loaded:
          return self.conf
      return None
  def config_from_envvar(self, variable_name, silent=False, force=False):
      """Read configuration from the module named by an environment variable.

      The value of the environment variable must be the name
      of a module to import.

      Example:
          >>> os.environ['CELERY_CONFIG_MODULE'] = 'myapp.celeryconfig'
          >>> celery.config_from_envvar('CELERY_CONFIG_MODULE')

      Returns:
          The result of :meth:`config_from_object`, or ``False`` when the
          variable is unset or empty and ``silent`` is true.

      Raises:
          ImproperlyConfigured: If the variable is unset or empty and
              ``silent`` is false.
      """
      module_name = os.environ.get(variable_name)
      if module_name:
          return self.config_from_object(
              module_name, silent=silent, force=force,
          )
      if silent:
          return False
      message = ERR_ENVVAR_NOT_SET.strip().format(variable_name)
      raise ImproperlyConfigured(message)
  def config_from_cmdline(self, argv, namespace='celery'):
      """Update the configuration from command-line arguments.

      ``argv`` and ``namespace`` are handed to the loader's
      ``cmdline_config_parser``; whatever it returns is merged into the
      current configuration with ``update``.
      """
      parsed = self.loader.cmdline_config_parser(argv, namespace)
      self._conf.update(parsed)
  def setup_security(self, allowed_serializers=None, key=None, cert=None,
                     store=None, digest='sha1', serializer='json'):
      """Configure the message-signing serializer.

      This is a global operation: every application instance is affected.

      Disables untrusted serializers and if configured to use the ``auth``
      serializer will register the ``auth`` serializer with the provided
      settings into the Kombu serializer registry.

      Arguments:
          allowed_serializers (Set[str]): List of serializer names, or
              content_types that should be exempt from being disabled.
          key (str): Name of private key file to use.
              Defaults to the :setting:`security_key` setting.
          cert (str): Name of certificate file to use.
              Defaults to the :setting:`security_certificate` setting.
          store (str): Directory containing certificates.
              Defaults to the :setting:`security_cert_store` setting.
          digest (str): Digest algorithm used when signing messages.
              Default is ``sha1``.
          serializer (str): Serializer used to encode messages after
              they've been signed.  See :setting:`task_serializer` for
              the serializers supported.  Default is ``json``.
      """
      # Imported here rather than at module level, and under another
      # name so that it is not confused with this method.
      from celery.security import setup_security as _setup
      return _setup(
          allowed_serializers, key, cert, store, digest, serializer,
          app=self,
      )
  def autodiscover_tasks(self, packages=None,
                         related_name='tasks', force=False):
      """Auto-discover task modules.

      Searches a list of packages for a "tasks.py" module (or use
      related_name argument).

      If the name is empty, this will be delegated to fix-ups (e.g., Django).

      For example if you have a directory layout like this:

      .. code-block:: text

          foo/__init__.py
             tasks.py
             models.py

          bar/__init__.py
              tasks.py
              models.py

          baz/__init__.py
              models.py

      Then calling ``app.autodiscover_tasks(['foo', 'bar', 'baz'])`` will
      result in the modules ``foo.tasks`` and ``bar.tasks`` being imported.

      Arguments:
          packages (List[str]): List of packages to search.
              This argument may also be a callable, in which case the
              value returned is used (for lazy evaluation).
          related_name (str): The name of the module to find.  Defaults
              to "tasks": meaning "look for 'module.tasks' for every
              module in ``packages``."
          force (bool): By default this call is lazy so that the actual
              auto-discovery won't happen until an application imports
              the default modules.  Forcing will cause the auto-discovery
              to happen immediately.
      """
      if not force:
          # Lazy mode: run the discovery when ``import_modules`` is sent
          # for this app.
          deferred = starpromise(
              self._autodiscover_tasks, packages, related_name,
          )
          signals.import_modules.connect(deferred, weak=False, sender=self)
          return None
      return self._autodiscover_tasks(packages, related_name)
  553. def _autodiscover_tasks(self, packages, related_name, **kwargs):
  554. if packages:
  555. return self._autodiscover_tasks_from_names(packages, related_name)
  556. return self._autodiscover_tasks_from_fixups(related_name)
  557. def _autodiscover_tasks_from_names(self, packages, related_name):
  558. # packages argument can be lazy
  559. return self.loader.autodiscover_tasks(
  560. packages() if callable(packages) else packages, related_name,
  561. )
  562. def _autodiscover_tasks_from_fixups(self, related_name):
  563. return self._autodiscover_tasks_from_names([
  564. pkg for fixup in self._fixups
  565. for pkg in fixup.autodiscover_tasks()
  566. if hasattr(fixup, 'autodiscover_tasks')
  567. ], related_name=related_name)
  def send_task(self, name, args=None, kwargs=None, countdown=None,
                eta=None, task_id=None, producer=None, connection=None,
                router=None, result_cls=None, expires=None,
                publisher=None, link=None, link_error=None,
                add_to_parent=True, group_id=None, retries=0, chord=None,
                reply_to=None, time_limit=None, soft_time_limit=None,
                root_id=None, parent_id=None, route_name=None,
                shadow=None, chain=None, task_type=None, **options):
      """Send task by name.

      Supports the same arguments as :meth:`@-Task.apply_async`.

      Arguments:
          name (str): Name of task to call (e.g., `"tasks.add"`).
          result_cls (~@AsyncResult): Specify custom result class.

      Returns:
          An instance of ``result_cls`` (or ``self.AsyncResult``) created
          with the id of the task that was sent.
      """
      parent = have_parent = None
      amqp = self.amqp
      # A fresh id is generated when the caller supplies none.
      task_id = task_id or uuid()
      producer = producer or publisher  # XXX compat
      router = router or amqp.router
      conf = self.conf
      if conf.task_always_eager:  # pragma: no cover
          warnings.warn(AlwaysEagerIgnored(
              'task_always_eager has no effect on send_task',
          ), stacklevel=2)
      # The router result replaces ``options`` and is later passed on to
      # ``amqp.send_task_message``.
      options = router.route(
          options, route_name or name, args, kwargs, task_type)

      # Missing root/parent ids are taken from the task the worker is
      # currently executing, if there is one.
      if not root_id or not parent_id:
          parent = self.current_worker_task
          if parent:
              if not root_id:
                  root_id = parent.request.root_id or parent.request.id
              if not parent_id:
                  parent_id = parent.request.id

      message = amqp.create_task_message(
          task_id, name, args, kwargs, countdown, eta, group_id,
          expires, retries, chord,
          maybe_list(link), maybe_list(link_error),
          reply_to or self.oid, time_limit, soft_time_limit,
          self.conf.task_send_sent_event,
          root_id, parent_id, shadow, chain,
      )

      # An explicit connection takes precedence over a given producer.
      if connection:
          producer = amqp.Producer(connection, auto_declare=False)
      with self.producer_or_acquire(producer) as P:
          with P.connection._reraise_as_library_errors():
              # The result backend is notified before the message is sent.
              self.backend.on_task_call(P, task_id)
              amqp.send_task_message(P, name, message, **options)
      result = (result_cls or self.AsyncResult)(task_id)
      if add_to_parent:
          # NOTE(review): ``have_parent`` is still None at this point even
          # if ``parent`` was fetched above, so the current worker task is
          # always looked up again here.
          if not have_parent:
              parent, have_parent = self.current_worker_task, True
          if parent:
              parent.add_trail(result)
      return result
  def connection_for_read(self, url=None, **kwargs):
      """Establish connection used for consuming.

      Without ``url`` the ``broker_read_url`` configuration value is used.

      See Also:
          :meth:`connection` for supported arguments.
      """
      target = url or self.conf.broker_read_url
      return self._connection(target, **kwargs)
  def connection_for_write(self, url=None, **kwargs):
      """Establish connection used for producing.

      Without ``url`` the ``broker_write_url`` configuration value is used.

      See Also:
          :meth:`connection` for supported arguments.
      """
      target = url or self.conf.broker_write_url
      return self._connection(target, **kwargs)
  def connection(self, hostname=None, userid=None, password=None,
                 virtual_host=None, port=None, ssl=None,
                 connect_timeout=None, transport=None,
                 transport_options=None, heartbeat=None,
                 login_method=None, failover_strategy=None, **kwargs):
      """Establish a connection to the message broker.

      Please use :meth:`connection_for_read` and
      :meth:`connection_for_write` instead, to convey the intent
      of use for this connection.  This method simply forwards to
      :meth:`connection_for_write`.

      Arguments:
          hostname (str): URL, Hostname/IP-address of the broker.
              If a URL is used, then the other argument below will
              be taken from the URL instead.  Falls back to the
              ``broker_write_url`` configuration value.
          userid (str): Username to authenticate as.
          password (str): Password to authenticate with
          virtual_host (str): Virtual host to use (domain).
          port (int): Port to connect to.
          ssl (bool, Dict): Defaults to the :setting:`broker_use_ssl`
              setting.
          connect_timeout: Passed through unchanged.
          transport (str): defaults to the :setting:`broker_transport`
              setting.
          transport_options (Dict): Dictionary of transport specific options.
          heartbeat (int): AMQP Heartbeat in seconds (``pyamqp`` only).
          login_method (str): Custom login method to use (AMQP only).
          failover_strategy (str, Callable): Custom failover strategy.
          **kwargs: Additional arguments to :class:`kombu.Connection`.

      Returns:
          kombu.Connection: the lazy connection instance.
      """
      url = hostname or self.conf.broker_write_url
      return self.connection_for_write(
          url,
          userid=userid,
          password=password,
          virtual_host=virtual_host,
          port=port,
          ssl=ssl,
          connect_timeout=connect_timeout,
          transport=transport,
          transport_options=transport_options,
          heartbeat=heartbeat,
          login_method=login_method,
          failover_strategy=failover_strategy,
          **kwargs
      )
  def _connection(self, url, userid=None, password=None,
                  virtual_host=None, port=None, ssl=None,
                  connect_timeout=None, transport=None,
                  transport_options=None, heartbeat=None,
                  login_method=None, failover_strategy=None, **kwargs):
      """Build a broker connection for ``url`` via ``self.amqp.Connection``.

      Each falsy argument is replaced by the matching ``broker_*``
      setting.  ``ssl`` and ``connect_timeout`` are resolved through
      :meth:`either`, and ``transport_options`` is merged on top of a
      copy of the :setting:`broker_transport_options` setting.

      Note:
          ``**kwargs`` is accepted but is not forwarded to the
          connection class.
      """
      conf = self.conf
      connection_cls = self.amqp.Connection
      # Copy the configured options so the caller's overrides never
      # leak back into the settings dict.
      merged_transport_options = dict(
          conf.broker_transport_options, **transport_options or {}
      )
      keyword_options = {
          'transport': transport or conf.broker_transport,
          'ssl': self.either('broker_use_ssl', ssl),
          'heartbeat': heartbeat or conf.broker_heartbeat,
          'login_method': login_method or conf.broker_login_method,
          'failover_strategy': (
              failover_strategy or conf.broker_failover_strategy
          ),
          'transport_options': merged_transport_options,
          'connect_timeout': self.either(
              'broker_connection_timeout', connect_timeout
          ),
      }
      return connection_cls(
          url,
          userid or conf.broker_user,
          password or conf.broker_password,
          virtual_host or conf.broker_vhost,
          port or conf.broker_port,
          **keyword_options
      )
  # ``broker_connection`` is a second name for :meth:`connection`
  # (presumably kept for backward compatibility -- confirm before removing).
  broker_connection = connection
  def _acquire_connection(self, pool=True):
      """Helper for :meth:`connection_or_acquire`.

      Returns a connection acquired (blocking) from :attr:`pool` when
      ``pool`` is true, otherwise a new :meth:`connection_for_write`.
      """
      if not pool:
          return self.connection_for_write()
      return self.pool.acquire(block=True)
  def connection_or_acquire(self, connection=None, pool=True, *_, **__):
      """Context used to acquire a connection from the pool.

      For use within a :keyword:`with` statement to get a connection
      from the pool if one is not already provided.

      Arguments:
          connection (kombu.Connection): If not provided, a connection
              will be acquired from the connection pool.
          pool (bool): Passed on to :meth:`_acquire_connection`, the
              fallback used to obtain a connection.

      Any further positional or keyword arguments are ignored.
      """
      fallback = self._acquire_connection
      return FallbackContext(connection, fallback, pool=pool)
  # ``default_connection`` is a second name for
  # :meth:`connection_or_acquire`, kept for compatibility.
  default_connection = connection_or_acquire  # XXX compat
  def producer_or_acquire(self, producer=None):
      """Context used to acquire a producer from the pool.

      For use within a :keyword:`with` statement to get a producer
      from the pool if one is not already provided.

      Arguments:
          producer (kombu.Producer): If not provided, a producer
              will be acquired from the producer pool
              (``self.producer_pool.acquire`` called with ``block=True``).
      """
      acquire_from_pool = self.producer_pool.acquire
      return FallbackContext(producer, acquire_from_pool, block=True)
  # ``default_producer`` is a second name for
  # :meth:`producer_or_acquire`, kept for compatibility.
  default_producer = producer_or_acquire  # XXX compat
  def prepare_config(self, c):
      """Prepare configuration before it is merged with the defaults.

      The configuration ``c`` is passed through
      ``find_deprecated_settings`` and that function's result is returned.
      """
      prepared = find_deprecated_settings(c)
      return prepared
  def now(self):
      """Return the current time and date as a datetime.

      The current UTC time is passed through ``to_utc`` and then
      converted to the app's :attr:`timezone`.
      """
      utc_now = to_utc(datetime.utcnow())
      app_timezone = self.timezone
      return utc_now.astimezone(app_timezone)
  def select_queues(self, queues=None):
      """Select subset of queues.

      Delegates to ``self.amqp.queues.select`` and returns its result.

      Arguments:
          queues (Sequence[str]): a list of queue names to keep.
      """
      registry = self.amqp.queues
      return registry.select(queues)
  def either(self, default_key, *defaults):
      """Get key from configuration or use default values.

      The value selected from ``*defaults`` by ``first(None, ...)`` takes
      precedence; the configuration value stored under ``default_key``
      is only the fallback, looked up through ``starpromise``.

      Arguments:
          default_key (str): Configuration key used as the fallback.
          *defaults: Candidate values, tried in order.
      """
      explicit = first(None, defaults)
      configured = starpromise(self.conf.get, default_key)
      return first(None, [explicit, configured])
  def bugreport(self):
      """Return information useful in bug reports.

      Delegates to the module-level ``bugreport`` function, passing
      this app.
      """
      report = bugreport(self)
      return report
  def _get_backend(self):
      """Instantiate the result backend for this app.

      ``self.backend_cls`` is used when set, otherwise the
      :setting:`result_backend` setting; ``backends.by_url`` resolves
      it to a ``(class, url)`` pair.
      """
      backend_spec = self.backend_cls or self.conf.result_backend
      backend_cls, backend_url = backends.by_url(backend_spec, self.loader)
      return backend_cls(app=self, url=backend_url)
  def _finalize_pending_conf(self):
      """Get config value by key and finalize loading the configuration.

      Stores the result of :meth:`_load_config` on ``self._conf`` and
      returns it.

      Note:
          This is used by PendingConfiguration:
              as soon as you access a key the configuration is read.
      """
      loaded = self._load_config()
      self._conf = loaded
      return loaded
  def _load_config(self):
      """Load and finalize the configuration, returning ``self._conf``.

      In order: notify ``on_configure``, read ``_config_source`` through
      the loader (when set), mark the app as configured, build the
      settings with ``detect_settings``, install them on ``self._conf``,
      apply the pending lazy defaults and periodic tasks, and finally
      send ``on_after_configure``.
      """
      if isinstance(self.on_configure, Signal):
          self.on_configure.send(sender=self)
      else:
          # used to be a method pre 4.0
          self.on_configure()
      if self._config_source:
          self.loader.config_from_object(self._config_source)
      # Set before the settings are built; add_periodic_task() checks
      # this flag to decide between applying and queueing an entry.
      self.configured = True
      settings = detect_settings(
          self.prepare_config(self.loader.conf), self._preconf,
          ignore_keys=self._preconf_set_by_auto, prefix=self.namespace,
      )
      if self._conf is not None:
          # replace in place, as someone may have referenced app.conf,
          # done some changes, accessed a key, and then try to make more
          # changes to the reference and not the finalized value.
          self._conf.swap_with(settings)
      else:
          self._conf = settings

      # load lazy config dict initializers.
      # Each queued item is called, and the result is passed through
      # maybe_evaluate() before being added as defaults.
      pending_def = self._pending_defaults
      while pending_def:
          self._conf.add_defaults(maybe_evaluate(pending_def.popleft()()))

      # load lazy periodic tasks
      # Each queued item is the (key, entry) pair stored by
      # add_periodic_task() while the app was not yet configured.
      pending_beat = self._pending_periodic_tasks
      while pending_beat:
          self._add_periodic_task(*pending_beat.popleft())

      self.on_after_configure.send(sender=self, source=self._conf)
      return self._conf
  def _after_fork(self):
      """Reset pooled resources and send the ``on_after_fork`` signal.

      Clears the connection pool reference and, if an ``amqp`` object
      is already stored in the instance ``__dict__``, its producer
      pool as well.
      """
      self._pool = None
      try:
          cached_amqp = self.__dict__['amqp']
          cached_amqp._producer_pool = None
      except (AttributeError, KeyError):
          # No amqp object stored on the instance (or it does not
          # accept the attribute): there is nothing to reset.
          pass
      self.on_after_fork.send(sender=self)
  def signature(self, *args, **kwargs):
      """Return a new :class:`~celery.Signature` bound to this app.

      All arguments are forwarded to ``self._canvas.signature``; the
      ``app`` keyword is always set to this app, replacing any value
      the caller passed.
      """
      bound_kwargs = dict(kwargs, app=self)
      return self._canvas.signature(*args, **bound_kwargs)
  def add_periodic_task(self, schedule, sig,
                        args=(), kwargs=(), name=None, **opts):
      """Register a periodic task and return the key of its entry.

      The ``(key, entry)`` pair is built by
      :meth:`_sig_to_periodic_task_entry`.  It is applied at once when
      the app is already configured, and otherwise queued on
      ``self._pending_periodic_tasks``.

      Arguments:
          schedule: Schedule for the entry.
          sig: Signature (or task) to run.
          args (Tuple): Positional arguments for the task.
          kwargs (Dict): Keyword arguments for the task.
          name (str): Optional key for the entry.
          **opts: Extra options for the entry.
      """
      key, entry = self._sig_to_periodic_task_entry(
          schedule, sig, args, kwargs, name, **opts)
      if not self.configured:
          self._pending_periodic_tasks.append((key, entry))
      else:
          self._add_periodic_task(key, entry)
      return key
  def _sig_to_periodic_task_entry(self, schedule, sig,
                                  args=(), kwargs=None, name=None, **opts):
      """Convert a signature (or task) into a ``(key, entry)`` pair.

      Arguments:
          schedule: Stored unchanged under the entry's ``'schedule'`` key.
          sig: An ``abstract.CallableSignature``, which is cloned with
              ``args`` and ``kwargs``; or any other object with a
              ``name`` attribute, for which a new signature is created
              through :meth:`signature`.
          args (Tuple): Positional arguments for the task.
          kwargs (Dict): Keyword arguments for the task.  Defaults to
              an empty dict.
          name (str): Key for the entry.  Defaults to ``repr()`` of the
              resulting signature.
          **opts: Merged on top of the signature's own options.

      Returns:
          Tuple[str, Dict]: the key and the entry dict.
      """
      # Create the default per call: a ``{}`` default in the signature
      # would be a single dict shared between all calls.
      if kwargs is None:
          kwargs = {}
      sig = (sig.clone(args, kwargs)
             if isinstance(sig, abstract.CallableSignature)
             else self.signature(sig.name, args, kwargs))
      return name or repr(sig), {
          'schedule': schedule,
          'task': sig.name,
          'args': sig.args,
          'kwargs': sig.kwargs,
          'options': dict(sig.options, **opts),
      }
  def _add_periodic_task(self, key, entry):
      """Store ``entry`` under ``key`` in ``self._conf.beat_schedule``."""
      beat_schedule = self._conf.beat_schedule
      beat_schedule[key] = entry
  def create_task_cls(self):
      """Create a base task class bound to this app.

      Subclasses ``self.task_cls`` through :meth:`subclass_with_self`,
      naming the class ``Task`` and storing the app on ``_app``.
      """
      options = {
          'name': 'Task',
          'attribute': '_app',
          'keep_reduce': True,
          'abstract': True,
      }
      return self.subclass_with_self(self.task_cls, **options)
  def subclass_with_self(self, Class, name=None, attribute='app',
                         reverse=None, keep_reduce=False, **kw):
      """Subclass an app-compatible class.

      App-compatible means that the class has a class attribute that
      provides the default app it should use, for example:
      ``class Foo: app = None``.

      Arguments:
          Class (type): The app-compatible class to subclass.  It is
              resolved with ``symbol_by_name`` first.
          name (str): Custom name for the target class.
          attribute (str): Name of the attribute holding the app,
              Default is 'app'.
          reverse (str): Reverse path to this object used for pickling
              purposes. For example, to get ``app.AsyncResult``,
              use ``"AsyncResult"``.
          keep_reduce (bool): If enabled a custom ``__reduce__``
              implementation won't be provided.
          **kw: Additional attributes for the new class.

      Returns:
          type: a new subclass of ``Class``.
      """
      base = symbol_by_name(Class)
      pickle_path = reverse or base.__name__

      def __reduce__(self):
          # ``self`` is an instance of the generated class here,
          # not the app.
          return _unpickle_appattr, (pickle_path, self.__reduce_args__())

      namespace = dict(
          {attribute: self},
          __module__=base.__module__,
          __doc__=base.__doc__,
          **kw)
      if not keep_reduce:
          namespace['__reduce__'] = __reduce__
      class_name = bytes_if_py2(name or base.__name__)
      return type(class_name, (base,), namespace)
  def _rgetattr(self, path):
      """Resolve the attribute ``path`` on this app with ``attrgetter``."""
      resolve = attrgetter(path)
      return resolve(self)
  def __enter__(self):
      """Enter the runtime context; the app itself is the context value."""
      return self
  def __exit__(self, *exc_info):
      """Leave the runtime context by closing the app.

      The exception details are ignored and nothing is returned, so an
      exception raised inside the ``with`` body keeps propagating.
      """
      self.close()
  def __repr__(self):
      """Return ``<ClassName appstr>`` for this app."""
      class_name = type(self).__name__
      description = appstr(self)
      return '<{0} {1}>'.format(class_name, description)
  def __reduce__(self):
      """Pickle support: use the v1 protocol when the app requires it.

      Otherwise the app is rebuilt by ``_unpickle_app_v2`` from its
      class and the mapping returned by :meth:`__reduce_keys__`.
      """
      if not self._using_v1_reduce:
          reconstruct_args = (self.__class__, self.__reduce_keys__())
          return (_unpickle_app_v2, reconstruct_args)
      return self.__reduce_v1__()
  def __reduce_v1__(self):
      """Pickle support using the positional (v1) argument format."""
      # Reduce only pickles the configuration changes,
      # so the default configuration doesn't have to be passed
      # between processes.
      leading_args = (self.__class__, self.Pickler)
      return (_unpickle_app, leading_args + self.__reduce_args__())
  def __reduce_keys__(self):
      """Keyword arguments used to reconstruct the object when unpickling.

      ``changes`` holds the configuration changes once the app is
      configured, and the pre-configuration values before that.
      """
      # (keyword, attribute on the app) pairs that are copied verbatim.
      copied_attributes = (
          ('loader', 'loader_cls'),
          ('backend', 'backend_cls'),
          ('amqp', 'amqp_cls'),
          ('events', 'events_cls'),
          ('log', 'log_cls'),
          ('control', 'control_cls'),
          ('fixups', 'fixups'),
          ('config_source', '_config_source'),
          ('task_cls', 'task_cls'),
          ('namespace', 'namespace'),
      )
      state = {'main': self.main}
      if self.configured:
          state['changes'] = self._conf.changes
      else:
          state['changes'] = self._preconf
      for keyword, attribute in copied_attributes:
          state[keyword] = getattr(self, attribute)
      return state
  def __reduce_args__(self):
      """Deprecated method, please use :meth:`__reduce_keys__` instead.

      Returns the positional form: ``changes`` is the configuration
      changes once the app is configured, and an empty dict before.
      """
      changes = self._conf.changes if self.configured else {}
      class_names = (
          self.loader_cls, self.backend_cls, self.amqp_cls,
          self.events_cls, self.log_cls, self.control_cls,
      )
      return (self.main, changes) + class_names + (
          False, self._config_source)
  @cached_property
  def Worker(self):
      """Worker application.

      A subclass of ``celery.apps.worker:Worker`` created with
      :meth:`subclass_with_self`.

      See Also:
          :class:`~@Worker`.
      """
      worker_path = 'celery.apps.worker:Worker'
      return self.subclass_with_self(worker_path)
  @cached_property
  def WorkController(self, **kwargs):
      """Embeddable worker.

      A subclass of ``celery.worker:WorkController`` created with
      :meth:`subclass_with_self`.  ``**kwargs`` is not used.

      See Also:
          :class:`~@WorkController`.
      """
      controller_path = 'celery.worker:WorkController'
      return self.subclass_with_self(controller_path)
  @cached_property
  def Beat(self, **kwargs):
      """:program:`celery beat` scheduler application.

      A subclass of ``celery.apps.beat:Beat`` created with
      :meth:`subclass_with_self`.  ``**kwargs`` is not used.

      See Also:
          :class:`~@Beat`.
      """
      beat_path = 'celery.apps.beat:Beat'
      return self.subclass_with_self(beat_path)
  @cached_property
  def Task(self):
      """Base task class for this app, built by :meth:`create_task_cls`."""
      task_base = self.create_task_cls()
      return task_base
  @cached_property
  def annotations(self):
      """Result of ``prepare_annotations`` applied to the
      :setting:`task_annotations` setting.
      """
      configured = self.conf.task_annotations
      return prepare_annotations(configured)
  @cached_property
  def AsyncResult(self):
      """Create new result instance.

      A subclass of ``celery.result:AsyncResult`` created with
      :meth:`subclass_with_self`.

      See Also:
          :class:`celery.result.AsyncResult`.
      """
      result_path = 'celery.result:AsyncResult'
      return self.subclass_with_self(result_path)
  @cached_property
  def ResultSet(self):
      """Subclass of ``celery.result:ResultSet`` created with
      :meth:`subclass_with_self`.
      """
      result_path = 'celery.result:ResultSet'
      return self.subclass_with_self(result_path)
  @cached_property
  def GroupResult(self):
      """Create new group result instance.

      A subclass of ``celery.result:GroupResult`` created with
      :meth:`subclass_with_self`.

      See Also:
          :class:`celery.result.GroupResult`.
      """
      result_path = 'celery.result:GroupResult'
      return self.subclass_with_self(result_path)
  @property
  def pool(self):
      """Broker connection pool: :class:`~@pool`.

      Created on first access: the pool limit is set from the
      :setting:`broker_pool_limit` setting and the pool is looked up in
      ``pools.connections`` using a :meth:`connection_for_write`
      connection.  Later accesses return the stored pool.

      Note:
          This attribute is not related to the workers concurrency pool.
      """
      if self._pool is not None:
          return self._pool
      self._ensure_after_fork()
      pools.set_limit(self.conf.broker_pool_limit)
      write_connection = self.connection_for_write()
      self._pool = pools.connections[write_connection]
      return self._pool
  @property
  def current_task(self):
      """Instance of task being executed, or :const:`None`.

      This is the top of the ``_task_stack``.
      """
      top_of_stack = _task_stack.top
      return top_of_stack
  @property
  def current_worker_task(self):
      """The task currently being executed by a worker or :const:`None`.

      Differs from :data:`current_task` in that it's not affected
      by tasks calling other tasks directly, or eagerly.
      """
      worker_task = get_current_worker_task()
      return worker_task
  @cached_property
  def oid(self):
      """Universally unique identifier for this app."""
      # since 4.0: thread.get_ident() is not included when
      # generating the process id.  This is due to how the RPC
      # backend now dedicates a single thread to receive results,
      # which would not work if each thread has a separate id.
      app_oid = oid_from(self, threads=False)
      return app_oid
  @cached_property
  def amqp(self):
      """AMQP related functionality: :class:`~@amqp`.

      Created by ``instantiate`` from ``self.amqp_cls``, with this app
      passed as ``app``.
      """
      amqp_cls = self.amqp_cls
      return instantiate(amqp_cls, app=self)
  @cached_property
  def backend(self):
      """Current backend instance, created by :meth:`_get_backend`."""
      backend_instance = self._get_backend()
      return backend_instance
  @property
  def conf(self):
      """Current configuration.

      Loaded through :meth:`_load_config` on first access, when no
      configuration object has been stored yet.
      """
      current = self._conf
      if current is None:
          current = self._conf = self._load_config()
      return current

  @conf.setter
  def conf(self, d):  # noqa
      # Replace the stored configuration object with ``d`` as given.
      self._conf = d
  @cached_property
  def control(self):
      """Remote control: :class:`~@control`.

      Created by ``instantiate`` from ``self.control_cls``, with this
      app passed as ``app``.
      """
      control_cls = self.control_cls
      return instantiate(control_cls, app=self)
  @cached_property
  def events(self):
      """Consuming and sending events: :class:`~@events`.

      Created by ``instantiate`` from ``self.events_cls``, with this
      app passed as ``app``.
      """
      events_cls = self.events_cls
      return instantiate(events_cls, app=self)
  @cached_property
  def loader(self):
      """Current loader instance.

      The class is resolved from ``self.loader_cls`` by
      ``get_loader_cls`` and instantiated with this app.
      """
      loader_cls = get_loader_cls(self.loader_cls)
      return loader_cls(app=self)
  @cached_property
  def log(self):
      """Logging: :class:`~@log`.

      Created by ``instantiate`` from ``self.log_cls``, with this app
      passed as ``app``.
      """
      log_cls = self.log_cls
      return instantiate(log_cls, app=self)
  @cached_property
  def _canvas(self):
      """The ``celery.canvas`` module, imported on first access."""
      import celery.canvas
      from celery import canvas as canvas_module
      return canvas_module
  @cached_property
  def tasks(self):
      """Task registry.

      Warning:
          Accessing this attribute will also auto-finalize the app.
      """
      # finalize() has to run before the registry is handed out.
      self.finalize(auto=True)
      registry = self._tasks
      return registry
  @property
  def producer_pool(self):
      """The producer pool owned by :attr:`amqp`."""
      amqp = self.amqp
      return amqp.producer_pool
  def uses_utc_timezone(self):
      """Check if the application uses the UTC timezone.

      True when the :setting:`timezone` setting is ``'UTC'`` or is
      not set (``None``).
      """
      configured_tz = self.conf.timezone
      if configured_tz == 'UTC':
          return True
      return configured_tz is None
  @cached_property
  def timezone(self):
      """Current timezone for this app.

      This is a cached property taking the time zone from the
      :setting:`timezone` setting, with ``'UTC'`` used when the
      setting is empty.

      Returns:
          The result of ``timezone.get_timezone`` for that name.
      """
      # NOTE(review): the previous body also had a branch choosing
      # between UTC and ``timezone.local`` based on ``enable_utc``, but
      # it was guarded by ``if not tz`` after ``tz`` had already been
      # defaulted to 'UTC', so it could never run.  It is removed here
      # without changing behaviour; honouring ``enable_utc`` would be a
      # behaviour change that also affects uses_utc_timezone().
      tz_name = self.conf.timezone or 'UTC'
      return timezone.get_timezone(tz_name)
  # ``App`` is a second module-level name for the ``Celery`` class,
  # kept for compatibility.
  App = Celery  # noqa: E305 XXX compat