# -*- coding: utf-8 -*-
"""
    celery.app.base
    ~~~~~~~~~~~~~~~

    Actual App instance implementation.

"""
from __future__ import absolute_import

import os
import threading
import warnings

from collections import defaultdict, deque
from contextlib import contextmanager
from copy import deepcopy
from operator import attrgetter

from amqp import promise
from billiard.util import register_after_fork
from kombu.clocks import LamportClock
from kombu.common import oid_from
from kombu.utils import cached_property, uuid

from celery import platforms
from celery import signals
from celery._state import (
    _task_stack, _tls, get_current_app, set_default_app,
    _register_app, get_current_worker_task,
)
from celery.exceptions import AlwaysEagerIgnored, ImproperlyConfigured
from celery.five import items, values
from celery.loaders import get_loader_cls
from celery.local import PromiseProxy, maybe_evaluate
from celery.utils.functional import first, maybe_list
from celery.utils.imports import instantiate, symbol_by_name
from celery.utils.objects import mro_lookup

from .annotations import prepare as prepare_annotations
from .builtins import shared_task, load_shared_tasks
from .defaults import DEFAULTS, find_deprecated_settings
from .registry import TaskRegistry
from .utils import (
    AppPickler, Settings, bugreport, _unpickle_app, _unpickle_app_v2, appstr,
)

__all__ = ['Celery']

_EXECV = os.environ.get('FORKED_BY_MULTIPROCESSING')
BUILTIN_FIXUPS = frozenset([
    'celery.fixups.django:fixup',
])

ERR_ENVVAR_NOT_SET = """\
The environment variable {0!r} is not set,
and as such the configuration could not be loaded.
Please set this variable and make it point to
a configuration module."""


def app_has_custom(app, attr):
    return mro_lookup(app.__class__, attr, stop=(Celery, object),
                      monkey_patched=[__name__])


def _unpickle_appattr(reverse_name, args):
    """Given an attribute name and a list of args, gets
    the attribute from the current app and calls it."""
    return get_current_app()._rgetattr(reverse_name)(*args)
class Celery(object):
    #: This is deprecated, use :meth:`reduce_keys` instead
    Pickler = AppPickler

    SYSTEM = platforms.SYSTEM
    IS_OSX, IS_WINDOWS = platforms.IS_OSX, platforms.IS_WINDOWS

    amqp_cls = 'celery.app.amqp:AMQP'
    backend_cls = None
    events_cls = 'celery.events:Events'
    loader_cls = 'celery.loaders.app:AppLoader'
    log_cls = 'celery.app.log:Logging'
    control_cls = 'celery.app.control:Control'
    task_cls = 'celery.app.task:Task'
    registry_cls = TaskRegistry
    _fixups = None
    _pool = None
    builtin_fixups = BUILTIN_FIXUPS

    def __init__(self, main=None, loader=None, backend=None,
                 amqp=None, events=None, log=None, control=None,
                 set_as_current=True, accept_magic_kwargs=False,
                 tasks=None, broker=None, include=None, changes=None,
                 config_source=None, fixups=None, task_cls=None,
                 autofinalize=True, **kwargs):
        self.clock = LamportClock()
        self.main = main
        self.amqp_cls = amqp or self.amqp_cls
        self.backend_cls = backend or self.backend_cls
        self.events_cls = events or self.events_cls
        self.loader_cls = loader or self.loader_cls
        self.log_cls = log or self.log_cls
        self.control_cls = control or self.control_cls
        self.task_cls = task_cls or self.task_cls
        self.set_as_current = set_as_current
        self.registry_cls = symbol_by_name(self.registry_cls)
        self.accept_magic_kwargs = accept_magic_kwargs
        self.user_options = defaultdict(set)
        self.steps = defaultdict(set)
        self.autofinalize = autofinalize
        self.configured = False
        self._config_source = config_source
        self._pending_defaults = deque()
        self.finalized = False
        self._finalize_mutex = threading.Lock()
        self._pending = deque()
        self._tasks = tasks
        if not isinstance(self._tasks, TaskRegistry):
            self._tasks = TaskRegistry(self._tasks or {})

        # If the class defines a custom __reduce_args__ we need to use
        # the old way of pickling apps, which is pickling a list of
        # args instead of the new way that pickles a dict of keywords.
        self._using_v1_reduce = app_has_custom(self, '__reduce_args__')

        # these options are moved to the config to
        # simplify pickling of the app object.
        self._preconf = changes or {}
        if broker:
            self._preconf['BROKER_URL'] = broker
        if include:
            self._preconf['CELERY_IMPORTS'] = include

        # - Apply fixups.
        self.fixups = set(self.builtin_fixups) if fixups is None else fixups
        # ...store fixup instances in _fixups to keep weakrefs alive.
        self._fixups = [symbol_by_name(fixup)(self) for fixup in self.fixups]

        if self.set_as_current:
            self.set_current()

        self.on_init()
        _register_app(self)
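
    # Illustrative construction of an app instance (a sketch; 'proj', the
    # broker URL and the module list are placeholder values, not taken from
    # this module):
    #
    #     app = Celery('proj', broker='amqp://guest@localhost//',
    #                  include=['proj.tasks'])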
    def set_current(self):
        _tls.current_app = self

    def set_default(self):
        set_default_app(self)

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()

    def close(self):
        self._maybe_close_pool()

    def on_init(self):
        """Optional callback called at init."""
        pass

    def start(self, argv=None):
        return instantiate(
            'celery.bin.celery:CeleryCommand',
            app=self).execute_from_commandline(argv)

    def worker_main(self, argv=None):
        return instantiate(
            'celery.bin.worker:worker',
            app=self).execute_from_commandline(argv)
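
    # Illustrative use of the command-line entry points above (a sketch; the
    # argv values are placeholders, and the leading element is treated as the
    # program name by execute_from_commandline):
    #
    #     app.start(argv=['celery', 'status'])
    #     app.worker_main(argv=['worker', '--loglevel=INFO'])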
    def task(self, *args, **opts):
        """Creates new task class from any callable."""
        if _EXECV and not opts.get('_force_evaluate'):
            # When using execv the task in the original module will point to a
            # different app, so doing things like 'add.request' will point to
            # a different task instance.  This makes sure it will always use
            # the task instance from the current app.
            # Really need a better solution for this :(
            from . import shared_task as proxies_to_curapp
            return proxies_to_curapp(*args, _force_evaluate=True, **opts)

        def inner_create_task_cls(shared=True, filter=None, **opts):
            _filt = filter  # stupid 2to3

            def _create_task_cls(fun):
                if shared:
                    cons = lambda app: app._task_from_fun(fun, **opts)
                    cons.__name__ = fun.__name__
                    shared_task(cons)
                if self.accept_magic_kwargs:  # compat mode
                    task = self._task_from_fun(fun, **opts)
                    if filter:
                        task = filter(task)
                    return task

                if self.finalized or opts.get('_force_evaluate'):
                    ret = self._task_from_fun(fun, **opts)
                else:
                    # return a proxy object that evaluates on first use
                    ret = PromiseProxy(self._task_from_fun, (fun, ), opts,
                                       __doc__=fun.__doc__)
                    self._pending.append(ret)
                if _filt:
                    return _filt(ret)
                return ret

            return _create_task_cls

        if len(args) == 1:
            if callable(args[0]):
                return inner_create_task_cls(**opts)(*args)
            raise TypeError('argument 1 to @task() must be a callable')
        if args:
            raise TypeError(
                '@task() takes exactly 1 argument ({0} given)'.format(
                    sum([len(args), len(opts)])))
        return inner_create_task_cls(**opts)
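
    # Illustrative use of the decorator above (a sketch; ``add`` is a
    # placeholder function name and the options are examples only):
    #
    #     @app.task(bind=True, max_retries=3)
    #     def add(self, x, y):
    #         return x + y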
    def _task_from_fun(self, fun, **options):
        if not self.finalized and not self.autofinalize:
            raise RuntimeError('Contract breach: app not finalized')
        base = options.pop('base', None) or self.Task
        bind = options.pop('bind', False)

        T = type(fun.__name__, (base, ), dict({
            'app': self,
            'accept_magic_kwargs': False,
            'run': fun if bind else staticmethod(fun),
            '_decorated': True,
            '__doc__': fun.__doc__,
            '__module__': fun.__module__}, **options))()
        task = self._tasks[T.name]  # return global instance.
        return task

    def finalize(self, auto=False):
        with self._finalize_mutex:
            if not self.finalized:
                if auto and not self.autofinalize:
                    raise RuntimeError('Contract breach: app not finalized')
                self.finalized = True
                load_shared_tasks(self)

                pending = self._pending
                while pending:
                    maybe_evaluate(pending.popleft())

                for task in values(self._tasks):
                    task.bind(self)
    def add_defaults(self, fun):
        if not callable(fun):
            d, fun = fun, lambda: d
        if self.configured:
            return self.conf.add_defaults(fun())
        self._pending_defaults.append(fun)
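
    # Illustrative use of add_defaults (a sketch; the setting and value are
    # placeholders) -- a dict is wrapped in a callable and applied lazily if
    # the app is not yet configured:
    #
    #     app.add_defaults({'CELERY_TASK_SERIALIZER': 'json'})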
    def config_from_object(self, obj, silent=False, force=False):
        self._config_source = obj
        if force or self.configured:
            del(self.conf)
        return self.loader.config_from_object(obj, silent=silent)
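
    # Illustrative use of config_from_object (a sketch; ``celeryconfig`` is a
    # placeholder module name importable on the Python path):
    #
    #     app.config_from_object('celeryconfig')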
    def config_from_envvar(self, variable_name, silent=False, force=False):
        module_name = os.environ.get(variable_name)
        if not module_name:
            if silent:
                return False
            raise ImproperlyConfigured(
                ERR_ENVVAR_NOT_SET.format(variable_name))
        return self.config_from_object(module_name, silent=silent, force=force)
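
    # Illustrative use of config_from_envvar (a sketch; the variable name and
    # module path are placeholders):
    #
    #     os.environ['CELERY_CONFIG_MODULE'] = 'proj.celeryconfig'
    #     app.config_from_envvar('CELERY_CONFIG_MODULE')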
    def config_from_cmdline(self, argv, namespace='celery'):
        self.conf.update(self.loader.cmdline_config_parser(argv, namespace))

    def setup_security(self, allowed_serializers=None, key=None, cert=None,
                       store=None, digest='sha1', serializer='json'):
        from celery.security import setup_security
        return setup_security(allowed_serializers, key, cert,
                              store, digest, serializer, app=self)

    def autodiscover_tasks(self, packages, related_name='tasks', force=False):
        if force:
            return self._autodiscover_tasks(packages, related_name)
        signals.import_modules.connect(promise(
            self._autodiscover_tasks, (packages, related_name),
        ), weak=False, sender=self)

    def _autodiscover_tasks(self, packages, related_name='tasks', **kwargs):
        # argument may be lazy
        packages = packages() if callable(packages) else packages
        self.loader.autodiscover_tasks(packages, related_name)
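
    # Illustrative use of autodiscover_tasks (a sketch; the package names are
    # placeholders) -- each listed package is searched for a ``tasks`` module:
    #
    #     app.autodiscover_tasks(['proj.orders', 'proj.accounts'])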
    def send_task(self, name, args=None, kwargs=None, countdown=None,
                  eta=None, task_id=None, producer=None, connection=None,
                  router=None, result_cls=None, expires=None,
                  publisher=None, link=None, link_error=None,
                  add_to_parent=True, reply_to=None, **options):
        task_id = task_id or uuid()
        producer = producer or publisher  # XXX compat
        router = router or self.amqp.router
        conf = self.conf
        if conf.CELERY_ALWAYS_EAGER:  # pragma: no cover
            warnings.warn(AlwaysEagerIgnored(
                'CELERY_ALWAYS_EAGER has no effect on send_task'))
        options = router.route(options, name, args, kwargs)

        if connection:
            producer = self.amqp.TaskProducer(connection)
        with self.producer_or_acquire(producer) as P:
            self.backend.on_task_call(P, task_id)
            task_id = P.publish_task(
                name, args, kwargs, countdown=countdown, eta=eta,
                task_id=task_id, expires=expires,
                callbacks=maybe_list(link), errbacks=maybe_list(link_error),
                reply_to=reply_to or self.oid, **options
            )
        result = (result_cls or self.AsyncResult)(task_id)
        if add_to_parent:
            parent = get_current_worker_task()
            if parent:
                parent.add_trail(result)
        return result
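
    # Illustrative use of send_task (a sketch; the task name, arguments and
    # timeout are placeholders) -- sends a task by name without importing its
    # module, returning an AsyncResult:
    #
    #     result = app.send_task('proj.tasks.add', args=(2, 2), countdown=10)
    #     result.get(timeout=30)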
    def connection(self, hostname=None, userid=None, password=None,
                   virtual_host=None, port=None, ssl=None,
                   connect_timeout=None, transport=None,
                   transport_options=None, heartbeat=None,
                   login_method=None, failover_strategy=None, **kwargs):
        conf = self.conf
        return self.amqp.Connection(
            hostname or conf.BROKER_URL,
            userid or conf.BROKER_USER,
            password or conf.BROKER_PASSWORD,
            virtual_host or conf.BROKER_VHOST,
            port or conf.BROKER_PORT,
            transport=transport or conf.BROKER_TRANSPORT,
            ssl=self.either('BROKER_USE_SSL', ssl),
            heartbeat=heartbeat,
            login_method=login_method or conf.BROKER_LOGIN_METHOD,
            failover_strategy=(
                failover_strategy or conf.BROKER_FAILOVER_STRATEGY
            ),
            transport_options=dict(
                conf.BROKER_TRANSPORT_OPTIONS, **transport_options or {}
            ),
            connect_timeout=self.either(
                'BROKER_CONNECTION_TIMEOUT', connect_timeout
            ),
        )
    broker_connection = connection
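
    # Illustrative use of connection() (a sketch; the retry count is a
    # placeholder) -- the returned kombu connection is a context manager and
    # is closed on exit:
    #
    #     with app.connection() as conn:
    #         conn.ensure_connection(max_retries=3)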
    @contextmanager
    def connection_or_acquire(self, connection=None, pool=True,
                              *args, **kwargs):
        if connection:
            yield connection
        else:
            if pool:
                with self.pool.acquire(block=True) as connection:
                    yield connection
            else:
                with self.connection() as connection:
                    yield connection
    default_connection = connection_or_acquire  # XXX compat

    @contextmanager
    def producer_or_acquire(self, producer=None):
        if producer:
            yield producer
        else:
            with self.amqp.producer_pool.acquire(block=True) as producer:
                yield producer
    default_producer = producer_or_acquire  # XXX compat
    def prepare_config(self, c):
        """Prepare configuration before it is merged with the defaults."""
        return find_deprecated_settings(c)

    def now(self):
        return self.loader.now(utc=self.conf.CELERY_ENABLE_UTC)

    def mail_admins(self, subject, body, fail_silently=False):
        if self.conf.ADMINS:
            to = [admin_email for _, admin_email in self.conf.ADMINS]
            return self.loader.mail_admins(
                subject, body, fail_silently, to=to,
                sender=self.conf.SERVER_EMAIL,
                host=self.conf.EMAIL_HOST,
                port=self.conf.EMAIL_PORT,
                user=self.conf.EMAIL_HOST_USER,
                password=self.conf.EMAIL_HOST_PASSWORD,
                timeout=self.conf.EMAIL_TIMEOUT,
                use_ssl=self.conf.EMAIL_USE_SSL,
                use_tls=self.conf.EMAIL_USE_TLS,
            )

    def select_queues(self, queues=None):
        return self.amqp.queues.select(queues)

    def either(self, default_key, *values):
        """Fallback to the value of a configuration key if none of the
        `*values` are true."""
        return first(None, values) or self.conf.get(default_key)
    def bugreport(self):
        return bugreport(self)

    def _get_backend(self):
        from celery.backends import get_backend_by_url
        backend, url = get_backend_by_url(
            self.backend_cls or self.conf.CELERY_RESULT_BACKEND,
            self.loader)
        return backend(app=self, url=url)

    def on_configure(self):
        """Callback called when the app loads configuration."""
        pass
    def _get_config(self):
        self.on_configure()
        if self._config_source:
            self.loader.config_from_object(self._config_source)
        self.configured = True
        s = Settings({}, [self.prepare_config(self.loader.conf),
                          deepcopy(DEFAULTS)])
        # load lazy config dict initializers.
        pending = self._pending_defaults
        while pending:
            s.add_defaults(maybe_evaluate(pending.popleft()()))
        if self._preconf:
            for key, value in items(self._preconf):
                setattr(s, key, value)
        return s

    def _after_fork(self, obj_):
        self._maybe_close_pool()

    def _maybe_close_pool(self):
        if self._pool:
            self._pool.force_close_all()
            self._pool = None
            amqp = self.amqp
            if amqp._producer_pool:
                amqp._producer_pool.force_close_all()
                amqp._producer_pool = None

    def signature(self, *args, **kwargs):
        kwargs['app'] = self
        return self.canvas.signature(*args, **kwargs)
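
    # Illustrative use of signature() (a sketch; the task name and arguments
    # are placeholders) -- builds a canvas signature bound to this app:
    #
    #     sig = app.signature('proj.tasks.add', args=(2, 2))
    #     sig.delay()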
    def create_task_cls(self):
        """Creates a base task class using default configuration
        taken from this app."""
        return self.subclass_with_self(
            self.task_cls, name='Task', attribute='_app',
            keep_reduce=True, abstract=True,
        )

    def subclass_with_self(self, Class, name=None, attribute='app',
                           reverse=None, keep_reduce=False, **kw):
        """Subclass an app-compatible class by setting its app attribute
        to be this app instance.

        App-compatible means that the class has a class attribute that
        provides the default app it should use, e.g.
        ``class Foo: app = None``.

        :param Class: The app-compatible class to subclass.
        :keyword name: Custom name for the target class.
        :keyword attribute: Name of the attribute holding the app,
                            default is 'app'.

        """
        Class = symbol_by_name(Class)
        reverse = reverse if reverse else Class.__name__

        def __reduce__(self):
            return _unpickle_appattr, (reverse, self.__reduce_args__())

        attrs = dict({attribute: self}, __module__=Class.__module__,
                     __doc__=Class.__doc__, **kw)
        if not keep_reduce:
            attrs['__reduce__'] = __reduce__

        return type(name or Class.__name__, (Class, ), attrs)
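
    # Illustrative use of subclass_with_self (a sketch; the variable name is a
    # placeholder) -- the same pattern the cached properties below rely on to
    # bind result classes to this app:
    #
    #     BoundAsyncResult = app.subclass_with_self('celery.result:AsyncResult')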
    def _rgetattr(self, path):
        return attrgetter(path)(self)

    def __repr__(self):
        return '<{0} {1}>'.format(type(self).__name__, appstr(self))

    def __reduce__(self):
        if self._using_v1_reduce:
            return self.__reduce_v1__()
        return (_unpickle_app_v2, (self.__class__, self.__reduce_keys__()))

    def __reduce_v1__(self):
        # Reduce only pickles the configuration changes,
        # so the default configuration doesn't have to be passed
        # between processes.
        return (
            _unpickle_app,
            (self.__class__, self.Pickler) + self.__reduce_args__(),
        )

    def __reduce_keys__(self):
        """Return keyword arguments used to reconstruct the object
        when unpickling."""
        return {
            'main': self.main,
            'changes': self.conf.changes,
            'loader': self.loader_cls,
            'backend': self.backend_cls,
            'amqp': self.amqp_cls,
            'events': self.events_cls,
            'log': self.log_cls,
            'control': self.control_cls,
            'accept_magic_kwargs': self.accept_magic_kwargs,
            'fixups': self.fixups,
            'config_source': self._config_source,
            'task_cls': self.task_cls,
        }

    def __reduce_args__(self):
        """Deprecated method, please use :meth:`__reduce_keys__` instead."""
        return (self.main, self.conf.changes,
                self.loader_cls, self.backend_cls, self.amqp_cls,
                self.events_cls, self.log_cls, self.control_cls,
                self.accept_magic_kwargs, self._config_source)
    @cached_property
    def Worker(self):
        return self.subclass_with_self('celery.apps.worker:Worker')

    @cached_property
    def WorkController(self, **kwargs):
        return self.subclass_with_self('celery.worker:WorkController')

    @cached_property
    def Beat(self, **kwargs):
        return self.subclass_with_self('celery.apps.beat:Beat')

    @cached_property
    def Task(self):
        return self.create_task_cls()

    @cached_property
    def annotations(self):
        return prepare_annotations(self.conf.CELERY_ANNOTATIONS)

    @cached_property
    def AsyncResult(self):
        return self.subclass_with_self('celery.result:AsyncResult')

    @cached_property
    def ResultSet(self):
        return self.subclass_with_self('celery.result:ResultSet')

    @cached_property
    def GroupResult(self):
        return self.subclass_with_self('celery.result:GroupResult')

    @cached_property
    def TaskSet(self):  # XXX compat
        """Deprecated! Please use :class:`celery.group` instead."""
        return self.subclass_with_self('celery.task.sets:TaskSet')

    @cached_property
    def TaskSetResult(self):  # XXX compat
        """Deprecated! Please use :attr:`GroupResult` instead."""
        return self.subclass_with_self('celery.result:TaskSetResult')

    @property
    def pool(self):
        if self._pool is None:
            register_after_fork(self, self._after_fork)
            limit = self.conf.BROKER_POOL_LIMIT
            self._pool = self.connection().Pool(limit=limit)
        return self._pool

    @property
    def current_task(self):
        return _task_stack.top

    @cached_property
    def oid(self):
        return oid_from(self)

    @cached_property
    def amqp(self):
        return instantiate(self.amqp_cls, app=self)

    @cached_property
    def backend(self):
        return self._get_backend()

    @cached_property
    def conf(self):
        return self._get_config()

    @cached_property
    def control(self):
        return instantiate(self.control_cls, app=self)

    @cached_property
    def events(self):
        return instantiate(self.events_cls, app=self)

    @cached_property
    def loader(self):
        return get_loader_cls(self.loader_cls)(app=self)

    @cached_property
    def log(self):
        return instantiate(self.log_cls, app=self)

    @cached_property
    def canvas(self):
        from celery import canvas
        return canvas

    @cached_property
    def tasks(self):
        self.finalize(auto=True)
        return self._tasks

    @cached_property
    def timezone(self):
        from celery.utils.timeutils import timezone
        conf = self.conf
        tz = conf.CELERY_TIMEZONE
        if not tz:
            return (timezone.get_timezone('UTC') if conf.CELERY_ENABLE_UTC
                    else timezone.local)
        return timezone.get_timezone(self.conf.CELERY_TIMEZONE)

App = Celery  # compat