base.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app.base
  4. ~~~~~~~~~~~~~~~
  5. Actual App instance implementation.
  6. """
  7. from __future__ import absolute_import
  8. import os
  9. import threading
  10. import warnings
  11. from collections import Callable, defaultdict, deque
  12. from contextlib import contextmanager
  13. from copy import deepcopy
  14. from operator import attrgetter
  15. from billiard.util import register_after_fork
  16. from kombu.clocks import LamportClock
  17. from kombu.common import oid_from
  18. from kombu.serialization import enable_insecure_serializers
  19. from kombu.utils import cached_property
  20. from celery import platforms
  21. from celery._state import _task_stack, _tls, get_current_app, _register_app
  22. from celery.exceptions import AlwaysEagerIgnored, ImproperlyConfigured
  23. from celery.five import items, values
  24. from celery.loaders import get_loader_cls
  25. from celery.local import PromiseProxy, maybe_evaluate
  26. from celery.utils.functional import first
  27. from celery.utils.imports import instantiate, symbol_by_name
  28. from celery.utils.log import ensure_process_aware_logger
  29. from celery.utils.objects import mro_lookup
  30. from .annotations import prepare as prepare_annotations
  31. from .builtins import shared_task, load_shared_tasks
  32. from .defaults import DEFAULTS, find_deprecated_settings
  33. from .registry import TaskRegistry
  34. from .utils import (
  35. AppPickler, Settings, bugreport, _unpickle_app, _unpickle_app_v2, appstr,
  36. )
  37. _EXECV = os.environ.get('FORKED_BY_MULTIPROCESSING')
  38. BUILTIN_FIXUPS = frozenset([
  39. 'celery.fixups.django:fixup',
  40. ])
  41. ERR_ENVVAR_NOT_SET = """\
  42. The environment variable {0!r} is not set,
  43. and as such the configuration could not be loaded.
  44. Please set this variable and make it point to
  45. a configuration module."""
  46. def app_has_custom(app, attr):
  47. return mro_lookup(app.__class__, attr, stop=(Celery, object),
  48. monkey_patched=[__name__])
  49. def _unpickle_appattr(reverse_name, args):
  50. """Given an attribute name and a list of args, gets
  51. the attribute from the current app and calls it."""
  52. return get_current_app()._rgetattr(reverse_name)(*args)
  53. class Celery(object):
  54. #: This is deprecated, use :meth:`reduce_keys` instead
  55. Pickler = AppPickler
  56. SYSTEM = platforms.SYSTEM
  57. IS_OSX, IS_WINDOWS = platforms.IS_OSX, platforms.IS_WINDOWS
  58. amqp_cls = 'celery.app.amqp:AMQP'
  59. backend_cls = None
  60. events_cls = 'celery.events:Events'
  61. loader_cls = 'celery.loaders.app:AppLoader'
  62. log_cls = 'celery.app.log:Logging'
  63. control_cls = 'celery.app.control:Control'
  64. registry_cls = TaskRegistry
  65. _pool = None
  66. def __init__(self, main=None, loader=None, backend=None,
  67. amqp=None, events=None, log=None, control=None,
  68. set_as_current=True, accept_magic_kwargs=False,
  69. tasks=None, broker=None, include=None, changes=None,
  70. config_source=None, fixups=None, **kwargs):
  71. self.clock = LamportClock()
  72. self.main = main
  73. self.amqp_cls = amqp or self.amqp_cls
  74. self.backend_cls = backend or self.backend_cls
  75. self.events_cls = events or self.events_cls
  76. self.loader_cls = loader or self.loader_cls
  77. self.log_cls = log or self.log_cls
  78. self.control_cls = control or self.control_cls
  79. self.set_as_current = set_as_current
  80. self.registry_cls = symbol_by_name(self.registry_cls)
  81. self.accept_magic_kwargs = accept_magic_kwargs
  82. self.user_options = defaultdict(set)
  83. self._config_source = config_source
  84. self.steps = defaultdict(set)
  85. self.configured = False
  86. self._pending_defaults = deque()
  87. self.finalized = False
  88. self._finalize_mutex = threading.Lock()
  89. self._pending = deque()
  90. self._tasks = tasks
  91. if not isinstance(self._tasks, TaskRegistry):
  92. self._tasks = TaskRegistry(self._tasks or {})
  93. # If the class defins a custom __reduce_args__ we need to use
  94. # the old way of pickling apps, which is pickling a list of
  95. # args instead of the new way that pickles a dict of keywords.
  96. self._using_v1_reduce = app_has_custom(self, '__reduce_args__')
  97. # these options are moved to the config to
  98. # simplify pickling of the app object.
  99. self._preconf = changes or {}
  100. if broker:
  101. self._preconf['BROKER_URL'] = broker
  102. if include:
  103. self._preconf['CELERY_IMPORTS'] = include
  104. enable_insecure_serializers()
  105. # Apply fixups.
  106. self.fixups = set(fixups or ())
  107. for fixup in self.fixups | BUILTIN_FIXUPS:
  108. symbol_by_name(fixup)(self)
  109. if self.set_as_current:
  110. self.set_current()
  111. # See Issue #1126
  112. # this is used when pickling the app object so that configuration
  113. # is reread without having to pickle the contents
  114. # (which is often unpickleable anyway)
  115. if self._config_source:
  116. self.config_from_object(self._config_source)
  117. self.on_init()
  118. _register_app(self)
  119. def set_current(self):
  120. _tls.current_app = self
  121. def __enter__(self):
  122. return self
  123. def __exit__(self, *exc_info):
  124. self.close()
  125. def close(self):
  126. self._maybe_close_pool()
  127. def on_init(self):
  128. """Optional callback called at init."""
  129. pass
  130. def start(self, argv=None):
  131. return instantiate(
  132. 'celery.bin.celery:CeleryCommand',
  133. app=self).execute_from_commandline(argv)
  134. def worker_main(self, argv=None):
  135. return instantiate(
  136. 'celery.bin.worker:worker',
  137. app=self).execute_from_commandline(argv)
  138. def task(self, *args, **opts):
  139. """Creates new task class from any callable."""
  140. if _EXECV and not opts.get('_force_evaluate'):
  141. # When using execv the task in the original module will point to a
  142. # different app, so doing things like 'add.request' will point to
  143. # a differnt task instance. This makes sure it will always use
  144. # the task instance from the current app.
  145. # Really need a better solution for this :(
  146. from . import shared_task as proxies_to_curapp
  147. return proxies_to_curapp(*args, _force_evaluate=True, **opts)
  148. def inner_create_task_cls(shared=True, filter=None, **opts):
  149. _filt = filter # stupid 2to3
  150. def _create_task_cls(fun):
  151. if shared:
  152. cons = lambda app: app._task_from_fun(fun, **opts)
  153. cons.__name__ = fun.__name__
  154. shared_task(cons)
  155. if self.accept_magic_kwargs: # compat mode
  156. task = self._task_from_fun(fun, **opts)
  157. if filter:
  158. task = filter(task)
  159. return task
  160. # return a proxy object that is only evaluated when first used
  161. promise = PromiseProxy(self._task_from_fun, (fun, ), opts)
  162. self._pending.append(promise)
  163. if _filt:
  164. return _filt(promise)
  165. return promise
  166. return _create_task_cls
  167. if len(args) == 1 and isinstance(args[0], Callable):
  168. return inner_create_task_cls(**opts)(*args)
  169. if args:
  170. raise TypeError(
  171. 'task() takes no arguments (%s given)' % (len(args, )))
  172. return inner_create_task_cls(**opts)
  173. def _task_from_fun(self, fun, **options):
  174. base = options.pop('base', None) or self.Task
  175. bind = options.pop('bind', False)
  176. T = type(fun.__name__, (base, ), dict({
  177. 'app': self,
  178. 'accept_magic_kwargs': False,
  179. 'run': fun if bind else staticmethod(fun),
  180. '__doc__': fun.__doc__,
  181. '__module__': fun.__module__}, **options))()
  182. task = self._tasks[T.name] # return global instance.
  183. task.bind(self)
  184. return task
  185. def finalize(self):
  186. with self._finalize_mutex:
  187. if not self.finalized:
  188. self.finalized = True
  189. load_shared_tasks(self)
  190. pending = self._pending
  191. while pending:
  192. maybe_evaluate(pending.popleft())
  193. for task in values(self._tasks):
  194. task.bind(self)
  195. def add_defaults(self, fun):
  196. if not isinstance(fun, Callable):
  197. d, fun = fun, lambda: d
  198. if self.configured:
  199. return self.conf.add_defaults(fun())
  200. self._pending_defaults.append(fun)
  201. def config_from_object(self, obj, silent=False):
  202. del(self.conf)
  203. self._config_source = obj
  204. return self.loader.config_from_object(obj, silent=silent)
  205. def config_from_envvar(self, variable_name, silent=False):
  206. module_name = os.environ.get(variable_name)
  207. if not module_name:
  208. if silent:
  209. return False
  210. raise ImproperlyConfigured(ERR_ENVVAR_NOT_SET.format(module_name))
  211. return self.config_from_object(module_name, silent=silent)
  212. def config_from_cmdline(self, argv, namespace='celery'):
  213. self.conf.update(self.loader.cmdline_config_parser(argv, namespace))
  214. def autodiscover_tasks(self, packages, related_name='tasks'):
  215. if self.conf.CELERY_FORCE_BILLIARD_LOGGING:
  216. # we'll use billiard's processName instead of
  217. # multiprocessing's one in all the loggers
  218. # created after this call
  219. ensure_process_aware_logger()
  220. self.loader.autodiscover_tasks(packages, related_name)
  221. def send_task(self, name, args=None, kwargs=None, countdown=None,
  222. eta=None, task_id=None, producer=None, connection=None,
  223. result_cls=None, expires=None, queues=None, publisher=None,
  224. **options):
  225. producer = producer or publisher # XXX compat
  226. if self.conf.CELERY_ALWAYS_EAGER: # pragma: no cover
  227. warnings.warn(AlwaysEagerIgnored(
  228. 'CELERY_ALWAYS_EAGER has no effect on send_task'))
  229. result_cls = result_cls or self.AsyncResult
  230. router = self.amqp.Router(queues)
  231. options.setdefault('compression',
  232. self.conf.CELERY_MESSAGE_COMPRESSION)
  233. options = router.route(options, name, args, kwargs)
  234. with self.producer_or_acquire(producer) as producer:
  235. return result_cls(producer.publish_task(
  236. name, args, kwargs,
  237. task_id=task_id,
  238. countdown=countdown, eta=eta,
  239. expires=expires, **options
  240. ))
  241. def connection(self, hostname=None, userid=None, password=None,
  242. virtual_host=None, port=None, ssl=None,
  243. connect_timeout=None, transport=None,
  244. transport_options=None, heartbeat=None, **kwargs):
  245. conf = self.conf
  246. return self.amqp.Connection(
  247. hostname or conf.BROKER_HOST,
  248. userid or conf.BROKER_USER,
  249. password or conf.BROKER_PASSWORD,
  250. virtual_host or conf.BROKER_VHOST,
  251. port or conf.BROKER_PORT,
  252. transport=transport or conf.BROKER_TRANSPORT,
  253. ssl=self.either('BROKER_USE_SSL', ssl),
  254. connect_timeout=self.either(
  255. 'BROKER_CONNECTION_TIMEOUT', connect_timeout),
  256. heartbeat=heartbeat,
  257. transport_options=dict(conf.BROKER_TRANSPORT_OPTIONS,
  258. **transport_options or {}))
  259. broker_connection = connection
  260. @contextmanager
  261. def connection_or_acquire(self, connection=None, pool=True,
  262. *args, **kwargs):
  263. if connection:
  264. yield connection
  265. else:
  266. if pool:
  267. with self.pool.acquire(block=True) as connection:
  268. yield connection
  269. else:
  270. with self.connection() as connection:
  271. yield connection
  272. default_connection = connection_or_acquire # XXX compat
  273. @contextmanager
  274. def producer_or_acquire(self, producer=None):
  275. if producer:
  276. yield producer
  277. else:
  278. with self.amqp.producer_pool.acquire(block=True) as producer:
  279. yield producer
  280. default_producer = producer_or_acquire # XXX compat
  281. def prepare_config(self, c):
  282. """Prepare configuration before it is merged with the defaults."""
  283. return find_deprecated_settings(c)
  284. def now(self):
  285. return self.loader.now(utc=self.conf.CELERY_ENABLE_UTC)
  286. def mail_admins(self, subject, body, fail_silently=False):
  287. if self.conf.ADMINS:
  288. to = [admin_email for _, admin_email in self.conf.ADMINS]
  289. return self.loader.mail_admins(
  290. subject, body, fail_silently, to=to,
  291. sender=self.conf.SERVER_EMAIL,
  292. host=self.conf.EMAIL_HOST,
  293. port=self.conf.EMAIL_PORT,
  294. user=self.conf.EMAIL_HOST_USER,
  295. password=self.conf.EMAIL_HOST_PASSWORD,
  296. timeout=self.conf.EMAIL_TIMEOUT,
  297. use_ssl=self.conf.EMAIL_USE_SSL,
  298. use_tls=self.conf.EMAIL_USE_TLS,
  299. )
  300. def select_queues(self, queues=None):
  301. return self.amqp.queues.select_subset(queues)
  302. def either(self, default_key, *values):
  303. """Fallback to the value of a configuration key if none of the
  304. `*values` are true."""
  305. return first(None, values) or self.conf.get(default_key)
  306. def bugreport(self):
  307. return bugreport(self)
  308. def _get_backend(self):
  309. from celery.backends import get_backend_by_url
  310. backend, url = get_backend_by_url(
  311. self.backend_cls or self.conf.CELERY_RESULT_BACKEND,
  312. self.loader)
  313. return backend(app=self, url=url)
  314. def _get_config(self):
  315. self.configured = True
  316. s = Settings({}, [self.prepare_config(self.loader.conf),
  317. deepcopy(DEFAULTS)])
  318. # load lazy config dict initializers.
  319. pending = self._pending_defaults
  320. while pending:
  321. s.add_defaults(pending.popleft()())
  322. if self._preconf:
  323. for key, value in items(self._preconf):
  324. setattr(s, key, value)
  325. return s
  326. def _after_fork(self, obj_):
  327. self._maybe_close_pool()
  328. def _maybe_close_pool(self):
  329. if self._pool:
  330. self._pool.force_close_all()
  331. self._pool = None
  332. amqp = self.amqp
  333. if amqp._producer_pool:
  334. amqp._producer_pool.force_close_all()
  335. amqp._producer_pool = None
  336. def create_task_cls(self):
  337. """Creates a base task class using default configuration
  338. taken from this app."""
  339. return self.subclass_with_self('celery.app.task:Task', name='Task',
  340. attribute='_app', abstract=True)
  341. def subclass_with_self(self, Class, name=None, attribute='app',
  342. reverse=None, **kw):
  343. """Subclass an app-compatible class by setting its app attribute
  344. to be this app instance.
  345. App-compatible means that the class has a class attribute that
  346. provides the default app it should use, e.g.
  347. ``class Foo: app = None``.
  348. :param Class: The app-compatible class to subclass.
  349. :keyword name: Custom name for the target class.
  350. :keyword attribute: Name of the attribute holding the app,
  351. default is 'app'.
  352. """
  353. Class = symbol_by_name(Class)
  354. reverse = reverse if reverse else Class.__name__
  355. def __reduce__(self):
  356. return _unpickle_appattr, (reverse, self.__reduce_args__())
  357. attrs = dict({attribute: self}, __module__=Class.__module__,
  358. __doc__=Class.__doc__, __reduce__=__reduce__, **kw)
  359. return type(name or Class.__name__, (Class, ), attrs)
  360. def _rgetattr(self, path):
  361. return attrgetter(path)(self)
  362. def __repr__(self):
  363. return '<{0} {1}>'.format(type(self).__name__, appstr(self))
  364. def __reduce__(self):
  365. if self._using_v1_reduce:
  366. return self.__reduce_v1__()
  367. return (_unpickle_app_v2, (self.__class__, self.__reduce_keys__()))
  368. def __reduce_v1__(self):
  369. # Reduce only pickles the configuration changes,
  370. # so the default configuration doesn't have to be passed
  371. # between processes.
  372. return (
  373. _unpickle_app,
  374. (self.__class__, self.Pickler) + self.__reduce_args__(),
  375. )
  376. def __reduce_keys__(self):
  377. """Returns keyword arguments used to reconstruct the object
  378. when unpickling."""
  379. return {
  380. 'main': self.main,
  381. 'changes': self.conf.changes,
  382. 'loader': self.loader_cls,
  383. 'backend': self.backend_cls,
  384. 'amqp': self.amqp_cls,
  385. 'events': self.events_cls,
  386. 'log': self.log_cls,
  387. 'control': self.control_cls,
  388. 'accept_magic_kwargs': self.accept_magic_kwargs,
  389. 'fixups': self.fixups,
  390. 'config_source': self._config_source,
  391. }
  392. def __reduce_args__(self):
  393. """Deprecated method, please use :meth:`__reduce_keys__` instead."""
  394. return (self.main, self.conf.changes,
  395. self.loader_cls, self.backend_cls, self.amqp_cls,
  396. self.events_cls, self.log_cls, self.control_cls,
  397. self.accept_magic_kwargs, self._config_source)
  398. @cached_property
  399. def Worker(self):
  400. return self.subclass_with_self('celery.apps.worker:Worker')
  401. @cached_property
  402. def WorkController(self, **kwargs):
  403. return self.subclass_with_self('celery.worker:WorkController')
  404. @cached_property
  405. def Beat(self, **kwargs):
  406. return self.subclass_with_self('celery.apps.beat:Beat')
  407. @cached_property
  408. def TaskSet(self):
  409. return self.subclass_with_self('celery.task.sets:TaskSet')
  410. @cached_property
  411. def Task(self):
  412. return self.create_task_cls()
  413. @cached_property
  414. def annotations(self):
  415. return prepare_annotations(self.conf.CELERY_ANNOTATIONS)
  416. @cached_property
  417. def AsyncResult(self):
  418. return self.subclass_with_self('celery.result:AsyncResult')
  419. @cached_property
  420. def GroupResult(self):
  421. return self.subclass_with_self('celery.result:GroupResult')
  422. @cached_property
  423. def TaskSetResult(self): # XXX compat
  424. return self.subclass_with_self('celery.result:TaskSetResult')
  425. @property
  426. def pool(self):
  427. if self._pool is None:
  428. register_after_fork(self, self._after_fork)
  429. limit = self.conf.BROKER_POOL_LIMIT
  430. self._pool = self.connection().Pool(limit=limit)
  431. return self._pool
  432. @property
  433. def current_task(self):
  434. return _task_stack.top
  435. @cached_property
  436. def oid(self):
  437. return oid_from(self)
  438. @cached_property
  439. def amqp(self):
  440. return instantiate(self.amqp_cls, app=self)
  441. @cached_property
  442. def backend(self):
  443. return self._get_backend()
  444. @cached_property
  445. def conf(self):
  446. return self._get_config()
  447. @cached_property
  448. def control(self):
  449. return instantiate(self.control_cls, app=self)
  450. @cached_property
  451. def events(self):
  452. return instantiate(self.events_cls, app=self)
  453. @cached_property
  454. def loader(self):
  455. return get_loader_cls(self.loader_cls)(app=self)
  456. @cached_property
  457. def log(self):
  458. return instantiate(self.log_cls, app=self)
  459. @cached_property
  460. def tasks(self):
  461. self.finalize()
  462. return self._tasks
  463. App = Celery # compat