base.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app.base
  4. ~~~~~~~~~~~~~~~
  5. Actual App instance implementation.
  6. """
  7. from __future__ import absolute_import
  8. import os
  9. import threading
  10. import warnings
  11. from collections import Callable, defaultdict, deque
  12. from contextlib import contextmanager
  13. from copy import deepcopy
  14. from operator import attrgetter
  15. from billiard.util import register_after_fork
  16. from kombu.clocks import LamportClock
  17. from kombu.common import oid_from
  18. from kombu.utils import cached_property, uuid
  19. from celery import platforms
  20. from celery._state import (
  21. _task_stack, _tls, get_current_app, _register_app, get_current_worker_task,
  22. )
  23. from celery.exceptions import AlwaysEagerIgnored, ImproperlyConfigured
  24. from celery.five import items, values
  25. from celery.loaders import get_loader_cls
  26. from celery.local import PromiseProxy, maybe_evaluate
  27. from celery.utils.functional import first, maybe_list
  28. from celery.utils.imports import instantiate, symbol_by_name
  29. from celery.utils.log import ensure_process_aware_logger
  30. from celery.utils.objects import mro_lookup
  31. from .annotations import prepare as prepare_annotations
  32. from .builtins import shared_task, load_shared_tasks
  33. from .defaults import DEFAULTS, find_deprecated_settings
  34. from .registry import TaskRegistry
  35. from .utils import (
  36. AppPickler, Settings, bugreport, _unpickle_app, _unpickle_app_v2, appstr,
  37. )
  38. __all__ = ['Celery']
  39. _EXECV = os.environ.get('FORKED_BY_MULTIPROCESSING')
  40. BUILTIN_FIXUPS = frozenset([
  41. 'celery.fixups.django:fixup',
  42. ])
  43. ERR_ENVVAR_NOT_SET = """\
  44. The environment variable {0!r} is not set,
  45. and as such the configuration could not be loaded.
  46. Please set this variable and make it point to
  47. a configuration module."""
  48. def app_has_custom(app, attr):
  49. return mro_lookup(app.__class__, attr, stop=(Celery, object),
  50. monkey_patched=[__name__])
  51. def _unpickle_appattr(reverse_name, args):
  52. """Given an attribute name and a list of args, gets
  53. the attribute from the current app and calls it."""
  54. return get_current_app()._rgetattr(reverse_name)(*args)
  55. class Celery(object):
  56. #: This is deprecated, use :meth:`reduce_keys` instead
  57. Pickler = AppPickler
  58. SYSTEM = platforms.SYSTEM
  59. IS_OSX, IS_WINDOWS = platforms.IS_OSX, platforms.IS_WINDOWS
  60. amqp_cls = 'celery.app.amqp:AMQP'
  61. backend_cls = None
  62. events_cls = 'celery.events:Events'
  63. loader_cls = 'celery.loaders.app:AppLoader'
  64. log_cls = 'celery.app.log:Logging'
  65. control_cls = 'celery.app.control:Control'
  66. task_cls = 'celery.app.task:Task'
  67. registry_cls = TaskRegistry
  68. _pool = None
  69. def __init__(self, main=None, loader=None, backend=None,
  70. amqp=None, events=None, log=None, control=None,
  71. set_as_current=True, accept_magic_kwargs=False,
  72. tasks=None, broker=None, include=None, changes=None,
  73. config_source=None, fixups=None, task_cls=None, **kwargs):
  74. self.clock = LamportClock()
  75. self.main = main
  76. self.amqp_cls = amqp or self.amqp_cls
  77. self.backend_cls = backend or self.backend_cls
  78. self.events_cls = events or self.events_cls
  79. self.loader_cls = loader or self.loader_cls
  80. self.log_cls = log or self.log_cls
  81. self.control_cls = control or self.control_cls
  82. self.task_cls = task_cls or self.task_cls
  83. self.set_as_current = set_as_current
  84. self.registry_cls = symbol_by_name(self.registry_cls)
  85. self.accept_magic_kwargs = accept_magic_kwargs
  86. self.user_options = defaultdict(set)
  87. self._config_source = config_source
  88. self.steps = defaultdict(set)
  89. self.configured = False
  90. self._pending_defaults = deque()
  91. self.finalized = False
  92. self._finalize_mutex = threading.Lock()
  93. self._pending = deque()
  94. self._tasks = tasks
  95. if not isinstance(self._tasks, TaskRegistry):
  96. self._tasks = TaskRegistry(self._tasks or {})
  97. # If the class defins a custom __reduce_args__ we need to use
  98. # the old way of pickling apps, which is pickling a list of
  99. # args instead of the new way that pickles a dict of keywords.
  100. self._using_v1_reduce = app_has_custom(self, '__reduce_args__')
  101. # these options are moved to the config to
  102. # simplify pickling of the app object.
  103. self._preconf = changes or {}
  104. if broker:
  105. self._preconf['BROKER_URL'] = broker
  106. if include:
  107. self._preconf['CELERY_IMPORTS'] = include
  108. # Apply fixups.
  109. self.fixups = set(fixups or ())
  110. for fixup in self.fixups | BUILTIN_FIXUPS:
  111. symbol_by_name(fixup)(self)
  112. if self.set_as_current:
  113. self.set_current()
  114. # See Issue #1126
  115. # this is used when pickling the app object so that configuration
  116. # is reread without having to pickle the contents
  117. # (which is often unpickleable anyway)
  118. if self._config_source:
  119. self.config_from_object(self._config_source)
  120. self.on_init()
  121. _register_app(self)
  122. def set_current(self):
  123. _tls.current_app = self
  124. def __enter__(self):
  125. return self
  126. def __exit__(self, *exc_info):
  127. self.close()
  128. def close(self):
  129. self._maybe_close_pool()
  130. def on_init(self):
  131. """Optional callback called at init."""
  132. pass
  133. def start(self, argv=None):
  134. return instantiate(
  135. 'celery.bin.celery:CeleryCommand',
  136. app=self).execute_from_commandline(argv)
  137. def worker_main(self, argv=None):
  138. return instantiate(
  139. 'celery.bin.worker:worker',
  140. app=self).execute_from_commandline(argv)
  141. def task(self, *args, **opts):
  142. """Creates new task class from any callable."""
  143. if _EXECV and not opts.get('_force_evaluate'):
  144. # When using execv the task in the original module will point to a
  145. # different app, so doing things like 'add.request' will point to
  146. # a differnt task instance. This makes sure it will always use
  147. # the task instance from the current app.
  148. # Really need a better solution for this :(
  149. from . import shared_task as proxies_to_curapp
  150. return proxies_to_curapp(*args, _force_evaluate=True, **opts)
  151. def inner_create_task_cls(shared=True, filter=None, **opts):
  152. _filt = filter # stupid 2to3
  153. def _create_task_cls(fun):
  154. if shared:
  155. cons = lambda app: app._task_from_fun(fun, **opts)
  156. cons.__name__ = fun.__name__
  157. shared_task(cons)
  158. if self.accept_magic_kwargs: # compat mode
  159. task = self._task_from_fun(fun, **opts)
  160. if filter:
  161. task = filter(task)
  162. return task
  163. if self.finalized or opts.get('_force_evaluate'):
  164. ret = self._task_from_fun(fun, **opts)
  165. else:
  166. # return a proxy object that evaluates on first use
  167. ret = PromiseProxy(self._task_from_fun, (fun, ), opts)
  168. self._pending.append(ret)
  169. if _filt:
  170. return _filt(ret)
  171. return ret
  172. return _create_task_cls
  173. if len(args) == 1 and isinstance(args[0], Callable):
  174. return inner_create_task_cls(**opts)(*args)
  175. if args:
  176. raise TypeError(
  177. 'task() takes no arguments (%s given)' % (len(args, )))
  178. return inner_create_task_cls(**opts)
  179. def _task_from_fun(self, fun, **options):
  180. base = options.pop('base', None) or self.Task
  181. bind = options.pop('bind', False)
  182. T = type(fun.__name__, (base, ), dict({
  183. 'app': self,
  184. 'accept_magic_kwargs': False,
  185. 'run': fun if bind else staticmethod(fun),
  186. '_decorated': True,
  187. '__doc__': fun.__doc__,
  188. '__module__': fun.__module__}, **options))()
  189. task = self._tasks[T.name] # return global instance.
  190. return task
  191. def finalize(self):
  192. with self._finalize_mutex:
  193. if not self.finalized:
  194. self.finalized = True
  195. load_shared_tasks(self)
  196. pending = self._pending
  197. while pending:
  198. maybe_evaluate(pending.popleft())
  199. for task in values(self._tasks):
  200. task.bind(self)
  201. def add_defaults(self, fun):
  202. if not isinstance(fun, Callable):
  203. d, fun = fun, lambda: d
  204. if self.configured:
  205. return self.conf.add_defaults(fun())
  206. self._pending_defaults.append(fun)
  207. def config_from_object(self, obj, silent=False):
  208. del(self.conf)
  209. self._config_source = obj
  210. return self.loader.config_from_object(obj, silent=silent)
  211. def config_from_envvar(self, variable_name, silent=False):
  212. module_name = os.environ.get(variable_name)
  213. if not module_name:
  214. if silent:
  215. return False
  216. raise ImproperlyConfigured(ERR_ENVVAR_NOT_SET.format(module_name))
  217. return self.config_from_object(module_name, silent=silent)
  218. def config_from_cmdline(self, argv, namespace='celery'):
  219. self.conf.update(self.loader.cmdline_config_parser(argv, namespace))
  220. def setup_security(self, allowed_serializers=None, key=None, cert=None,
  221. store=None, digest='sha1', serializer='json'):
  222. from celery.security import setup_security
  223. return setup_security(allowed_serializers, key, cert,
  224. store, digest, serializer, app=self)
  225. def autodiscover_tasks(self, packages, related_name='tasks'):
  226. if self.conf.CELERY_FORCE_BILLIARD_LOGGING:
  227. # we'll use billiard's processName instead of
  228. # multiprocessing's one in all the loggers
  229. # created after this call
  230. ensure_process_aware_logger()
  231. self.loader.autodiscover_tasks(packages, related_name)
  232. def send_task(self, name, args=None, kwargs=None, countdown=None,
  233. eta=None, task_id=None, producer=None, connection=None,
  234. router=None, result_cls=None, expires=None,
  235. publisher=None, link=None, link_error=None,
  236. add_to_parent=True, reply_to=None, **options):
  237. task_id = task_id or uuid()
  238. producer = producer or publisher # XXX compat
  239. router = router or self.amqp.router
  240. conf = self.conf
  241. if conf.CELERY_ALWAYS_EAGER: # pragma: no cover
  242. warnings.warn(AlwaysEagerIgnored(
  243. 'CELERY_ALWAYS_EAGER has no effect on send_task'))
  244. options = router.route(options, name, args, kwargs)
  245. if connection:
  246. producer = self.amqp.TaskProducer(connection)
  247. with self.producer_or_acquire(producer) as P:
  248. self.backend.on_task_call(P, task_id)
  249. task_id = P.publish_task(
  250. name, args, kwargs, countdown=countdown, eta=eta,
  251. task_id=task_id, expires=expires,
  252. callbacks=maybe_list(link), errbacks=maybe_list(link_error),
  253. reply_to=reply_to or self.oid, **options
  254. )
  255. result = (result_cls or self.AsyncResult)(task_id)
  256. if add_to_parent:
  257. parent = get_current_worker_task()
  258. if parent:
  259. parent.add_trail(result)
  260. return result
  261. def connection(self, hostname=None, userid=None, password=None,
  262. virtual_host=None, port=None, ssl=None,
  263. connect_timeout=None, transport=None,
  264. transport_options=None, heartbeat=None, **kwargs):
  265. conf = self.conf
  266. return self.amqp.Connection(
  267. hostname or conf.BROKER_HOST,
  268. userid or conf.BROKER_USER,
  269. password or conf.BROKER_PASSWORD,
  270. virtual_host or conf.BROKER_VHOST,
  271. port or conf.BROKER_PORT,
  272. transport=transport or conf.BROKER_TRANSPORT,
  273. ssl=self.either('BROKER_USE_SSL', ssl),
  274. connect_timeout=self.either(
  275. 'BROKER_CONNECTION_TIMEOUT', connect_timeout),
  276. heartbeat=heartbeat,
  277. login_method=self.either('BROKER_LOGIN_METHOD', None),
  278. transport_options=dict(conf.BROKER_TRANSPORT_OPTIONS,
  279. **transport_options or {}))
  280. broker_connection = connection
  281. @contextmanager
  282. def connection_or_acquire(self, connection=None, pool=True,
  283. *args, **kwargs):
  284. if connection:
  285. yield connection
  286. else:
  287. if pool:
  288. with self.pool.acquire(block=True) as connection:
  289. yield connection
  290. else:
  291. with self.connection() as connection:
  292. yield connection
  293. default_connection = connection_or_acquire # XXX compat
  294. @contextmanager
  295. def producer_or_acquire(self, producer=None):
  296. if producer:
  297. yield producer
  298. else:
  299. with self.amqp.producer_pool.acquire(block=True) as producer:
  300. yield producer
  301. default_producer = producer_or_acquire # XXX compat
  302. def prepare_config(self, c):
  303. """Prepare configuration before it is merged with the defaults."""
  304. return find_deprecated_settings(c)
  305. def now(self):
  306. return self.loader.now(utc=self.conf.CELERY_ENABLE_UTC)
  307. def mail_admins(self, subject, body, fail_silently=False):
  308. if self.conf.ADMINS:
  309. to = [admin_email for _, admin_email in self.conf.ADMINS]
  310. return self.loader.mail_admins(
  311. subject, body, fail_silently, to=to,
  312. sender=self.conf.SERVER_EMAIL,
  313. host=self.conf.EMAIL_HOST,
  314. port=self.conf.EMAIL_PORT,
  315. user=self.conf.EMAIL_HOST_USER,
  316. password=self.conf.EMAIL_HOST_PASSWORD,
  317. timeout=self.conf.EMAIL_TIMEOUT,
  318. use_ssl=self.conf.EMAIL_USE_SSL,
  319. use_tls=self.conf.EMAIL_USE_TLS,
  320. )
  321. def select_queues(self, queues=None):
  322. return self.amqp.queues.select(queues)
  323. def either(self, default_key, *values):
  324. """Fallback to the value of a configuration key if none of the
  325. `*values` are true."""
  326. return first(None, values) or self.conf.get(default_key)
  327. def bugreport(self):
  328. return bugreport(self)
  329. def _get_backend(self):
  330. from celery.backends import get_backend_by_url
  331. backend, url = get_backend_by_url(
  332. self.backend_cls or self.conf.CELERY_RESULT_BACKEND,
  333. self.loader)
  334. return backend(app=self, url=url)
  335. def on_configure(self):
  336. """Callback calld when the app loads configuration"""
  337. pass
  338. def _get_config(self):
  339. self.on_configure()
  340. self.configured = True
  341. s = Settings({}, [self.prepare_config(self.loader.conf),
  342. deepcopy(DEFAULTS)])
  343. # load lazy config dict initializers.
  344. pending = self._pending_defaults
  345. while pending:
  346. s.add_defaults(maybe_evaluate(pending.popleft()()))
  347. if self._preconf:
  348. for key, value in items(self._preconf):
  349. setattr(s, key, value)
  350. return s
  351. def _after_fork(self, obj_):
  352. self._maybe_close_pool()
  353. def _maybe_close_pool(self):
  354. if self._pool:
  355. self._pool.force_close_all()
  356. self._pool = None
  357. amqp = self.amqp
  358. if amqp._producer_pool:
  359. amqp._producer_pool.force_close_all()
  360. amqp._producer_pool = None
  361. def create_task_cls(self):
  362. """Creates a base task class using default configuration
  363. taken from this app."""
  364. return self.subclass_with_self(
  365. self.task_cls, name='Task', attribute='_app',
  366. keep_reduce=True, abstract=True,
  367. )
  368. def subclass_with_self(self, Class, name=None, attribute='app',
  369. reverse=None, keep_reduce=False, **kw):
  370. """Subclass an app-compatible class by setting its app attribute
  371. to be this app instance.
  372. App-compatible means that the class has a class attribute that
  373. provides the default app it should use, e.g.
  374. ``class Foo: app = None``.
  375. :param Class: The app-compatible class to subclass.
  376. :keyword name: Custom name for the target class.
  377. :keyword attribute: Name of the attribute holding the app,
  378. default is 'app'.
  379. """
  380. Class = symbol_by_name(Class)
  381. reverse = reverse if reverse else Class.__name__
  382. def __reduce__(self):
  383. return _unpickle_appattr, (reverse, self.__reduce_args__())
  384. attrs = dict({attribute: self}, __module__=Class.__module__,
  385. __doc__=Class.__doc__, **kw)
  386. if not keep_reduce:
  387. attrs['__reduce__'] = __reduce__
  388. return type(name or Class.__name__, (Class, ), attrs)
  389. def _rgetattr(self, path):
  390. return attrgetter(path)(self)
  391. def __repr__(self):
  392. return '<{0} {1}>'.format(type(self).__name__, appstr(self))
  393. def __reduce__(self):
  394. if self._using_v1_reduce:
  395. return self.__reduce_v1__()
  396. return (_unpickle_app_v2, (self.__class__, self.__reduce_keys__()))
  397. def __reduce_v1__(self):
  398. # Reduce only pickles the configuration changes,
  399. # so the default configuration doesn't have to be passed
  400. # between processes.
  401. return (
  402. _unpickle_app,
  403. (self.__class__, self.Pickler) + self.__reduce_args__(),
  404. )
  405. def __reduce_keys__(self):
  406. """Return keyword arguments used to reconstruct the object
  407. when unpickling."""
  408. return {
  409. 'main': self.main,
  410. 'changes': self.conf.changes,
  411. 'loader': self.loader_cls,
  412. 'backend': self.backend_cls,
  413. 'amqp': self.amqp_cls,
  414. 'events': self.events_cls,
  415. 'log': self.log_cls,
  416. 'control': self.control_cls,
  417. 'accept_magic_kwargs': self.accept_magic_kwargs,
  418. 'fixups': self.fixups,
  419. 'config_source': self._config_source,
  420. 'task_cls': self.task_cls,
  421. }
  422. def __reduce_args__(self):
  423. """Deprecated method, please use :meth:`__reduce_keys__` instead."""
  424. return (self.main, self.conf.changes,
  425. self.loader_cls, self.backend_cls, self.amqp_cls,
  426. self.events_cls, self.log_cls, self.control_cls,
  427. self.accept_magic_kwargs, self._config_source)
  428. @cached_property
  429. def Worker(self):
  430. return self.subclass_with_self('celery.apps.worker:Worker')
  431. @cached_property
  432. def WorkController(self, **kwargs):
  433. return self.subclass_with_self('celery.worker:WorkController')
  434. @cached_property
  435. def Beat(self, **kwargs):
  436. return self.subclass_with_self('celery.apps.beat:Beat')
  437. @cached_property
  438. def Task(self):
  439. return self.create_task_cls()
  440. @cached_property
  441. def annotations(self):
  442. return prepare_annotations(self.conf.CELERY_ANNOTATIONS)
  443. @cached_property
  444. def AsyncResult(self):
  445. return self.subclass_with_self('celery.result:AsyncResult')
  446. @cached_property
  447. def ResultSet(self):
  448. return self.subclass_with_self('celery.result:ResultSet')
  449. @cached_property
  450. def GroupResult(self):
  451. return self.subclass_with_self('celery.result:GroupResult')
  452. @cached_property
  453. def TaskSet(self): # XXX compat
  454. """Deprecated! Please use :class:`celery.group` instead."""
  455. return self.subclass_with_self('celery.task.sets:TaskSet')
  456. @cached_property
  457. def TaskSetResult(self): # XXX compat
  458. """Deprecated! Please use :attr:`GroupResult` instead."""
  459. return self.subclass_with_self('celery.result:TaskSetResult')
  460. @property
  461. def pool(self):
  462. if self._pool is None:
  463. register_after_fork(self, self._after_fork)
  464. limit = self.conf.BROKER_POOL_LIMIT
  465. self._pool = self.connection().Pool(limit=limit)
  466. return self._pool
  467. @property
  468. def current_task(self):
  469. return _task_stack.top
  470. @cached_property
  471. def oid(self):
  472. return oid_from(self)
  473. @cached_property
  474. def amqp(self):
  475. return instantiate(self.amqp_cls, app=self)
  476. @cached_property
  477. def backend(self):
  478. return self._get_backend()
  479. @cached_property
  480. def conf(self):
  481. return self._get_config()
  482. @cached_property
  483. def control(self):
  484. return instantiate(self.control_cls, app=self)
  485. @cached_property
  486. def events(self):
  487. return instantiate(self.events_cls, app=self)
  488. @cached_property
  489. def loader(self):
  490. return get_loader_cls(self.loader_cls)(app=self)
  491. @cached_property
  492. def log(self):
  493. return instantiate(self.log_cls, app=self)
  494. @cached_property
  495. def tasks(self):
  496. self.finalize()
  497. return self._tasks
  498. @cached_property
  499. def timezone(self):
  500. from celery.utils.timeutils import timezone
  501. conf = self.conf
  502. tz = conf.CELERY_TIMEZONE
  503. if not tz:
  504. return (timezone.get_timezone('UTC') if conf.CELERY_ENABLE_UTC
  505. else timezone.local)
  506. return timezone.get_timezone(self.conf.CELERY_TIMEZONE)
  507. App = Celery # compat