base.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app.base
  4. ~~~~~~~~~~~~~~~
  5. Actual App instance implementation.
  6. """
  7. from __future__ import absolute_import
  8. import os
  9. import threading
  10. import warnings
  11. from collections import Callable, defaultdict, deque
  12. from contextlib import contextmanager
  13. from copy import deepcopy
  14. from operator import attrgetter
  15. from billiard.util import register_after_fork
  16. from kombu.clocks import LamportClock
  17. from kombu.common import oid_from
  18. from kombu.utils import cached_property, uuid
  19. from celery import platforms
  20. from celery._state import (
  21. _task_stack, _tls, get_current_app, _register_app, get_current_worker_task,
  22. )
  23. from celery.exceptions import AlwaysEagerIgnored, ImproperlyConfigured
  24. from celery.five import items, values
  25. from celery.loaders import get_loader_cls
  26. from celery.local import PromiseProxy, maybe_evaluate
  27. from celery.utils.functional import first, maybe_list
  28. from celery.utils.imports import instantiate, symbol_by_name
  29. from celery.utils.log import ensure_process_aware_logger
  30. from celery.utils.objects import mro_lookup
  31. from .annotations import prepare as prepare_annotations
  32. from .builtins import shared_task, load_shared_tasks
  33. from .defaults import DEFAULTS, find_deprecated_settings
  34. from .registry import TaskRegistry
  35. from .utils import (
  36. AppPickler, Settings, bugreport, _unpickle_app, _unpickle_app_v2, appstr,
  37. )
  38. __all__ = ['Celery']
  39. _EXECV = os.environ.get('FORKED_BY_MULTIPROCESSING')
  40. BUILTIN_FIXUPS = frozenset([
  41. 'celery.fixups.django:fixup',
  42. ])
  43. ERR_ENVVAR_NOT_SET = """\
  44. The environment variable {0!r} is not set,
  45. and as such the configuration could not be loaded.
  46. Please set this variable and make it point to
  47. a configuration module."""
  48. def app_has_custom(app, attr):
  49. return mro_lookup(app.__class__, attr, stop=(Celery, object),
  50. monkey_patched=[__name__])
  51. def _unpickle_appattr(reverse_name, args):
  52. """Given an attribute name and a list of args, gets
  53. the attribute from the current app and calls it."""
  54. return get_current_app()._rgetattr(reverse_name)(*args)
  55. class Celery(object):
  56. #: This is deprecated, use :meth:`reduce_keys` instead
  57. Pickler = AppPickler
  58. SYSTEM = platforms.SYSTEM
  59. IS_OSX, IS_WINDOWS = platforms.IS_OSX, platforms.IS_WINDOWS
  60. amqp_cls = 'celery.app.amqp:AMQP'
  61. backend_cls = None
  62. events_cls = 'celery.events:Events'
  63. loader_cls = 'celery.loaders.app:AppLoader'
  64. log_cls = 'celery.app.log:Logging'
  65. control_cls = 'celery.app.control:Control'
  66. task_cls = 'celery.app.task:Task'
  67. registry_cls = TaskRegistry
  68. _pool = None
  69. def __init__(self, main=None, loader=None, backend=None,
  70. amqp=None, events=None, log=None, control=None,
  71. set_as_current=True, accept_magic_kwargs=False,
  72. tasks=None, broker=None, include=None, changes=None,
  73. config_source=None, fixups=None, task_cls=None, **kwargs):
  74. self.clock = LamportClock()
  75. self.main = main
  76. self.amqp_cls = amqp or self.amqp_cls
  77. self.backend_cls = backend or self.backend_cls
  78. self.events_cls = events or self.events_cls
  79. self.loader_cls = loader or self.loader_cls
  80. self.log_cls = log or self.log_cls
  81. self.control_cls = control or self.control_cls
  82. self.task_cls = task_cls or self.task_cls
  83. self.set_as_current = set_as_current
  84. self.registry_cls = symbol_by_name(self.registry_cls)
  85. self.accept_magic_kwargs = accept_magic_kwargs
  86. self.user_options = defaultdict(set)
  87. self._config_source = config_source
  88. self.steps = defaultdict(set)
  89. self.configured = False
  90. self._pending_defaults = deque()
  91. self.finalized = False
  92. self._finalize_mutex = threading.Lock()
  93. self._pending = deque()
  94. self._tasks = tasks
  95. if not isinstance(self._tasks, TaskRegistry):
  96. self._tasks = TaskRegistry(self._tasks or {})
  97. # If the class defins a custom __reduce_args__ we need to use
  98. # the old way of pickling apps, which is pickling a list of
  99. # args instead of the new way that pickles a dict of keywords.
  100. self._using_v1_reduce = app_has_custom(self, '__reduce_args__')
  101. # these options are moved to the config to
  102. # simplify pickling of the app object.
  103. self._preconf = changes or {}
  104. if broker:
  105. self._preconf['BROKER_URL'] = broker
  106. if include:
  107. self._preconf['CELERY_IMPORTS'] = include
  108. # Apply fixups.
  109. self.fixups = set(fixups or ())
  110. for fixup in self.fixups | BUILTIN_FIXUPS:
  111. symbol_by_name(fixup)(self)
  112. if self.set_as_current:
  113. self.set_current()
  114. # See Issue #1126
  115. # this is used when pickling the app object so that configuration
  116. # is reread without having to pickle the contents
  117. # (which is often unpickleable anyway)
  118. if self._config_source:
  119. self.config_from_object(self._config_source)
  120. self.on_init()
  121. _register_app(self)
  122. def set_current(self):
  123. _tls.current_app = self
  124. def __enter__(self):
  125. return self
  126. def __exit__(self, *exc_info):
  127. self.close()
  128. def close(self):
  129. self._maybe_close_pool()
  130. def on_init(self):
  131. """Optional callback called at init."""
  132. pass
  133. def start(self, argv=None):
  134. return instantiate(
  135. 'celery.bin.celery:CeleryCommand',
  136. app=self).execute_from_commandline(argv)
  137. def worker_main(self, argv=None):
  138. return instantiate(
  139. 'celery.bin.worker:worker',
  140. app=self).execute_from_commandline(argv)
  141. def task(self, *args, **opts):
  142. """Creates new task class from any callable."""
  143. if _EXECV and not opts.get('_force_evaluate'):
  144. # When using execv the task in the original module will point to a
  145. # different app, so doing things like 'add.request' will point to
  146. # a differnt task instance. This makes sure it will always use
  147. # the task instance from the current app.
  148. # Really need a better solution for this :(
  149. from . import shared_task as proxies_to_curapp
  150. return proxies_to_curapp(*args, _force_evaluate=True, **opts)
  151. def inner_create_task_cls(shared=True, filter=None, **opts):
  152. _filt = filter # stupid 2to3
  153. def _create_task_cls(fun):
  154. if shared:
  155. cons = lambda app: app._task_from_fun(fun, **opts)
  156. cons.__name__ = fun.__name__
  157. shared_task(cons)
  158. if self.accept_magic_kwargs: # compat mode
  159. task = self._task_from_fun(fun, **opts)
  160. if filter:
  161. task = filter(task)
  162. return task
  163. # return a proxy object that is only evaluated when first used
  164. promise = PromiseProxy(self._task_from_fun, (fun, ), opts)
  165. self._pending.append(promise)
  166. if _filt:
  167. return _filt(promise)
  168. return promise
  169. return _create_task_cls
  170. if len(args) == 1 and isinstance(args[0], Callable):
  171. return inner_create_task_cls(**opts)(*args)
  172. if args:
  173. raise TypeError(
  174. 'task() takes no arguments (%s given)' % (len(args, )))
  175. return inner_create_task_cls(**opts)
  176. def _task_from_fun(self, fun, **options):
  177. base = options.pop('base', None) or self.Task
  178. bind = options.pop('bind', False)
  179. T = type(fun.__name__, (base, ), dict({
  180. 'app': self,
  181. 'accept_magic_kwargs': False,
  182. 'run': fun if bind else staticmethod(fun),
  183. '__doc__': fun.__doc__,
  184. '__module__': fun.__module__}, **options))()
  185. task = self._tasks[T.name] # return global instance.
  186. return task
  187. def finalize(self):
  188. with self._finalize_mutex:
  189. if not self.finalized:
  190. self.finalized = True
  191. load_shared_tasks(self)
  192. pending = self._pending
  193. while pending:
  194. maybe_evaluate(pending.popleft())
  195. for task in values(self._tasks):
  196. task.bind(self)
  197. def add_defaults(self, fun):
  198. if not isinstance(fun, Callable):
  199. d, fun = fun, lambda: d
  200. if self.configured:
  201. return self.conf.add_defaults(fun())
  202. self._pending_defaults.append(fun)
  203. def config_from_object(self, obj, silent=False):
  204. del(self.conf)
  205. self._config_source = obj
  206. return self.loader.config_from_object(obj, silent=silent)
  207. def config_from_envvar(self, variable_name, silent=False):
  208. module_name = os.environ.get(variable_name)
  209. if not module_name:
  210. if silent:
  211. return False
  212. raise ImproperlyConfigured(ERR_ENVVAR_NOT_SET.format(module_name))
  213. return self.config_from_object(module_name, silent=silent)
  214. def config_from_cmdline(self, argv, namespace='celery'):
  215. self.conf.update(self.loader.cmdline_config_parser(argv, namespace))
  216. def setup_security(self, allowed_serializers=None, key=None, cert=None,
  217. store=None, digest='sha1', serializer='json'):
  218. from celery.security import setup_security
  219. return setup_security(allowed_serializers, key, cert,
  220. store, digest, serializer, app=self)
  221. def autodiscover_tasks(self, packages, related_name='tasks'):
  222. if self.conf.CELERY_FORCE_BILLIARD_LOGGING:
  223. # we'll use billiard's processName instead of
  224. # multiprocessing's one in all the loggers
  225. # created after this call
  226. ensure_process_aware_logger()
  227. self.loader.autodiscover_tasks(packages, related_name)
  228. def send_task(self, name, args=None, kwargs=None, countdown=None,
  229. eta=None, task_id=None, producer=None, connection=None,
  230. router=None, result_cls=None, expires=None,
  231. publisher=None, link=None, link_error=None,
  232. add_to_parent=True, reply_to=None, **options):
  233. task_id = task_id or uuid()
  234. producer = producer or publisher # XXX compat
  235. router = router or self.amqp.router
  236. conf = self.conf
  237. if conf.CELERY_ALWAYS_EAGER: # pragma: no cover
  238. warnings.warn(AlwaysEagerIgnored(
  239. 'CELERY_ALWAYS_EAGER has no effect on send_task'))
  240. options = router.route(options, name, args, kwargs)
  241. if connection:
  242. producer = self.amqp.TaskProducer(connection)
  243. with self.producer_or_acquire(producer) as P:
  244. self.backend.on_task_call(P, task_id)
  245. task_id = P.publish_task(
  246. name, args, kwargs, countdown=countdown, eta=eta,
  247. task_id=task_id, expires=expires,
  248. callbacks=maybe_list(link), errbacks=maybe_list(link_error),
  249. reply_to=reply_to or self.oid, **options
  250. )
  251. result = (result_cls or self.AsyncResult)(task_id)
  252. if add_to_parent:
  253. parent = get_current_worker_task()
  254. if parent:
  255. parent.add_trail(result)
  256. return result
  257. def connection(self, hostname=None, userid=None, password=None,
  258. virtual_host=None, port=None, ssl=None,
  259. connect_timeout=None, transport=None,
  260. transport_options=None, heartbeat=None, **kwargs):
  261. conf = self.conf
  262. return self.amqp.Connection(
  263. hostname or conf.BROKER_HOST,
  264. userid or conf.BROKER_USER,
  265. password or conf.BROKER_PASSWORD,
  266. virtual_host or conf.BROKER_VHOST,
  267. port or conf.BROKER_PORT,
  268. transport=transport or conf.BROKER_TRANSPORT,
  269. ssl=self.either('BROKER_USE_SSL', ssl),
  270. connect_timeout=self.either(
  271. 'BROKER_CONNECTION_TIMEOUT', connect_timeout),
  272. heartbeat=heartbeat,
  273. login_method=self.either('BROKER_LOGIN_METHOD', None),
  274. transport_options=dict(conf.BROKER_TRANSPORT_OPTIONS,
  275. **transport_options or {}))
  276. broker_connection = connection
  277. @contextmanager
  278. def connection_or_acquire(self, connection=None, pool=True,
  279. *args, **kwargs):
  280. if connection:
  281. yield connection
  282. else:
  283. if pool:
  284. with self.pool.acquire(block=True) as connection:
  285. yield connection
  286. else:
  287. with self.connection() as connection:
  288. yield connection
  289. default_connection = connection_or_acquire # XXX compat
  290. @contextmanager
  291. def producer_or_acquire(self, producer=None):
  292. if producer:
  293. yield producer
  294. else:
  295. with self.amqp.producer_pool.acquire(block=True) as producer:
  296. yield producer
  297. default_producer = producer_or_acquire # XXX compat
  298. def prepare_config(self, c):
  299. """Prepare configuration before it is merged with the defaults."""
  300. return find_deprecated_settings(c)
  301. def now(self):
  302. return self.loader.now(utc=self.conf.CELERY_ENABLE_UTC)
  303. def mail_admins(self, subject, body, fail_silently=False):
  304. if self.conf.ADMINS:
  305. to = [admin_email for _, admin_email in self.conf.ADMINS]
  306. return self.loader.mail_admins(
  307. subject, body, fail_silently, to=to,
  308. sender=self.conf.SERVER_EMAIL,
  309. host=self.conf.EMAIL_HOST,
  310. port=self.conf.EMAIL_PORT,
  311. user=self.conf.EMAIL_HOST_USER,
  312. password=self.conf.EMAIL_HOST_PASSWORD,
  313. timeout=self.conf.EMAIL_TIMEOUT,
  314. use_ssl=self.conf.EMAIL_USE_SSL,
  315. use_tls=self.conf.EMAIL_USE_TLS,
  316. )
  317. def select_queues(self, queues=None):
  318. return self.amqp.queues.select_subset(queues)
  319. def either(self, default_key, *values):
  320. """Fallback to the value of a configuration key if none of the
  321. `*values` are true."""
  322. return first(None, values) or self.conf.get(default_key)
  323. def bugreport(self):
  324. return bugreport(self)
  325. def _get_backend(self):
  326. from celery.backends import get_backend_by_url
  327. backend, url = get_backend_by_url(
  328. self.backend_cls or self.conf.CELERY_RESULT_BACKEND,
  329. self.loader)
  330. return backend(app=self, url=url)
  331. def _get_config(self):
  332. self.configured = True
  333. s = Settings({}, [self.prepare_config(self.loader.conf),
  334. deepcopy(DEFAULTS)])
  335. # load lazy config dict initializers.
  336. pending = self._pending_defaults
  337. while pending:
  338. s.add_defaults(pending.popleft()())
  339. if self._preconf:
  340. for key, value in items(self._preconf):
  341. setattr(s, key, value)
  342. return s
  343. def _after_fork(self, obj_):
  344. self._maybe_close_pool()
  345. def _maybe_close_pool(self):
  346. if self._pool:
  347. self._pool.force_close_all()
  348. self._pool = None
  349. amqp = self.amqp
  350. if amqp._producer_pool:
  351. amqp._producer_pool.force_close_all()
  352. amqp._producer_pool = None
  353. def create_task_cls(self):
  354. """Creates a base task class using default configuration
  355. taken from this app."""
  356. return self.subclass_with_self(self.task_cls, name='Task',
  357. attribute='_app', abstract=True)
  358. def subclass_with_self(self, Class, name=None, attribute='app',
  359. reverse=None, **kw):
  360. """Subclass an app-compatible class by setting its app attribute
  361. to be this app instance.
  362. App-compatible means that the class has a class attribute that
  363. provides the default app it should use, e.g.
  364. ``class Foo: app = None``.
  365. :param Class: The app-compatible class to subclass.
  366. :keyword name: Custom name for the target class.
  367. :keyword attribute: Name of the attribute holding the app,
  368. default is 'app'.
  369. """
  370. Class = symbol_by_name(Class)
  371. reverse = reverse if reverse else Class.__name__
  372. def __reduce__(self):
  373. return _unpickle_appattr, (reverse, self.__reduce_args__())
  374. attrs = dict({attribute: self}, __module__=Class.__module__,
  375. __doc__=Class.__doc__, __reduce__=__reduce__, **kw)
  376. return type(name or Class.__name__, (Class, ), attrs)
  377. def _rgetattr(self, path):
  378. return attrgetter(path)(self)
  379. def __repr__(self):
  380. return '<{0} {1}>'.format(type(self).__name__, appstr(self))
  381. def __reduce__(self):
  382. if self._using_v1_reduce:
  383. return self.__reduce_v1__()
  384. return (_unpickle_app_v2, (self.__class__, self.__reduce_keys__()))
  385. def __reduce_v1__(self):
  386. # Reduce only pickles the configuration changes,
  387. # so the default configuration doesn't have to be passed
  388. # between processes.
  389. return (
  390. _unpickle_app,
  391. (self.__class__, self.Pickler) + self.__reduce_args__(),
  392. )
  393. def __reduce_keys__(self):
  394. """Returns keyword arguments used to reconstruct the object
  395. when unpickling."""
  396. return {
  397. 'main': self.main,
  398. 'changes': self.conf.changes,
  399. 'loader': self.loader_cls,
  400. 'backend': self.backend_cls,
  401. 'amqp': self.amqp_cls,
  402. 'events': self.events_cls,
  403. 'log': self.log_cls,
  404. 'control': self.control_cls,
  405. 'accept_magic_kwargs': self.accept_magic_kwargs,
  406. 'fixups': self.fixups,
  407. 'config_source': self._config_source,
  408. 'task_cls': self.task_cls,
  409. }
  410. def __reduce_args__(self):
  411. """Deprecated method, please use :meth:`__reduce_keys__` instead."""
  412. return (self.main, self.conf.changes,
  413. self.loader_cls, self.backend_cls, self.amqp_cls,
  414. self.events_cls, self.log_cls, self.control_cls,
  415. self.accept_magic_kwargs, self._config_source)
  416. @cached_property
  417. def Worker(self):
  418. return self.subclass_with_self('celery.apps.worker:Worker')
  419. @cached_property
  420. def WorkController(self, **kwargs):
  421. return self.subclass_with_self('celery.worker:WorkController')
  422. @cached_property
  423. def Beat(self, **kwargs):
  424. return self.subclass_with_self('celery.apps.beat:Beat')
  425. @cached_property
  426. def TaskSet(self):
  427. return self.subclass_with_self('celery.task.sets:TaskSet')
  428. @cached_property
  429. def Task(self):
  430. return self.create_task_cls()
  431. @cached_property
  432. def annotations(self):
  433. return prepare_annotations(self.conf.CELERY_ANNOTATIONS)
  434. @cached_property
  435. def AsyncResult(self):
  436. return self.subclass_with_self('celery.result:AsyncResult')
  437. @cached_property
  438. def GroupResult(self):
  439. return self.subclass_with_self('celery.result:GroupResult')
  440. @cached_property
  441. def TaskSetResult(self): # XXX compat
  442. return self.subclass_with_self('celery.result:TaskSetResult')
  443. @property
  444. def pool(self):
  445. if self._pool is None:
  446. register_after_fork(self, self._after_fork)
  447. limit = self.conf.BROKER_POOL_LIMIT
  448. self._pool = self.connection().Pool(limit=limit)
  449. return self._pool
  450. @property
  451. def current_task(self):
  452. return _task_stack.top
  453. @cached_property
  454. def oid(self):
  455. return oid_from(self)
  456. @cached_property
  457. def amqp(self):
  458. return instantiate(self.amqp_cls, app=self)
  459. @cached_property
  460. def backend(self):
  461. return self._get_backend()
  462. @cached_property
  463. def conf(self):
  464. return self._get_config()
  465. @cached_property
  466. def control(self):
  467. return instantiate(self.control_cls, app=self)
  468. @cached_property
  469. def events(self):
  470. return instantiate(self.events_cls, app=self)
  471. @cached_property
  472. def loader(self):
  473. return get_loader_cls(self.loader_cls)(app=self)
  474. @cached_property
  475. def log(self):
  476. return instantiate(self.log_cls, app=self)
  477. @cached_property
  478. def tasks(self):
  479. self.finalize()
  480. return self._tasks
  481. App = Celery # compat