base.py 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app.base
  4. ~~~~~~~~~~~~~~~
  5. Actual App instance implementation.
  6. """
  7. from __future__ import absolute_import
  8. import os
  9. import threading
  10. import warnings
  11. from collections import defaultdict, deque
  12. from copy import deepcopy
  13. from operator import attrgetter
  14. from functools import wraps
  15. from amqp import promise
  16. try:
  17. from billiard.util import register_after_fork
  18. except ImportError:
  19. register_after_fork = None
  20. from kombu.clocks import LamportClock
  21. from kombu.common import oid_from
  22. from kombu.utils import cached_property, uuid
  23. from celery import platforms
  24. from celery import signals
  25. from celery._state import (
  26. _task_stack, get_current_app, _set_current_app, set_default_app,
  27. _register_app, get_current_worker_task, connect_on_app_finalize,
  28. _announce_app_finalized,
  29. )
  30. from celery.exceptions import AlwaysEagerIgnored, ImproperlyConfigured
  31. from celery.five import items, values
  32. from celery.loaders import get_loader_cls
  33. from celery.local import PromiseProxy, maybe_evaluate
  34. from celery.utils import abstract
  35. from celery.utils import gen_task_name
  36. from celery.utils.dispatch import Signal
  37. from celery.utils.functional import first, maybe_list, head_from_fun
  38. from celery.utils.imports import instantiate, symbol_by_name
  39. from celery.utils.objects import FallbackContext, mro_lookup
  40. from .annotations import prepare as prepare_annotations
  41. from .defaults import DEFAULTS, find_deprecated_settings
  42. from .registry import TaskRegistry
  43. from .utils import (
  44. AppPickler, Settings, bugreport, _unpickle_app, _unpickle_app_v2, appstr,
  45. )
  46. # Load all builtin tasks
  47. from . import builtins # noqa
  # Names exported by ``from celery.app.base import *``.
  __all__ = ['Celery']
  # Raw value of the FORKED_BY_MULTIPROCESSING environment variable (None when
  # it is unset).  ``Celery.task`` consults it to decide whether to delegate
  # to ``shared_task``.
  _EXECV = os.environ.get('FORKED_BY_MULTIPROCESSING')
  # Symbol names of the fixups used by ``Celery`` when no explicit ``fixups``
  # argument is passed to the constructor.
  BUILTIN_FIXUPS = {
      'celery.fixups.django:fixup',
  }
  53. ERR_ENVVAR_NOT_SET = """\
  54. The environment variable {0!r} is not set,
  55. and as such the configuration could not be loaded.
  56. Please set this variable and make it point to
  57. a configuration module."""
  # Module-wide flag; set to True by ``_ensure_after_fork()``.
  _after_fork_registered = False
  def app_has_custom(app, attr):
      """Look up ``attr`` on the class of ``app`` using ``mro_lookup``.

      The lookup is told to stop at :class:`Celery` and :class:`object`, and
      this module's name is passed as the ``monkey_patched`` list.

      :returns: whatever ``mro_lookup`` returns for that search.
      """
      app_class = app.__class__
      stop_at = (Celery, object)
      return mro_lookup(
          app_class, attr, stop=stop_at, monkey_patched=[__name__],
      )
  62. def _unpickle_appattr(reverse_name, args):
  63. """Given an attribute name and a list of args, gets
  64. the attribute from the current app and calls it."""
  65. return get_current_app()._rgetattr(reverse_name)(*args)
  def _global_after_fork(obj):
      """Run ``_after_fork(obj)`` on every app listed in ``_state._apps``.

      An exception raised by one app does not stop the loop: it is reported
      through multiprocessing's logger (when one is set) and the next app is
      processed.
      """
      # Historically each app registered itself with:
      #     `register_after_fork(app, app._after_fork)`
      # That leaked, because `register_after_fork` keeps concrete object
      # references and a registered object can only be removed by reaching
      # into the private afterfork registry list.
      #
      # See Issue #1949
      from celery import _state
      from multiprocessing import util as mputil

      for app in _state._apps:
          try:
              app._after_fork(obj)
          except Exception as error:
              logger = mputil._logger
              if logger:
                  logger.info(
                      'after forker raised exception: %r', error, exc_info=1)
  83. def _ensure_after_fork():
  84. global _after_fork_registered
  85. _after_fork_registered = True
  86. if register_after_fork is not None:
  87. register_after_fork(_global_after_fork, _global_after_fork)
  88. class Celery(object):
  89. """Celery application.
  90. :param main: Name of the main module if running as `__main__`.
  91. This is used as a prefix for task names.
  92. :keyword broker: URL of the default broker used.
  93. :keyword loader: The loader class, or the name of the loader class to use.
  94. Default is :class:`celery.loaders.app.AppLoader`.
  95. :keyword backend: The result store backend class, or the name of the
  96. backend class to use. Default is the value of the
  97. :setting:`CELERY_RESULT_BACKEND` setting.
  98. :keyword amqp: AMQP object or class name.
  99. :keyword events: Events object or class name.
  100. :keyword log: Log object or class name.
  101. :keyword control: Control object or class name.
  102. :keyword set_as_current: Make this the global current app.
  103. :keyword tasks: A task registry or the name of a registry class.
  104. :keyword include: List of modules every worker should import.
  105. :keyword fixups: List of fixup plug-ins (see e.g.
  106. :mod:`celery.fixups.django`).
  107. :keyword autofinalize: If set to False a :exc:`RuntimeError`
  108. will be raised if the task registry or tasks are used before
  109. the app is finalized.
  110. """
  111. #: This is deprecated, use :meth:`reduce_keys` instead
  112. Pickler = AppPickler
  113. SYSTEM = platforms.SYSTEM
  114. IS_OSX, IS_WINDOWS = platforms.IS_OSX, platforms.IS_WINDOWS
  115. #: Name of the `__main__` module. Required for standalone scripts.
  116. #:
  117. #: If set this will be used instead of `__main__` when automatically
  118. #: generating task names.
  119. main = None
  120. #: Custom options for command-line programs.
  121. #: See :ref:`extending-commandoptions`
  122. user_options = None
  123. #: Custom bootsteps to extend and modify the worker.
  124. #: See :ref:`extending-bootsteps`.
  125. steps = None
  126. amqp_cls = 'celery.app.amqp:AMQP'
  127. backend_cls = None
  128. events_cls = 'celery.events:Events'
  129. loader_cls = 'celery.loaders.app:AppLoader'
  130. log_cls = 'celery.app.log:Logging'
  131. control_cls = 'celery.app.control:Control'
  132. task_cls = 'celery.app.task:Task'
  133. registry_cls = TaskRegistry
  134. _fixups = None
  135. _pool = None
  136. _conf = None
  137. builtin_fixups = BUILTIN_FIXUPS
  138. #: Signal sent when app is loading configuration.
  139. on_configure = None
  140. #: Signal sent after app has prepared the configuration.
  141. on_after_configure = None
  142. #: Signal sent after app has been finalized.
  143. on_after_finalize = None
  144. #: ignored
  145. accept_magic_kwargs = False
  def __init__(self, main=None, loader=None, backend=None,
               amqp=None, events=None, log=None, control=None,
               set_as_current=True, tasks=None, broker=None, include=None,
               changes=None, config_source=None, fixups=None, task_cls=None,
               autofinalize=True, **kwargs):
      """Set up a new app instance.

      The keyword arguments are described in the class docstring.  In
      addition:

      :keyword changes: Mapping of configuration overrides.  It is copied,
          so the caller's mapping is never modified.
      :keyword config_source: Object or module name the configuration is
          later read from.
      :keyword task_cls: Base task class, or its name.
      :keyword \\*\\*kwargs: Accepted and ignored.

      Component arguments that are not given fall back to the class-level
      ``*_cls`` attributes.  ``broker``, ``backend`` and ``include`` are
      stored as pre-configuration overrides.
      """
      self.clock = LamportClock()
      self.main = main
      self.amqp_cls = amqp or self.amqp_cls
      self.events_cls = events or self.events_cls
      self.loader_cls = loader or self.loader_cls
      self.log_cls = log or self.log_cls
      self.control_cls = control or self.control_cls
      self.task_cls = task_cls or self.task_cls
      self.set_as_current = set_as_current
      self.registry_cls = symbol_by_name(self.registry_cls)
      self.user_options = defaultdict(set)
      self.steps = defaultdict(set)
      self.autofinalize = autofinalize

      self.configured = False
      self._config_source = config_source
      self._pending_defaults = deque()
      self._pending_periodic_tasks = deque()

      self.finalized = False
      self._finalize_mutex = threading.Lock()
      self._pending = deque()
      self._tasks = tasks
      if not isinstance(self._tasks, TaskRegistry):
          # Build the registry with the configured registry class so that a
          # custom ``registry_cls`` is actually honoured.
          self._tasks = self.registry_cls(self._tasks or {})

      # If the class defines a custom __reduce_args__ we need to use
      # the old way of pickling apps, which is pickling a list of
      # args instead of the new way that pickles a dict of keywords.
      self._using_v1_reduce = app_has_custom(self, '__reduce_args__')

      # these options are moved to the config to
      # simplify pickling of the app object.
      # Work on a copy: the assignments below must not leak into the
      # mapping the caller passed as ``changes``.
      self._preconf = dict(changes) if changes else {}
      if broker:
          self._preconf['BROKER_URL'] = broker
      if backend:
          self._preconf['CELERY_RESULT_BACKEND'] = backend
      if include:
          self._preconf['CELERY_IMPORTS'] = include

      # - Apply fixups.
      self.fixups = set(self.builtin_fixups) if fixups is None else fixups
      # ...store fixup instances in _fixups to keep weakrefs alive.
      self._fixups = [symbol_by_name(fixup)(self) for fixup in self.fixups]

      if self.set_as_current:
          self.set_current()

      # Signals
      if self.on_configure is None:
          # used to be a method pre 4.0
          self.on_configure = Signal()
      self.on_after_configure = Signal()
      self.on_after_finalize = Signal()

      self.on_init()
      _register_app(self)
  def set_current(self):
      """Install this app as the current app for the calling thread."""
      _set_current_app(self)
      return None
  def set_default(self):
      """Install this app as the default app shared by all threads."""
      set_default_app(self)
      return None
  207. def __enter__(self):
  208. return self
  209. def __exit__(self, *exc_info):
  210. self.close()
  def close(self):
      """Clean up after the application by closing any open pools.

      This is only needed for apps created dynamically; for those the
      ``with`` statement can be used instead::

          with Celery(set_as_current=False) as app:
              with app.connection() as conn:
                  pass
      """
      close_pools = self._maybe_close_pool
      close_pools()
  def on_init(self):
      """Hook called at the end of ``__init__``; does nothing by default."""
      return None
  def start(self, argv=None):
      """Run the :program:`celery` umbrella command with `argv`.

      :data:`sys.argv` is used when `argv` is not given.
      """
      command = instantiate('celery.bin.celery:CeleryCommand', app=self)
      return command.execute_from_commandline(argv)
  def worker_main(self, argv=None):
      """Run the :program:`celery worker` command with `argv`.

      :data:`sys.argv` is used when `argv` is not given.
      """
      command = instantiate('celery.bin.worker:worker', app=self)
      return command.execute_from_commandline(argv)
  def task(self, *args, **opts):
      """Decorator that turns any callable into a task class.

      Plain usage:

      .. code-block:: python

          @app.task
          def refresh_feed(url):
              return ...

      With extra options:

      .. code-block:: python

          @app.task(exchange="feeds")
          def refresh_feed(url):
              return ...

      .. admonition:: App Binding

          For custom apps the decorator returns a proxy object, so the
          task is not actually created until it is used or the task
          registry is accessed.

          If you rely on this deferred binding you must not touch any
          attribute of the returned object until the application is
          fully set up (finalized).

      :raises TypeError: if the single positional argument is not callable,
          or if more than one positional argument is given.
      """
      if _EXECV and opts.get('lazy', True):
          # When using execv the task in the original module will point to a
          # different app, so doing things like 'add.request' will point to
          # a different task instance.  This makes sure it will always use
          # the task instance from the current app.
          # Really need a better solution for this :(
          from . import shared_task
          return shared_task(*args, lazy=False, **opts)

      def inner_create_task_cls(shared=True, filter=None, lazy=True, **opts):
          _filt = filter  # stupid 2to3

          def _create_task_cls(fun):
              if shared:
                  def cons(app):
                      return app._task_from_fun(fun, **opts)
                  cons.__name__ = fun.__name__
                  connect_on_app_finalize(cons)
              if lazy and not self.finalized:
                  # hand back a proxy that is evaluated on first use
                  ret = PromiseProxy(self._task_from_fun, (fun,), opts,
                                     __doc__=fun.__doc__)
                  self._pending.append(ret)
              else:
                  ret = self._task_from_fun(fun, **opts)
              return _filt(ret) if _filt else ret

          return _create_task_cls

      positional = len(args)
      if positional == 1:
          if not callable(args[0]):
              raise TypeError('argument 1 to @task() must be a callable')
          return inner_create_task_cls(**opts)(*args)
      if args:
          raise TypeError(
              '@task() takes exactly 1 argument ({0} given)'.format(
                  positional + len(opts)))
      return inner_create_task_cls(**opts)
  def _task_from_fun(self, fun, name=None, base=None, bind=False, **options):
      """Return the task registered for ``fun``, creating it if necessary.

      :param fun: the callable that becomes the task's ``run``.
      :keyword name: task name; generated from the function's name and
          module when omitted.
      :keyword base: base class for the new task type (default
          ``self.Task``).
      :keyword bind: if true ``fun`` is used as-is for ``run``, otherwise it
          is wrapped in ``staticmethod``.
      :keyword \\*\\*options: extra class attributes.  ``autoretry_for`` and
          ``retry_kwargs`` additionally wrap ``run`` so that the listed
          exceptions trigger ``task.retry``.
      :raises RuntimeError: if the app is neither finalized nor
          auto-finalizing.
      """
      if not (self.finalized or self.autofinalize):
          raise RuntimeError('Contract breach: app not finalized')
      name = name or self.gen_task_name(fun.__name__, fun.__module__)
      base = base or self.Task

      # A task with this name already exists: reuse it.
      if name in self._tasks:
          return self._tasks[name]

      run = fun if bind else staticmethod(fun)
      attrs = {
          'app': self,
          'name': name,
          'run': run,
          '_decorated': True,
          '__doc__': fun.__doc__,
          '__module__': fun.__module__,
          '__header__': staticmethod(head_from_fun(fun, bound=bind)),
          '__wrapped__': run,
      }
      # explicit options win over the generated attributes
      attrs.update(options)
      task = type(fun.__name__, (base,), attrs)()
      self._tasks[task.name] = task
      task.bind(self)  # connects task to this app

      autoretry_for = tuple(options.get('autoretry_for', ()))
      retry_kwargs = options.get('retry_kwargs', {})

      if autoretry_for and not hasattr(task, '_orig_run'):

          @wraps(task.run)
          def run(*args, **kwargs):
              try:
                  return task._orig_run(*args, **kwargs)
              except autoretry_for as exc:
                  raise task.retry(exc=exc, **retry_kwargs)

          task._orig_run, task.run = task.run, run
      return task
  def gen_task_name(self, name, module):
      """Generate a task name by delegating to the module-level helper.

      The helper receives this app together with ``name`` and ``module``.
      """
      task_name = gen_task_name(self, name, module)
      return task_name
  def finalize(self, auto=False):
      """Finalize the app: load built-in tasks and evaluate pending task
      decorators.

      The work happens at most once and is guarded by the finalize mutex.

      :keyword auto: true when finalization was triggered implicitly.
      :raises RuntimeError: if ``auto`` is true but the app does not allow
          automatic finalization.
      """
      with self._finalize_mutex:
          if self.finalized:
              return
          if auto and not self.autofinalize:
              raise RuntimeError('Contract breach: app not finalized')
          self.finalized = True
          _announce_app_finalized(self)

          # evaluate every task proxy queued by the task decorator
          queue = self._pending
          while queue:
              maybe_evaluate(queue.popleft())

          for registered in values(self._tasks):
              registered.bind(self)

          self.on_after_finalize.send(sender=self)
  def add_defaults(self, fun):
      """Add default configuration from the dict ``d``.

      A callable argument is treated as a promise: it is not called until
      the configuration is actually needed.

      This can be compared to::

          >>> celery.conf.update(d)

      except that 1) no copy is made and 2) the dict is not transferred
      when the worker spawns child processes, so the same configuration
      must happen at import time when pickle restores the object on the
      other side.
      """
      # Wrap plain mappings so both cases are handled as a factory.
      factory = fun if callable(fun) else (lambda: fun)
      if not self.configured:
          self._pending_defaults.append(factory)
          return None
      return self._conf.add_defaults(factory())
  def config_from_object(self, obj, silent=False, force=False):
      """Read configuration from ``obj``, which is either an object or the
      name of a module to import.

      :keyword silent: If true then import errors will be ignored.
      :keyword force: Read the configuration immediately.  By default it
          is only read when required.

      .. code-block:: pycon

          >>> celery.config_from_object("myapp.celeryconfig")

          >>> from myapp import celeryconfig
          >>> celery.config_from_object(celeryconfig)
      """
      self._config_source = obj
      if not (force or self.configured):
          # lazy: the stored source is loaded later
          return None
      self._conf = None
      return self.loader.config_from_object(obj, silent=silent)
  def config_from_envvar(self, variable_name, silent=False, force=False):
      """Read configuration from the module named by an environment
      variable.

      .. code-block:: pycon

          >>> os.environ["CELERY_CONFIG_MODULE"] = "myapp.celeryconfig"
          >>> celery.config_from_envvar("CELERY_CONFIG_MODULE")

      :returns: the result of :meth:`config_from_object`, or ``False`` when
          the variable is unset or empty and ``silent`` is true.
      :raises ImproperlyConfigured: when the variable is unset or empty and
          ``silent`` is false.
      """
      module_name = os.environ.get(variable_name)
      if module_name:
          return self.config_from_object(
              module_name, silent=silent, force=force)
      if silent:
          return False
      raise ImproperlyConfigured(ERR_ENVVAR_NOT_SET.format(variable_name))
  def config_from_cmdline(self, argv, namespace='celery'):
      """Update the configuration with options parsed from ``argv``.

      The loader's ``cmdline_config_parser`` does the parsing.  The existing
      settings object is updated when the app is already configured,
      otherwise ``self.conf`` is used.
      """
      target = self._conf if self.configured else self.conf
      parsed = self.loader.cmdline_config_parser(argv, namespace)
      target.update(parsed)
  def setup_security(self, allowed_serializers=None, key=None, cert=None,
                     store=None, digest='sha1', serializer='json'):
      """Set up the message-signing serializer.

      This is a global operation: it affects all application instances.

      Untrusted serializers are disabled, and if the app is configured to
      use the ``auth`` serializer, that serializer is registered in the
      Kombu serializer registry with the settings given here.

      :keyword allowed_serializers: List of serializer names, or
          content_types that should be exempt from being disabled.
      :keyword key: Name of private key file to use.
          Defaults to the :setting:`CELERY_SECURITY_KEY` setting.
      :keyword cert: Name of certificate file to use.
          Defaults to the :setting:`CELERY_SECURITY_CERTIFICATE` setting.
      :keyword store: Directory containing certificates.
          Defaults to the :setting:`CELERY_SECURITY_CERT_STORE` setting.
      :keyword digest: Digest algorithm used when signing messages.
          Default is ``sha1``.
      :keyword serializer: Serializer used to encode messages after
          they have been signed.  See :setting:`CELERY_TASK_SERIALIZER` for
          the serializers supported.
          Default is ``json``.
      """
      from celery.security import setup_security as _setup_security
      return _setup_security(
          allowed_serializers, key, cert, store, digest, serializer,
          app=self,
      )
  def autodiscover_tasks(self, packages=None,
                         related_name='tasks', force=False):
      """Try to autodiscover and import modules with a specific name (by
      default 'tasks').

      If ``packages`` is empty, discovery is delegated to fixups (e.g.
      Django).

      For example, given an (imagined) directory tree like this::

          foo/__init__.py
             tasks.py
             models.py

          bar/__init__.py
              tasks.py
              models.py

          baz/__init__.py
              models.py

      calling ``app.autodiscover_tasks(['foo', 'bar', 'baz'])`` results in
      the modules ``foo.tasks`` and ``bar.tasks`` being imported.

      :param packages: List of packages to search.
          This argument may also be a callable, in which case the
          value returned is used (for lazy evaluation).
      :keyword related_name: The name of the module to find.  Defaults
          to "tasks", which means it looks for "module.tasks" for every
          module in ``packages``.
      :keyword force: By default this call is lazy, so the actual
          autodiscovery does not happen until an application imports the
          default modules.  Forcing makes the autodiscovery happen
          immediately.
      """
      if force:
          return self._autodiscover_tasks(packages, related_name)
      # Lazy: run the discovery when the import_modules signal is sent.
      deferred = promise(self._autodiscover_tasks, (packages, related_name))
      signals.import_modules.connect(deferred, weak=False, sender=self)
  450. def _autodiscover_tasks(self, packages, related_name, **kwargs):
  451. if packages:
  452. return self._autodiscover_tasks_from_names(packages, related_name)
  453. return self._autodiscover_tasks_from_fixups(related_name)
  454. def _autodiscover_tasks_from_names(self, packages, related_name):
  455. # packages argument can be lazy
  456. return self.loader.autodiscover_tasks(
  457. packages() if callable(packages) else packages, related_name,
  458. )
  459. def _autodiscover_tasks_from_fixups(self, related_name):
  460. return self._autodiscover_tasks_from_names([
  461. pkg for fixup in self._fixups
  462. for pkg in fixup.autodiscover_tasks()
  463. if hasattr(fixup, 'autodiscover_tasks')
  464. ], related_name=related_name)
  def send_task(self, name, args=None, kwargs=None, countdown=None,
                eta=None, task_id=None, producer=None, connection=None,
                router=None, result_cls=None, expires=None,
                publisher=None, link=None, link_error=None,
                add_to_parent=True, group_id=None, retries=0, chord=None,
                reply_to=None, time_limit=None, soft_time_limit=None,
                root_id=None, parent_id=None, route_name=None,
                shadow=None, **options):
      """Send task by name.

      :param name: Name of task to call (e.g. `"tasks.add"`).
      :keyword result_cls: Specify custom result class.  Default is
          using :meth:`AsyncResult`.

      Otherwise supports the same arguments as :meth:`@-Task.apply_async`.

      :returns: a result object for the task id that was sent.
      """
      amqp = self.amqp
      task_id = task_id or uuid()
      producer = producer or publisher  # XXX compat
      router = router or amqp.router
      conf = self.conf
      if conf.CELERY_ALWAYS_EAGER:  # pragma: no cover
          eager_warning = AlwaysEagerIgnored(
              'CELERY_ALWAYS_EAGER has no effect on send_task',
          )
          warnings.warn(eager_warning, stacklevel=2)
      options = router.route(options, route_name or name, args, kwargs)

      callbacks = maybe_list(link)
      errbacks = maybe_list(link_error)
      message = amqp.create_task_message(
          task_id, name, args, kwargs, countdown, eta, group_id,
          expires, retries, chord,
          callbacks, errbacks,
          reply_to or self.oid, time_limit, soft_time_limit,
          self.conf.CELERY_SEND_TASK_SENT_EVENT,
          root_id, parent_id, shadow,
      )

      # an explicit connection takes precedence over any given producer
      if connection:
          producer = amqp.Producer(connection)
      with self.producer_or_acquire(producer) as P:
          self.backend.on_task_call(P, task_id)
          amqp.send_task_message(P, name, message, **options)

      make_result = result_cls or self.AsyncResult
      result = make_result(task_id)
      if add_to_parent:
          parent = get_current_worker_task()
          if parent:
              parent.add_trail(result)
      return result
  def connection(self, hostname=None, userid=None, password=None,
                 virtual_host=None, port=None, ssl=None,
                 connect_timeout=None, transport=None,
                 transport_options=None, heartbeat=None,
                 login_method=None, failover_strategy=None, **kwargs):
      """Establish a connection to the message broker.

      :keyword hostname: URL, Hostname/IP-address of the broker.
          If a URL is used, then the other arguments below will
          be taken from the URL instead.
      :keyword userid: Username to authenticate as.
      :keyword password: Password to authenticate with.
      :keyword virtual_host: Virtual host to use (domain).
      :keyword port: Port to connect to.
      :keyword ssl: Defaults to the :setting:`BROKER_USE_SSL` setting.
      :keyword transport: defaults to the :setting:`BROKER_TRANSPORT`
          setting.

      Arguments that are not given fall back to the matching ``BROKER_*``
      setting; ``heartbeat`` is passed through unchanged.

      :returns :class:`kombu.Connection`:
      """
      conf = self.conf
      positional = (
          hostname or conf.BROKER_URL,
          userid or conf.BROKER_USER,
          password or conf.BROKER_PASSWORD,
          virtual_host or conf.BROKER_VHOST,
          port or conf.BROKER_PORT,
      )
      return self.amqp.Connection(
          *positional,
          transport=transport or conf.BROKER_TRANSPORT,
          ssl=self.either('BROKER_USE_SSL', ssl),
          heartbeat=heartbeat,
          login_method=login_method or conf.BROKER_LOGIN_METHOD,
          failover_strategy=(
              failover_strategy or conf.BROKER_FAILOVER_STRATEGY
          ),
          # per-call transport options override the configured ones
          transport_options=dict(
              conf.BROKER_TRANSPORT_OPTIONS, **transport_options or {}
          ),
          connect_timeout=self.either(
              'BROKER_CONNECTION_TIMEOUT', connect_timeout
          )
      )
  548. broker_connection = connection
  549. def _acquire_connection(self, pool=True):
  550. """Helper for :meth:`connection_or_acquire`."""
  551. if pool:
  552. return self.pool.acquire(block=True)
  553. return self.connection()
  def connection_or_acquire(self, connection=None, pool=True, *_, **__):
      """Context for use in a with-statement that yields ``connection``,
      or acquires one if none was provided.

      :keyword connection: If not provided, then a connection will be
          acquired from the connection pool.
      :keyword pool: passed on to :meth:`_acquire_connection` as ``pool``.

      Additional positional and keyword arguments are ignored.
      """
      acquire = self._acquire_connection
      return FallbackContext(connection, acquire, pool=pool)
  561. default_connection = connection_or_acquire # XXX compat
  def producer_or_acquire(self, producer=None):
      """Context for use in a with-statement that yields ``producer``, or
      acquires one if none was provided.

      :keyword producer: If not provided, then a producer will be
          acquired from the producer pool.
      """
      acquire = self.amqp.producer_pool.acquire
      return FallbackContext(producer, acquire, block=True)
  571. default_producer = producer_or_acquire # XXX compat
  def prepare_config(self, c):
      """Prepare configuration ``c`` before it is merged with the defaults.

      The configuration is passed through ``find_deprecated_settings`` and
      that function's result is returned.
      """
      prepared = find_deprecated_settings(c)
      return prepared
  def now(self):
      """Return the current time and date as a
      :class:`~datetime.datetime` object.

      The value comes from the loader, which is passed the
      :setting:`CELERY_ENABLE_UTC` setting as ``utc``.
      """
      use_utc = self.conf.CELERY_ENABLE_UTC
      return self.loader.now(utc=use_utc)
  def mail_admins(self, subject, body, fail_silently=False):
      """Send an email to the admins in the :setting:`ADMINS` setting.

      Nothing is sent, and ``None`` is returned, when :setting:`ADMINS` is
      empty.  Otherwise the loader's ``mail_admins`` result is returned.
      """
      conf = self.conf
      if not conf.ADMINS:
          return None
      # ADMINS holds (name, address) pairs; only the address is needed.
      recipients = [address for _, address in conf.ADMINS]
      mail_settings = dict(
          sender=conf.SERVER_EMAIL,
          host=conf.EMAIL_HOST,
          port=conf.EMAIL_PORT,
          user=conf.EMAIL_HOST_USER,
          password=conf.EMAIL_HOST_PASSWORD,
          timeout=conf.EMAIL_TIMEOUT,
          use_ssl=conf.EMAIL_USE_SSL,
          use_tls=conf.EMAIL_USE_TLS,
          charset=conf.EMAIL_CHARSET,
      )
      return self.loader.mail_admins(
          subject, body, fail_silently, to=recipients, **mail_settings
      )
  def select_queues(self, queues=None):
      """Select a subset of queues, where ``queues`` must be a list of
      queue names to keep.

      Delegates to ``self.amqp.queues.select``.
      """
      available = self.amqp.queues
      return available.select(queues)
  def either(self, default_key, *values):
      """Fall back to the value of a configuration key if none of the
      `*values` are true.

      The candidate is ``first(None, values)``; when that candidate is
      falsy the configured value for ``default_key`` is returned instead.
      """
      candidate = first(None, values)
      if candidate:
          return candidate
      return self.conf.get(default_key)
  def bugreport(self):
      """Return a string with information useful to the Celery core
      developers when reporting a bug.

      The report is produced by the module-level ``bugreport`` helper.
      """
      report = bugreport(self)
      return report
  def _get_backend(self):
      """Instantiate the result backend for this app.

      The backend is resolved from ``backend_cls`` when set, otherwise from
      the :setting:`CELERY_RESULT_BACKEND` setting.
      """
      from celery.backends import get_backend_by_url
      spec = self.backend_cls or self.conf.CELERY_RESULT_BACKEND
      backend_type, url = get_backend_by_url(spec, self.loader)
      return backend_type(app=self, url=url)
  def _load_config(self):
      """Build the settings object for this app and mark it configured.

      Steps, in order: announce ``on_configure``; load the stored config
      source through the loader (if any); create the ``Settings`` object
      from the prepared loader config and the defaults merged with
      ``_preconf``; drain the pending defaults and pending periodic tasks;
      re-apply ``_preconf`` as attributes; send ``on_after_configure``.

      :returns: the new settings object, which is also stored as
          ``self._conf``.
      """
      if isinstance(self.on_configure, Signal):
          self.on_configure.send(sender=self)
      else:
          # used to be a method pre 4.0
          self.on_configure()
      if self._config_source:
          self.loader.config_from_object(self._config_source)
      # Deep-copy DEFAULTS so the module-level mapping is never shared, then
      # overlay the pre-configuration overrides.
      defaults = dict(deepcopy(DEFAULTS), **self._preconf)
      # Set before the settings object exists: add_defaults() and
      # add_periodic_task() branch on this flag.
      self.configured = True
      s = self._conf = Settings(
          {}, [self.prepare_config(self.loader.conf), defaults],
      )
      # load lazy config dict initializers.
      pending_def = self._pending_defaults
      while pending_def:
          # each queued item is a callable returning the defaults
          s.add_defaults(maybe_evaluate(pending_def.popleft()()))

      # load lazy periodic tasks
      pending_beat = self._pending_periodic_tasks
      while pending_beat:
          # each queued item is a (key, entry) tuple
          self._add_periodic_task(*pending_beat.popleft())

      # Settings.__setitem__ method, set Settings.change
      if self._preconf:
          for key, value in items(self._preconf):
              setattr(s, key, value)
      self.on_after_configure.send(sender=self, source=s)
      return s
  641. def _after_fork(self, obj_):
  642. self._maybe_close_pool()
  643. def _maybe_close_pool(self):
  644. if self._pool:
  645. self._pool.force_close_all()
  646. self._pool = None
  647. amqp = self.__dict__.get('amqp')
  648. if amqp is not None and amqp._producer_pool is not None:
  649. amqp._producer_pool.force_close_all()
  650. amqp._producer_pool = None
  def signature(self, *args, **kwargs):
      """Return a new :class:`~celery.canvas.Signature` bound to this app.

      See :meth:`~celery.signature`.  Any ``app`` keyword given by the
      caller is replaced by this app.
      """
      kwargs['app'] = self
      make_signature = self.canvas.signature
      return make_signature(*args, **kwargs)
  def add_periodic_task(self, schedule, sig,
                        args=(), kwargs=(), name=None, **opts):
      """Register ``sig`` to be run periodically according to ``schedule``.

      The schedule entry is added immediately when the app is configured,
      otherwise it is queued until the configuration is loaded.

      :returns: the key under which the entry is stored.
      """
      key, entry = self._sig_to_periodic_task_entry(
          schedule, sig, args, kwargs, name, **opts)
      if not self.configured:
          self._pending_periodic_tasks.append((key, entry))
      else:
          self._add_periodic_task(key, entry)
      return key
  666. def _sig_to_periodic_task_entry(self, schedule, sig,
  667. args=(), kwargs={}, name=None, **opts):
  668. sig = (sig.clone(args, kwargs)
  669. if isinstance(sig, abstract.CallableSignature)
  670. else self.signature(sig.name, args, kwargs))
  671. return name or repr(sig), {
  672. 'schedule': schedule,
  673. 'task': sig.name,
  674. 'args': sig.args,
  675. 'kwargs': sig.kwargs,
  676. 'options': dict(sig.options, **opts),
  677. }
  678. def _add_periodic_task(self, key, entry):
  679. self._conf.CELERYBEAT_SCHEDULE[key] = entry
  680. def create_task_cls(self):
  681. """Creates a base task class using default configuration
  682. taken from this app."""
  683. return self.subclass_with_self(
  684. self.task_cls, name='Task', attribute='_app',
  685. keep_reduce=True, abstract=True,
  686. )
  687. def subclass_with_self(self, Class, name=None, attribute='app',
  688. reverse=None, keep_reduce=False, **kw):
  689. """Subclass an app-compatible class by setting its app attribute
  690. to be this app instance.
  691. App-compatible means that the class has a class attribute that
  692. provides the default app it should use, e.g.
  693. ``class Foo: app = None``.
  694. :param Class: The app-compatible class to subclass.
  695. :keyword name: Custom name for the target class.
  696. :keyword attribute: Name of the attribute holding the app,
  697. default is 'app'.
  698. """
  699. Class = symbol_by_name(Class)
  700. reverse = reverse if reverse else Class.__name__
  701. def __reduce__(self):
  702. return _unpickle_appattr, (reverse, self.__reduce_args__())
  703. attrs = dict({attribute: self}, __module__=Class.__module__,
  704. __doc__=Class.__doc__, **kw)
  705. if not keep_reduce:
  706. attrs['__reduce__'] = __reduce__
  707. return type(name or Class.__name__, (Class,), attrs)
  708. def _rgetattr(self, path):
  709. return attrgetter(path)(self)
  710. def __repr__(self):
  711. return '<{0} {1}>'.format(type(self).__name__, appstr(self))
  712. def __reduce__(self):
  713. if self._using_v1_reduce:
  714. return self.__reduce_v1__()
  715. return (_unpickle_app_v2, (self.__class__, self.__reduce_keys__()))
  716. def __reduce_v1__(self):
  717. # Reduce only pickles the configuration changes,
  718. # so the default configuration doesn't have to be passed
  719. # between processes.
  720. return (
  721. _unpickle_app,
  722. (self.__class__, self.Pickler) + self.__reduce_args__(),
  723. )
  724. def __reduce_keys__(self):
  725. """Return keyword arguments used to reconstruct the object
  726. when unpickling."""
  727. return {
  728. 'main': self.main,
  729. 'changes': self._conf.changes if self._conf else self._preconf,
  730. 'loader': self.loader_cls,
  731. 'backend': self.backend_cls,
  732. 'amqp': self.amqp_cls,
  733. 'events': self.events_cls,
  734. 'log': self.log_cls,
  735. 'control': self.control_cls,
  736. 'fixups': self.fixups,
  737. 'config_source': self._config_source,
  738. 'task_cls': self.task_cls,
  739. }
  740. def __reduce_args__(self):
  741. """Deprecated method, please use :meth:`__reduce_keys__` instead."""
  742. return (self.main, self._conf.changes if self._conf else {},
  743. self.loader_cls, self.backend_cls, self.amqp_cls,
  744. self.events_cls, self.log_cls, self.control_cls,
  745. False, self._config_source)
  746. @cached_property
  747. def Worker(self):
  748. """Worker application. See :class:`~@Worker`."""
  749. return self.subclass_with_self('celery.apps.worker:Worker')
  750. @cached_property
  751. def WorkController(self, **kwargs):
  752. """Embeddable worker. See :class:`~@WorkController`."""
  753. return self.subclass_with_self('celery.worker:WorkController')
  754. @cached_property
  755. def Beat(self, **kwargs):
  756. """Celerybeat scheduler application.
  757. See :class:`~@Beat`.
  758. """
  759. return self.subclass_with_self('celery.apps.beat:Beat')
  760. @cached_property
  761. def Task(self):
  762. """Base task class for this app."""
  763. return self.create_task_cls()
  764. @cached_property
  765. def annotations(self):
  766. return prepare_annotations(self.conf.CELERY_ANNOTATIONS)
  767. @cached_property
  768. def AsyncResult(self):
  769. """Create new result instance.
  770. See :class:`celery.result.AsyncResult`.
  771. """
  772. return self.subclass_with_self('celery.result:AsyncResult')
  773. @cached_property
  774. def ResultSet(self):
  775. return self.subclass_with_self('celery.result:ResultSet')
  776. @cached_property
  777. def GroupResult(self):
  778. """Create new group result instance.
  779. See :class:`celery.result.GroupResult`.
  780. """
  781. return self.subclass_with_self('celery.result:GroupResult')
  782. @cached_property
  783. def TaskSet(self): # XXX compat
  784. """Deprecated! Please use :class:`celery.group` instead."""
  785. return self.subclass_with_self('celery.task.sets:TaskSet')
  786. @cached_property
  787. def TaskSetResult(self): # XXX compat
  788. """Deprecated! Please use :attr:`GroupResult` instead."""
  789. return self.subclass_with_self('celery.result:TaskSetResult')
  790. @property
  791. def pool(self):
  792. """Broker connection pool: :class:`~@pool`.
  793. This attribute is not related to the workers concurrency pool.
  794. """
  795. if self._pool is None:
  796. _ensure_after_fork()
  797. limit = self.conf.BROKER_POOL_LIMIT
  798. self._pool = self.connection().Pool(limit=limit)
  799. return self._pool
  800. @property
  801. def current_task(self):
  802. """The instance of the task that is being executed, or
  803. :const:`None`."""
  804. return _task_stack.top
  805. @cached_property
  806. def oid(self):
  807. return oid_from(self)
  808. @cached_property
  809. def amqp(self):
  810. """AMQP related functionality: :class:`~@amqp`."""
  811. return instantiate(self.amqp_cls, app=self)
  812. @cached_property
  813. def backend(self):
  814. """Current backend instance."""
  815. return self._get_backend()
  816. @property
  817. def conf(self):
  818. """Current configuration."""
  819. if self._conf is None:
  820. self._load_config()
  821. return self._conf
  822. @conf.setter
  823. def conf(self, d): # noqa
  824. self._conf = d
  825. @cached_property
  826. def control(self):
  827. """Remote control: :class:`~@control`."""
  828. return instantiate(self.control_cls, app=self)
  829. @cached_property
  830. def events(self):
  831. """Consuming and sending events: :class:`~@events`."""
  832. return instantiate(self.events_cls, app=self)
  833. @cached_property
  834. def loader(self):
  835. """Current loader instance."""
  836. return get_loader_cls(self.loader_cls)(app=self)
  837. @cached_property
  838. def log(self):
  839. """Logging: :class:`~@log`."""
  840. return instantiate(self.log_cls, app=self)
  841. @cached_property
  842. def canvas(self):
  843. from celery import canvas
  844. return canvas
  845. @cached_property
  846. def tasks(self):
  847. """Task registry.
  848. Accessing this attribute will also finalize the app.
  849. """
  850. self.finalize(auto=True)
  851. return self._tasks
  852. @cached_property
  853. def timezone(self):
  854. """Current timezone for this app.
  855. This is a cached property taking the time zone from the
  856. :setting:`CELERY_TIMEZONE` setting.
  857. """
  858. from celery.utils.timeutils import timezone
  859. conf = self.conf
  860. tz = conf.CELERY_TIMEZONE
  861. if not tz:
  862. return (timezone.get_timezone('UTC') if conf.CELERY_ENABLE_UTC
  863. else timezone.local)
  864. return timezone.get_timezone(conf.CELERY_TIMEZONE)
App = Celery  # compat: ``App`` is kept as an alias of ``Celery``