base.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app.base
  4. ~~~~~~~~~~~~~~~
  5. Actual App instance implementation.
  6. """
  7. from __future__ import absolute_import
  8. import threading
  9. import warnings
  10. from collections import Callable, defaultdict, deque
  11. from contextlib import contextmanager
  12. from copy import deepcopy
  13. from functools import wraps
  14. from operator import attrgetter
  15. from billiard.util import register_after_fork
  16. from kombu.clocks import LamportClock
  17. from kombu.utils import cached_property
  18. from celery import platforms
  19. from celery._state import _task_stack, _tls, get_current_app, _register_app
  20. from celery.exceptions import AlwaysEagerIgnored
  21. from celery.five import items, values
  22. from celery.loaders import get_loader_cls
  23. from celery.local import PromiseProxy, maybe_evaluate
  24. from celery.utils.functional import first
  25. from celery.utils.imports import instantiate, symbol_by_name
  26. from celery.utils.objects import mro_lookup
  27. from .annotations import prepare as prepare_annotations
  28. from .builtins import shared_task, load_shared_tasks
  29. from .defaults import DEFAULTS, find_deprecated_settings
  30. from .registry import TaskRegistry
  31. from .utils import (
  32. AppPickler, Settings, bugreport, _unpickle_app, _unpickle_app_v2,
  33. )
  34. DEFAULT_FIXUPS = (
  35. 'celery.fixups.django:DjangoFixup',
  36. )
  37. def app_has_custom(app, attr):
  38. return mro_lookup(app.__class__, attr, stop=(Celery, object),
  39. monkey_patched=[__name__])
  40. def _unpickle_appattr(reverse_name, args):
  41. """Given an attribute name and a list of args, gets
  42. the attribute from the current app and calls it."""
  43. return get_current_app()._rgetattr(reverse_name)(*args)
  44. class Celery(object):
  45. Pickler = AppPickler
  46. SYSTEM = platforms.SYSTEM
  47. IS_OSX, IS_WINDOWS = platforms.IS_OSX, platforms.IS_WINDOWS
  48. amqp_cls = 'celery.app.amqp:AMQP'
  49. backend_cls = None
  50. events_cls = 'celery.events:Events'
  51. loader_cls = 'celery.loaders.app:AppLoader'
  52. log_cls = 'celery.app.log:Logging'
  53. control_cls = 'celery.app.control:Control'
  54. registry_cls = TaskRegistry
  55. _pool = None
  56. def __init__(self, main=None, loader=None, backend=None,
  57. amqp=None, events=None, log=None, control=None,
  58. set_as_current=True, accept_magic_kwargs=False,
  59. tasks=None, broker=None, include=None, **kwargs):
  60. self.clock = LamportClock()
  61. self.main = main
  62. self.amqp_cls = amqp or self.amqp_cls
  63. self.backend_cls = backend or self.backend_cls
  64. self.events_cls = events or self.events_cls
  65. self.loader_cls = loader or self.loader_cls
  66. self.log_cls = log or self.log_cls
  67. self.control_cls = control or self.control_cls
  68. self.set_as_current = set_as_current
  69. self.registry_cls = symbol_by_name(self.registry_cls)
  70. self.accept_magic_kwargs = accept_magic_kwargs
  71. self.user_options = defaultdict(set)
  72. self.steps = defaultdict(set)
  73. self.configured = False
  74. self._pending_defaults = deque()
  75. self.finalized = False
  76. self._finalize_mutex = threading.Lock()
  77. self._pending = deque()
  78. self._tasks = tasks
  79. if not isinstance(self._tasks, TaskRegistry):
  80. self._tasks = TaskRegistry(self._tasks or {})
  81. # If the class defins a custom __reduce_args__ we need to use
  82. # the old way of pickling apps, which is pickling a list of
  83. # args instead of the new way that pickles a dict of keywords.
  84. self._using_v1_reduce = app_has_custom(self, '__reduce_args__')
  85. # these options are moved to the config to
  86. # simplify pickling of the app object.
  87. self._preconf = changes or {}
  88. if broker:
  89. self._preconf['BROKER_URL'] = broker
  90. if include:
  91. self._preconf['CELERY_IMPORTS'] = include
  92. self.fixups = list(filter(None, (symbol_by_name(f).include(self)
  93. for f in DEFAULT_FIXUPS)))
  94. if self.set_as_current:
  95. self.set_current()
  96. self.on_init()
  97. _register_app(self)
  98. def set_current(self):
  99. _tls.current_app = self
  100. def __enter__(self):
  101. return self
  102. def __exit__(self, *exc_info):
  103. self.close()
  104. def close(self):
  105. self._maybe_close_pool()
  106. def on_init(self):
  107. """Optional callback called at init."""
  108. pass
  109. def start(self, argv=None):
  110. return instantiate('celery.bin.celery:CeleryCommand', app=self) \
  111. .execute_from_commandline(argv)
  112. def worker_main(self, argv=None):
  113. return instantiate('celery.bin.celeryd:WorkerCommand', app=self) \
  114. .execute_from_commandline(argv)
  115. def task(self, *args, **opts):
  116. """Creates new task class from any callable."""
  117. def inner_create_task_cls(shared=True, filter=None, **opts):
  118. def _create_task_cls(fun):
  119. if shared:
  120. cons = lambda app: app._task_from_fun(fun, **opts)
  121. cons.__name__ = fun.__name__
  122. shared_task(cons)
  123. if self.accept_magic_kwargs: # compat mode
  124. task = self._task_from_fun(fun, **opts)
  125. if filter:
  126. task = filter(task)
  127. return task
  128. # return a proxy object that is only evaluated when first used
  129. promise = PromiseProxy(self._task_from_fun, (fun, ), opts)
  130. self._pending.append(promise)
  131. if filter:
  132. return filter(promise)
  133. return promise
  134. return _create_task_cls
  135. if len(args) == 1 and isinstance(args[0], Callable):
  136. return inner_create_task_cls(**opts)(*args)
  137. return inner_create_task_cls(**opts)
  138. def _task_from_fun(self, fun, **options):
  139. base = options.pop('base', None) or self.Task
  140. T = type(fun.__name__, (base, ), dict({
  141. 'app': self,
  142. 'accept_magic_kwargs': False,
  143. 'run': staticmethod(fun),
  144. '__doc__': fun.__doc__,
  145. '__module__': fun.__module__}, **options))()
  146. task = self._tasks[T.name] # return global instance.
  147. task.bind(self)
  148. return task
  149. def finalize(self):
  150. with self._finalize_mutex:
  151. if not self.finalized:
  152. self.finalized = True
  153. load_shared_tasks(self)
  154. pending = self._pending
  155. while pending:
  156. maybe_evaluate(pending.popleft())
  157. for task in values(self._tasks):
  158. task.bind(self)
  159. def add_defaults(self, fun):
  160. if not isinstance(fun, Callable):
  161. d, fun = fun, lambda: d
  162. if self.configured:
  163. return self.conf.add_defaults(fun())
  164. self._pending_defaults.append(fun)
  165. def config_from_object(self, obj, silent=False):
  166. del(self.conf)
  167. return self.loader.config_from_object(obj, silent=silent)
  168. def config_from_envvar(self, variable_name, silent=False):
  169. del(self.conf)
  170. return self.loader.config_from_envvar(variable_name, silent=silent)
  171. def config_from_cmdline(self, argv, namespace='celery'):
  172. self.conf.update(self.loader.cmdline_config_parser(argv, namespace))
  173. def autodiscover_tasks(self, packages, related_name='tasks'):
  174. self.loader.autodiscover_tasks(packages, related_name)
  175. def send_task(self, name, args=None, kwargs=None, countdown=None,
  176. eta=None, task_id=None, producer=None, connection=None,
  177. result_cls=None, expires=None, queues=None, publisher=None,
  178. **options):
  179. producer = producer or publisher # XXX compat
  180. if self.conf.CELERY_ALWAYS_EAGER: # pragma: no cover
  181. warnings.warn(AlwaysEagerIgnored(
  182. 'CELERY_ALWAYS_EAGER has no effect on send_task'))
  183. result_cls = result_cls or self.AsyncResult
  184. router = self.amqp.Router(queues)
  185. options.setdefault('compression',
  186. self.conf.CELERY_MESSAGE_COMPRESSION)
  187. options = router.route(options, name, args, kwargs)
  188. with self.producer_or_acquire(producer) as producer:
  189. return result_cls(producer.publish_task(name, args, kwargs,
  190. task_id=task_id,
  191. countdown=countdown, eta=eta,
  192. expires=expires, **options))
  193. def connection(self, hostname=None, userid=None, password=None,
  194. virtual_host=None, port=None, ssl=None, connect_timeout=None,
  195. transport=None, transport_options=None, heartbeat=None, **kwargs):
  196. conf = self.conf
  197. return self.amqp.Connection(
  198. hostname or conf.BROKER_HOST,
  199. userid or conf.BROKER_USER,
  200. password or conf.BROKER_PASSWORD,
  201. virtual_host or conf.BROKER_VHOST,
  202. port or conf.BROKER_PORT,
  203. transport=transport or conf.BROKER_TRANSPORT,
  204. ssl=self.either('BROKER_USE_SSL', ssl),
  205. connect_timeout=self.either(
  206. 'BROKER_CONNECTION_TIMEOUT', connect_timeout),
  207. heartbeat=heartbeat,
  208. transport_options=dict(conf.BROKER_TRANSPORT_OPTIONS,
  209. **transport_options or {}))
  210. broker_connection = connection
  211. @contextmanager
  212. def connection_or_acquire(self, connection=None, pool=True,
  213. *args, **kwargs):
  214. if connection:
  215. yield connection
  216. else:
  217. if pool:
  218. with self.pool.acquire(block=True) as connection:
  219. yield connection
  220. else:
  221. with self.connection() as connection:
  222. yield connection
  223. default_connection = connection_or_acquire # XXX compat
  224. @contextmanager
  225. def producer_or_acquire(self, producer=None):
  226. if producer:
  227. yield producer
  228. else:
  229. with self.amqp.producer_pool.acquire(block=True) as producer:
  230. yield producer
  231. default_producer = producer_or_acquire # XXX compat
  232. def with_default_connection(self, fun):
  233. """With any function accepting a `connection`
  234. keyword argument, establishes a default connection if one is
  235. not already passed to it.
  236. Any automatically established connection will be closed after
  237. the function returns.
  238. **Deprecated**
  239. Use ``with app.connection_or_acquire(connection)`` instead.
  240. """
  241. @wraps(fun)
  242. def _inner(*args, **kwargs):
  243. connection = kwargs.pop('connection', None)
  244. with self.connection_or_acquire(connection) as c:
  245. return fun(*args, connection=c, **kwargs)
  246. return _inner
  247. def prepare_config(self, c):
  248. """Prepare configuration before it is merged with the defaults."""
  249. return find_deprecated_settings(c)
  250. def now(self):
  251. return self.loader.now(utc=self.conf.CELERY_ENABLE_UTC)
  252. def mail_admins(self, subject, body, fail_silently=False):
  253. if self.conf.ADMINS:
  254. to = [admin_email for _, admin_email in self.conf.ADMINS]
  255. return self.loader.mail_admins(subject, body, fail_silently, to=to,
  256. sender=self.conf.SERVER_EMAIL,
  257. host=self.conf.EMAIL_HOST,
  258. port=self.conf.EMAIL_PORT,
  259. user=self.conf.EMAIL_HOST_USER,
  260. password=self.conf.EMAIL_HOST_PASSWORD,
  261. timeout=self.conf.EMAIL_TIMEOUT,
  262. use_ssl=self.conf.EMAIL_USE_SSL,
  263. use_tls=self.conf.EMAIL_USE_TLS)
  264. def select_queues(self, queues=None):
  265. return self.amqp.queues.select_subset(queues)
  266. def either(self, default_key, *values):
  267. """Fallback to the value of a configuration key if none of the
  268. `*values` are true."""
  269. return first(None, values) or self.conf.get(default_key)
  270. def bugreport(self):
  271. return bugreport(self)
  272. def _get_backend(self):
  273. from celery.backends import get_backend_by_url
  274. backend, url = get_backend_by_url(
  275. self.backend_cls or self.conf.CELERY_RESULT_BACKEND,
  276. self.loader)
  277. return backend(app=self, url=url)
  278. def _get_config(self):
  279. self.configured = True
  280. s = Settings({}, [self.prepare_config(self.loader.conf),
  281. deepcopy(DEFAULTS)])
  282. # load lazy config dict initializers.
  283. pending = self._pending_defaults
  284. while pending:
  285. s.add_defaults(pending.popleft()())
  286. if self._preconf:
  287. for key, value in items(self._preconf):
  288. setattr(s, key, value)
  289. return s
  290. def _after_fork(self, obj_):
  291. self._maybe_close_pool()
  292. def _maybe_close_pool(self):
  293. if self._pool:
  294. self._pool.force_close_all()
  295. self._pool = None
  296. amqp = self.amqp
  297. if amqp._producer_pool:
  298. amqp._producer_pool.force_close_all()
  299. amqp._producer_pool = None
  300. def create_task_cls(self):
  301. """Creates a base task class using default configuration
  302. taken from this app."""
  303. return self.subclass_with_self('celery.app.task:Task', name='Task',
  304. attribute='_app', abstract=True)
  305. def subclass_with_self(self, Class, name=None, attribute='app',
  306. reverse=None, **kw):
  307. """Subclass an app-compatible class by setting its app attribute
  308. to be this app instance.
  309. App-compatible means that the class has a class attribute that
  310. provides the default app it should use, e.g.
  311. ``class Foo: app = None``.
  312. :param Class: The app-compatible class to subclass.
  313. :keyword name: Custom name for the target class.
  314. :keyword attribute: Name of the attribute holding the app,
  315. default is 'app'.
  316. """
  317. Class = symbol_by_name(Class)
  318. reverse = reverse if reverse else Class.__name__
  319. def __reduce__(self):
  320. return _unpickle_appattr, (reverse, self.__reduce_args__())
  321. attrs = dict({attribute: self}, __module__=Class.__module__,
  322. __doc__=Class.__doc__, __reduce__=__reduce__, **kw)
  323. return type(name or Class.__name__, (Class, ), attrs)
  324. def _rgetattr(self, path):
  325. return attrgetter(path)(self)
  326. def __repr__(self):
  327. return '<{0} {1}:0x{2:x}>'.format(
  328. type(self).__name__, self.main or '__main__', id(self))
  329. def __reduce__(self):
  330. if self._using_v1_reduce:
  331. return self.__reduce_v1__()
  332. return (_unpickle_app_v2, (self.__class__, self.__reduce_keys__()))
  333. def __reduce_v1__(self):
  334. # Reduce only pickles the configuration changes,
  335. # so the default configuration doesn't have to be passed
  336. # between processes.
  337. return (_unpickle_app, (self.__class__, self.Pickler)
  338. + self.__reduce_args__())
  339. def __reduce_keys__(self):
  340. """Returns keyword arguments used to reconstruct the object
  341. when unpickling."""
  342. return {
  343. 'main': self.main,
  344. 'changes': self.conf.changes,
  345. 'loader': self.loader_cls,
  346. 'backend': self.backend_cls,
  347. 'amqp': self.amqp_cls,
  348. 'events': self.events_cls,
  349. 'log': self.log_cls,
  350. 'control': self.control_cls,
  351. 'accept_magic_kwargs': self.accept_magic_kwargs,
  352. }
  353. def __reduce_args__(self):
  354. """Deprecated method, please use :meth:`__reduce_keys__` instead."""
  355. return (self.main, self.conf.changes, self.loader_cls,
  356. self.backend_cls, self.amqp_cls, self.events_cls,
  357. self.log_cls, self.control_cls, self.accept_magic_kwargs)
  358. @cached_property
  359. def Worker(self):
  360. return self.subclass_with_self('celery.apps.worker:Worker')
  361. @cached_property
  362. def WorkController(self, **kwargs):
  363. return self.subclass_with_self('celery.worker:WorkController')
  364. @cached_property
  365. def Beat(self, **kwargs):
  366. return self.subclass_with_self('celery.apps.beat:Beat')
  367. @cached_property
  368. def TaskSet(self):
  369. return self.subclass_with_self('celery.task.sets:TaskSet')
  370. @cached_property
  371. def Task(self):
  372. return self.create_task_cls()
  373. @cached_property
  374. def annotations(self):
  375. return prepare_annotations(self.conf.CELERY_ANNOTATIONS)
  376. @cached_property
  377. def AsyncResult(self):
  378. return self.subclass_with_self('celery.result:AsyncResult')
  379. @cached_property
  380. def GroupResult(self):
  381. return self.subclass_with_self('celery.result:GroupResult')
  382. @cached_property
  383. def TaskSetResult(self): # XXX compat
  384. return self.subclass_with_self('celery.result:TaskSetResult')
  385. @property
  386. def pool(self):
  387. if self._pool is None:
  388. register_after_fork(self, self._after_fork)
  389. limit = self.conf.BROKER_POOL_LIMIT
  390. self._pool = self.connection().Pool(limit=limit)
  391. return self._pool
  392. @property
  393. def current_task(self):
  394. return _task_stack.top
  395. @cached_property
  396. def amqp(self):
  397. return instantiate(self.amqp_cls, app=self)
  398. @cached_property
  399. def backend(self):
  400. return self._get_backend()
  401. @cached_property
  402. def conf(self):
  403. return self._get_config()
  404. @cached_property
  405. def control(self):
  406. return instantiate(self.control_cls, app=self)
  407. @cached_property
  408. def events(self):
  409. return instantiate(self.events_cls, app=self)
  410. @cached_property
  411. def loader(self):
  412. return get_loader_cls(self.loader_cls)(app=self)
  413. @cached_property
  414. def log(self):
  415. return instantiate(self.log_cls, app=self)
  416. @cached_property
  417. def tasks(self):
  418. self.finalize()
  419. return self._tasks
# Alternate name for :class:`Celery`, kept for compatibility.
App = Celery  # compat