base.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app.base
  4. ~~~~~~~~~~~~~~~
  5. Actual App instance implementation.
  6. """
  7. from __future__ import absolute_import
  8. import os
  9. import threading
  10. import warnings
  11. from collections import defaultdict, deque
  12. from copy import deepcopy
  13. from operator import attrgetter
  14. from amqp import promise
  15. try:
  16. from billiard.util import register_after_fork
  17. except ImportError:
  18. register_after_fork = None
  19. from kombu.clocks import LamportClock
  20. from kombu.common import oid_from
  21. from kombu.utils import cached_property, uuid
  22. from celery import platforms
  23. from celery import signals
  24. from celery._state import (
  25. _task_stack, get_current_app, _set_current_app, set_default_app,
  26. _register_app, get_current_worker_task, connect_on_app_finalize,
  27. _announce_app_finalized,
  28. )
  29. from celery.exceptions import AlwaysEagerIgnored, ImproperlyConfigured
  30. from celery.five import values
  31. from celery.loaders import get_loader_cls
  32. from celery.local import PromiseProxy, maybe_evaluate
  33. from celery.utils import gen_task_name
  34. from celery.utils.dispatch import Signal
  35. from celery.utils.functional import first, maybe_list, head_from_fun
  36. from celery.utils.imports import instantiate, symbol_by_name
  37. from celery.utils.objects import FallbackContext, mro_lookup
  38. from .annotations import prepare as prepare_annotations
  39. from .defaults import DEFAULTS, find_deprecated_settings
  40. from .registry import TaskRegistry
  41. from .utils import (
  42. AppPickler, Settings, bugreport, _unpickle_app, _unpickle_app_v2, appstr,
  43. )
  44. # Load all builtin tasks
  45. from . import builtins # noqa
  46. __all__ = ['Celery']
  47. _EXECV = os.environ.get('FORKED_BY_MULTIPROCESSING')
  48. BUILTIN_FIXUPS = {
  49. 'celery.fixups.django:fixup',
  50. }
  51. ERR_ENVVAR_NOT_SET = """\
  52. The environment variable {0!r} is not set,
  53. and as such the configuration could not be loaded.
  54. Please set this variable and make it point to
  55. a configuration module."""
  56. _after_fork_registered = False
  57. def app_has_custom(app, attr):
  58. return mro_lookup(app.__class__, attr, stop=(Celery, object),
  59. monkey_patched=[__name__])
  60. def _unpickle_appattr(reverse_name, args):
  61. """Given an attribute name and a list of args, gets
  62. the attribute from the current app and calls it."""
  63. return get_current_app()._rgetattr(reverse_name)(*args)
  64. def _global_after_fork(obj):
  65. # Previously every app would call:
  66. # `register_after_fork(app, app._after_fork)`
  67. # but this created a leak as `register_after_fork` stores concrete object
  68. # references and once registered an object cannot be removed without
  69. # touching and iterating over the private afterfork registry list.
  70. #
  71. # See Issue #1949
  72. from celery import _state
  73. from multiprocessing import util as mputil
  74. for app in _state._apps:
  75. try:
  76. app._after_fork(obj)
  77. except Exception as exc:
  78. if mputil._logger:
  79. mputil._logger.info(
  80. 'after forker raised exception: %r', exc, exc_info=1)
  81. def _ensure_after_fork():
  82. global _after_fork_registered
  83. _after_fork_registered = True
  84. if register_after_fork is not None:
  85. register_after_fork(_global_after_fork, _global_after_fork)
  86. class Celery(object):
  87. #: This is deprecated, use :meth:`reduce_keys` instead
  88. Pickler = AppPickler
  89. SYSTEM = platforms.SYSTEM
  90. IS_OSX, IS_WINDOWS = platforms.IS_OSX, platforms.IS_WINDOWS
  91. amqp_cls = 'celery.app.amqp:AMQP'
  92. backend_cls = None
  93. events_cls = 'celery.events:Events'
  94. loader_cls = 'celery.loaders.app:AppLoader'
  95. log_cls = 'celery.app.log:Logging'
  96. control_cls = 'celery.app.control:Control'
  97. task_cls = 'celery.app.task:Task'
  98. registry_cls = TaskRegistry
  99. _fixups = None
  100. _pool = None
  101. _conf = None
  102. builtin_fixups = BUILTIN_FIXUPS
  103. #: Signal sent when app is loading configuration.
  104. on_configure = None
  105. #: Signal sent after app has prepared the configuration.
  106. on_after_configure = None
  107. #: Signal sent after app has been finalized.
  108. on_after_finalize = None
  109. #: ignored
  110. accept_magic_kwargs = False
  111. def __init__(self, main=None, loader=None, backend=None,
  112. amqp=None, events=None, log=None, control=None,
  113. set_as_current=True, tasks=None, broker=None, include=None,
  114. changes=None, config_source=None, fixups=None, task_cls=None,
  115. autofinalize=True, **kwargs):
  116. self.clock = LamportClock()
  117. self.main = main
  118. self.amqp_cls = amqp or self.amqp_cls
  119. self.events_cls = events or self.events_cls
  120. self.loader_cls = loader or self.loader_cls
  121. self.log_cls = log or self.log_cls
  122. self.control_cls = control or self.control_cls
  123. self.task_cls = task_cls or self.task_cls
  124. self.set_as_current = set_as_current
  125. self.registry_cls = symbol_by_name(self.registry_cls)
  126. self.user_options = defaultdict(set)
  127. self.steps = defaultdict(set)
  128. self.autofinalize = autofinalize
  129. self.configured = False
  130. self._config_source = config_source
  131. self._pending_defaults = deque()
  132. self._pending_periodic_tasks = deque()
  133. self.finalized = False
  134. self._finalize_mutex = threading.Lock()
  135. self._pending = deque()
  136. self._tasks = tasks
  137. if not isinstance(self._tasks, TaskRegistry):
  138. self._tasks = TaskRegistry(self._tasks or {})
  139. # If the class defines a custom __reduce_args__ we need to use
  140. # the old way of pickling apps, which is pickling a list of
  141. # args instead of the new way that pickles a dict of keywords.
  142. self._using_v1_reduce = app_has_custom(self, '__reduce_args__')
  143. # these options are moved to the config to
  144. # simplify pickling of the app object.
  145. self._preconf = changes or {}
  146. if broker:
  147. self._preconf['BROKER_URL'] = broker
  148. if backend:
  149. self._preconf['CELERY_RESULT_BACKEND'] = backend
  150. if include:
  151. self._preconf['CELERY_IMPORTS'] = include
  152. # - Apply fixups.
  153. self.fixups = set(self.builtin_fixups) if fixups is None else fixups
  154. # ...store fixup instances in _fixups to keep weakrefs alive.
  155. self._fixups = [symbol_by_name(fixup)(self) for fixup in self.fixups]
  156. if self.set_as_current:
  157. self.set_current()
  158. # Signals
  159. if self.on_configure is None:
  160. # used to be a method pre 3.2
  161. self.on_configure = Signal()
  162. self.on_after_configure = Signal()
  163. self.on_after_finalize = Signal()
  164. self.on_init()
  165. _register_app(self)
  166. def set_current(self):
  167. _set_current_app(self)
  168. def set_default(self):
  169. set_default_app(self)
  170. def __enter__(self):
  171. return self
  172. def __exit__(self, *exc_info):
  173. self.close()
  174. def close(self):
  175. self._maybe_close_pool()
  176. def on_init(self):
  177. """Optional callback called at init."""
  178. pass
  179. def start(self, argv=None):
  180. return instantiate(
  181. 'celery.bin.celery:CeleryCommand',
  182. app=self).execute_from_commandline(argv)
  183. def worker_main(self, argv=None):
  184. return instantiate(
  185. 'celery.bin.worker:worker',
  186. app=self).execute_from_commandline(argv)
  187. def task(self, *args, **opts):
  188. """Creates new task class from any callable."""
  189. if _EXECV and opts.get('lazy', True):
  190. # When using execv the task in the original module will point to a
  191. # different app, so doing things like 'add.request' will point to
  192. # a different task instance. This makes sure it will always use
  193. # the task instance from the current app.
  194. # Really need a better solution for this :(
  195. from . import shared_task
  196. return shared_task(*args, lazy=False, **opts)
  197. def inner_create_task_cls(shared=True, filter=None, lazy=True, **opts):
  198. _filt = filter # stupid 2to3
  199. def _create_task_cls(fun):
  200. if shared:
  201. cons = lambda app: app._task_from_fun(fun, **opts)
  202. cons.__name__ = fun.__name__
  203. connect_on_app_finalize(cons)
  204. if not lazy or self.finalized:
  205. ret = self._task_from_fun(fun, **opts)
  206. else:
  207. # return a proxy object that evaluates on first use
  208. ret = PromiseProxy(self._task_from_fun, (fun, ), opts,
  209. __doc__=fun.__doc__)
  210. self._pending.append(ret)
  211. if _filt:
  212. return _filt(ret)
  213. return ret
  214. return _create_task_cls
  215. if len(args) == 1:
  216. if callable(args[0]):
  217. return inner_create_task_cls(**opts)(*args)
  218. raise TypeError('argument 1 to @task() must be a callable')
  219. if args:
  220. raise TypeError(
  221. '@task() takes exactly 1 argument ({0} given)'.format(
  222. sum([len(args), len(opts)])))
  223. return inner_create_task_cls(**opts)
  224. def _task_from_fun(self, fun, name=None, base=None, bind=False, **options):
  225. if not self.finalized and not self.autofinalize:
  226. raise RuntimeError('Contract breach: app not finalized')
  227. name = name or self.gen_task_name(fun.__name__, fun.__module__)
  228. base = base or self.Task
  229. if name not in self._tasks:
  230. run = fun if bind else staticmethod(fun)
  231. task = type(fun.__name__, (base, ), dict({
  232. 'app': self,
  233. 'name': name,
  234. 'run': run,
  235. '_decorated': True,
  236. '__doc__': fun.__doc__,
  237. '__module__': fun.__module__,
  238. '__header__': staticmethod(head_from_fun(fun, bound=bind)),
  239. '__wrapped__': run}, **options))()
  240. self._tasks[task.name] = task
  241. task.bind(self) # connects task to this app
  242. else:
  243. task = self._tasks[name]
  244. return task
  245. def gen_task_name(self, name, module):
  246. return gen_task_name(self, name, module)
  247. def finalize(self, auto=False):
  248. with self._finalize_mutex:
  249. if not self.finalized:
  250. if auto and not self.autofinalize:
  251. raise RuntimeError('Contract breach: app not finalized')
  252. self.finalized = True
  253. _announce_app_finalized(self)
  254. pending = self._pending
  255. while pending:
  256. maybe_evaluate(pending.popleft())
  257. for task in values(self._tasks):
  258. task.bind(self)
  259. self.on_after_finalize.send(sender=self)
  260. def add_defaults(self, fun):
  261. if not callable(fun):
  262. d, fun = fun, lambda: d
  263. if self.configured:
  264. return self._conf.add_defaults(fun())
  265. self._pending_defaults.append(fun)
  266. def config_from_object(self, obj, silent=False, force=False):
  267. self._config_source = obj
  268. if force or self.configured:
  269. self._conf = None
  270. return self.loader.config_from_object(obj, silent=silent)
  271. def config_from_envvar(self, variable_name, silent=False, force=False):
  272. module_name = os.environ.get(variable_name)
  273. if not module_name:
  274. if silent:
  275. return False
  276. raise ImproperlyConfigured(
  277. ERR_ENVVAR_NOT_SET.format(variable_name))
  278. return self.config_from_object(module_name, silent=silent, force=force)
  279. def config_from_cmdline(self, argv, namespace='celery'):
  280. (self._conf if self.configured else self.conf).update(
  281. self.loader.cmdline_config_parser(argv, namespace)
  282. )
  283. def setup_security(self, allowed_serializers=None, key=None, cert=None,
  284. store=None, digest='sha1', serializer='json'):
  285. from celery.security import setup_security
  286. return setup_security(allowed_serializers, key, cert,
  287. store, digest, serializer, app=self)
  288. def autodiscover_tasks(self, packages, related_name='tasks', force=False):
  289. if force:
  290. return self._autodiscover_tasks(packages, related_name)
  291. signals.import_modules.connect(promise(
  292. self._autodiscover_tasks, (packages, related_name),
  293. ), weak=False, sender=self)
  294. def _autodiscover_tasks(self, packages, related_name='tasks', **kwargs):
  295. # argument may be lazy
  296. packages = packages() if callable(packages) else packages
  297. self.loader.autodiscover_tasks(packages, related_name)
  298. def send_task(self, name, args=None, kwargs=None, countdown=None,
  299. eta=None, task_id=None, producer=None, connection=None,
  300. router=None, result_cls=None, expires=None,
  301. publisher=None, link=None, link_error=None,
  302. add_to_parent=True, group_id=None, retries=0, chord=None,
  303. reply_to=None, time_limit=None, soft_time_limit=None,
  304. root_id=None, parent_id=None, route_name=None,
  305. shadow=None, **options):
  306. amqp = self.amqp
  307. task_id = task_id or uuid()
  308. producer = producer or publisher # XXX compat
  309. router = router or amqp.router
  310. conf = self.conf
  311. if conf.CELERY_ALWAYS_EAGER: # pragma: no cover
  312. warnings.warn(AlwaysEagerIgnored(
  313. 'CELERY_ALWAYS_EAGER has no effect on send_task',
  314. ), stacklevel=2)
  315. options = router.route(options, route_name or name, args, kwargs)
  316. message = amqp.create_task_message(
  317. task_id, name, args, kwargs, countdown, eta, group_id,
  318. expires, retries, chord,
  319. maybe_list(link), maybe_list(link_error),
  320. reply_to or self.oid, time_limit, soft_time_limit,
  321. self.conf.CELERY_SEND_TASK_SENT_EVENT,
  322. root_id, parent_id, shadow,
  323. )
  324. if connection:
  325. producer = amqp.Producer(connection)
  326. with self.producer_or_acquire(producer) as P:
  327. self.backend.on_task_call(P, task_id)
  328. amqp.send_task_message(P, name, message, **options)
  329. result = (result_cls or self.AsyncResult)(task_id)
  330. if add_to_parent:
  331. parent = get_current_worker_task()
  332. if parent:
  333. parent.add_trail(result)
  334. return result
  335. def connection(self, hostname=None, userid=None, password=None,
  336. virtual_host=None, port=None, ssl=None,
  337. connect_timeout=None, transport=None,
  338. transport_options=None, heartbeat=None,
  339. login_method=None, failover_strategy=None, **kwargs):
  340. conf = self.conf
  341. return self.amqp.Connection(
  342. hostname or conf.BROKER_URL,
  343. userid or conf.BROKER_USER,
  344. password or conf.BROKER_PASSWORD,
  345. virtual_host or conf.BROKER_VHOST,
  346. port or conf.BROKER_PORT,
  347. transport=transport or conf.BROKER_TRANSPORT,
  348. ssl=self.either('BROKER_USE_SSL', ssl),
  349. heartbeat=heartbeat,
  350. login_method=login_method or conf.BROKER_LOGIN_METHOD,
  351. failover_strategy=(
  352. failover_strategy or conf.BROKER_FAILOVER_STRATEGY
  353. ),
  354. transport_options=dict(
  355. conf.BROKER_TRANSPORT_OPTIONS, **transport_options or {}
  356. ),
  357. connect_timeout=self.either(
  358. 'BROKER_CONNECTION_TIMEOUT', connect_timeout
  359. ),
  360. )
  361. broker_connection = connection
  362. def _acquire_connection(self, pool=True):
  363. """Helper for :meth:`connection_or_acquire`."""
  364. if pool:
  365. return self.pool.acquire(block=True)
  366. return self.connection()
  367. def connection_or_acquire(self, connection=None, pool=True, *_, **__):
  368. return FallbackContext(connection, self._acquire_connection, pool=pool)
  369. default_connection = connection_or_acquire # XXX compat
  370. def producer_or_acquire(self, producer=None):
  371. return FallbackContext(
  372. producer, self.amqp.producer_pool.acquire, block=True,
  373. )
  374. default_producer = producer_or_acquire # XXX compat
  375. def prepare_config(self, c):
  376. """Prepare configuration before it is merged with the defaults."""
  377. return find_deprecated_settings(c)
  378. def now(self):
  379. return self.loader.now(utc=self.conf.CELERY_ENABLE_UTC)
  380. def mail_admins(self, subject, body, fail_silently=False):
  381. conf = self.conf
  382. if conf.ADMINS:
  383. to = [admin_email for _, admin_email in conf.ADMINS]
  384. return self.loader.mail_admins(
  385. subject, body, fail_silently, to=to,
  386. sender=conf.SERVER_EMAIL,
  387. host=conf.EMAIL_HOST,
  388. port=conf.EMAIL_PORT,
  389. user=conf.EMAIL_HOST_USER,
  390. password=conf.EMAIL_HOST_PASSWORD,
  391. timeout=conf.EMAIL_TIMEOUT,
  392. use_ssl=conf.EMAIL_USE_SSL,
  393. use_tls=conf.EMAIL_USE_TLS,
  394. charset=conf.EMAIL_CHARSET,
  395. )
  396. def select_queues(self, queues=None):
  397. return self.amqp.queues.select(queues)
  398. def either(self, default_key, *values):
  399. """Fallback to the value of a configuration key if none of the
  400. `*values` are true."""
  401. return first(None, values) or self.conf.get(default_key)
  402. def bugreport(self):
  403. return bugreport(self)
  404. def _get_backend(self):
  405. from celery.backends import get_backend_by_url
  406. backend, url = get_backend_by_url(
  407. self.backend_cls or self.conf.CELERY_RESULT_BACKEND,
  408. self.loader)
  409. return backend(app=self, url=url)
  410. def _load_config(self):
  411. if isinstance(self.on_configure, Signal):
  412. self.on_configure.send(sender=self)
  413. else:
  414. # used to be a method pre 3.2
  415. self.on_configure()
  416. if self._config_source:
  417. self.loader.config_from_object(self._config_source)
  418. defaults = dict(deepcopy(DEFAULTS), **self._preconf)
  419. self.configured = True
  420. s = self._conf = Settings(
  421. {}, [self.prepare_config(self.loader.conf), defaults],
  422. )
  423. # load lazy config dict initializers.
  424. pending_def = self._pending_defaults
  425. while pending_def:
  426. s.add_defaults(maybe_evaluate(pending_def.popleft()()))
  427. # load lazy periodic tasks
  428. pending_beat = self._pending_periodic_tasks
  429. while pending_beat:
  430. pargs, pkwargs = pending_beat.popleft()
  431. self._add_periodic_task(*pargs, **pkwargs)
  432. self.on_after_configure.send(sender=self, source=s)
  433. return s
  434. def _after_fork(self, obj_):
  435. self._maybe_close_pool()
  436. def _maybe_close_pool(self):
  437. if self._pool:
  438. self._pool.force_close_all()
  439. self._pool = None
  440. amqp = self.__dict__.get('amqp')
  441. if amqp is not None and amqp._producer_pool is not None:
  442. amqp._producer_pool.force_close_all()
  443. amqp._producer_pool = None
  444. def signature(self, *args, **kwargs):
  445. kwargs['app'] = self
  446. return self.canvas.signature(*args, **kwargs)
  447. def add_periodic_task(self, *args, **kwargs):
  448. if not self.configured:
  449. return self._pending_periodic_tasks.append((args, kwargs))
  450. return self._add_periodic_task(*args, **kwargs)
  451. def _add_periodic_task(self, schedule, sig,
  452. args=(), kwargs={}, name=None, **opts):
  453. from .task import Task
  454. sig = (self.signature(sig.name, args, kwargs)
  455. if isinstance(sig, Task) else sig.clone(args, kwargs))
  456. name = name or ':'.join([sig.name, ','.join(map(str, sig.args))])
  457. self._conf.CELERYBEAT_SCHEDULE[name] = {
  458. 'schedule': schedule,
  459. 'task': sig.name,
  460. 'args': sig.args,
  461. 'kwargs': sig.kwargs,
  462. 'options': dict(sig.options, **opts),
  463. }
  464. def create_task_cls(self):
  465. """Creates a base task class using default configuration
  466. taken from this app."""
  467. return self.subclass_with_self(
  468. self.task_cls, name='Task', attribute='_app',
  469. keep_reduce=True, abstract=True,
  470. )
  471. def subclass_with_self(self, Class, name=None, attribute='app',
  472. reverse=None, keep_reduce=False, **kw):
  473. """Subclass an app-compatible class by setting its app attribute
  474. to be this app instance.
  475. App-compatible means that the class has a class attribute that
  476. provides the default app it should use, e.g.
  477. ``class Foo: app = None``.
  478. :param Class: The app-compatible class to subclass.
  479. :keyword name: Custom name for the target class.
  480. :keyword attribute: Name of the attribute holding the app,
  481. default is 'app'.
  482. """
  483. Class = symbol_by_name(Class)
  484. reverse = reverse if reverse else Class.__name__
  485. def __reduce__(self):
  486. return _unpickle_appattr, (reverse, self.__reduce_args__())
  487. attrs = dict({attribute: self}, __module__=Class.__module__,
  488. __doc__=Class.__doc__, **kw)
  489. if not keep_reduce:
  490. attrs['__reduce__'] = __reduce__
  491. return type(name or Class.__name__, (Class, ), attrs)
  492. def _rgetattr(self, path):
  493. return attrgetter(path)(self)
  494. def __repr__(self):
  495. return '<{0} {1}>'.format(type(self).__name__, appstr(self))
  496. def __reduce__(self):
  497. if self._using_v1_reduce:
  498. return self.__reduce_v1__()
  499. return (_unpickle_app_v2, (self.__class__, self.__reduce_keys__()))
  500. def __reduce_v1__(self):
  501. # Reduce only pickles the configuration changes,
  502. # so the default configuration doesn't have to be passed
  503. # between processes.
  504. return (
  505. _unpickle_app,
  506. (self.__class__, self.Pickler) + self.__reduce_args__(),
  507. )
  508. def __reduce_keys__(self):
  509. """Return keyword arguments used to reconstruct the object
  510. when unpickling."""
  511. return {
  512. 'main': self.main,
  513. 'changes': self._conf.changes if self._conf else self._preconf,
  514. 'loader': self.loader_cls,
  515. 'backend': self.backend_cls,
  516. 'amqp': self.amqp_cls,
  517. 'events': self.events_cls,
  518. 'log': self.log_cls,
  519. 'control': self.control_cls,
  520. 'fixups': self.fixups,
  521. 'config_source': self._config_source,
  522. 'task_cls': self.task_cls,
  523. }
  524. def __reduce_args__(self):
  525. """Deprecated method, please use :meth:`__reduce_keys__` instead."""
  526. return (self.main, self._conf.changes if self._conf else {},
  527. self.loader_cls, self.backend_cls, self.amqp_cls,
  528. self.events_cls, self.log_cls, self.control_cls,
  529. False, self._config_source)
  530. @cached_property
  531. def Worker(self):
  532. return self.subclass_with_self('celery.apps.worker:Worker')
  533. @cached_property
  534. def WorkController(self, **kwargs):
  535. return self.subclass_with_self('celery.worker:WorkController')
  536. @cached_property
  537. def Beat(self, **kwargs):
  538. return self.subclass_with_self('celery.apps.beat:Beat')
  539. @cached_property
  540. def Task(self):
  541. return self.create_task_cls()
  542. @cached_property
  543. def annotations(self):
  544. return prepare_annotations(self.conf.CELERY_ANNOTATIONS)
  545. @cached_property
  546. def AsyncResult(self):
  547. return self.subclass_with_self('celery.result:AsyncResult')
  548. @cached_property
  549. def ResultSet(self):
  550. return self.subclass_with_self('celery.result:ResultSet')
  551. @cached_property
  552. def GroupResult(self):
  553. return self.subclass_with_self('celery.result:GroupResult')
  554. @cached_property
  555. def TaskSet(self): # XXX compat
  556. """Deprecated! Please use :class:`celery.group` instead."""
  557. return self.subclass_with_self('celery.task.sets:TaskSet')
  558. @cached_property
  559. def TaskSetResult(self): # XXX compat
  560. """Deprecated! Please use :attr:`GroupResult` instead."""
  561. return self.subclass_with_self('celery.result:TaskSetResult')
  562. @property
  563. def pool(self):
  564. if self._pool is None:
  565. _ensure_after_fork()
  566. limit = self.conf.BROKER_POOL_LIMIT
  567. self._pool = self.connection().Pool(limit=limit)
  568. return self._pool
  569. @property
  570. def current_task(self):
  571. return _task_stack.top
  572. @cached_property
  573. def oid(self):
  574. return oid_from(self)
  575. @cached_property
  576. def amqp(self):
  577. return instantiate(self.amqp_cls, app=self)
  578. @cached_property
  579. def backend(self):
  580. return self._get_backend()
  581. @property
  582. def conf(self):
  583. if self._conf is None:
  584. self._load_config()
  585. return self._conf
  586. @conf.setter
  587. def conf(self, d): # noqa
  588. self._conf = d
  589. @cached_property
  590. def control(self):
  591. return instantiate(self.control_cls, app=self)
  592. @cached_property
  593. def events(self):
  594. return instantiate(self.events_cls, app=self)
  595. @cached_property
  596. def loader(self):
  597. return get_loader_cls(self.loader_cls)(app=self)
  598. @cached_property
  599. def log(self):
  600. return instantiate(self.log_cls, app=self)
  601. @cached_property
  602. def canvas(self):
  603. from celery import canvas
  604. return canvas
  605. @cached_property
  606. def tasks(self):
  607. self.finalize(auto=True)
  608. return self._tasks
  609. @cached_property
  610. def timezone(self):
  611. from celery.utils.timeutils import timezone
  612. conf = self.conf
  613. tz = conf.CELERY_TIMEZONE
  614. if not tz:
  615. return (timezone.get_timezone('UTC') if conf.CELERY_ENABLE_UTC
  616. else timezone.local)
  617. return timezone.get_timezone(conf.CELERY_TIMEZONE)
  618. App = Celery # compat