base.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app.base
  4. ~~~~~~~~~~~~~~~
  5. Actual App instance implementation.
  6. """
  7. from __future__ import absolute_import
  8. import threading
  9. import warnings
  10. from collections import defaultdict, deque
  11. from contextlib import contextmanager
  12. from copy import deepcopy
  13. from functools import wraps
  14. from operator import attrgetter
  15. from billiard.util import register_after_fork
  16. from kombu.clocks import LamportClock
  17. from kombu.utils import cached_property
  18. from celery import platforms
  19. from celery.exceptions import AlwaysEagerIgnored
  20. from celery.loaders import get_loader_cls
  21. from celery.local import PromiseProxy, maybe_evaluate
  22. from celery._state import _task_stack, _tls, get_current_app, _register_app
  23. from celery.utils.functional import first
  24. from celery.utils.imports import instantiate, symbol_by_name
  25. from .annotations import prepare as prepare_annotations
  26. from .builtins import shared_task, load_shared_tasks
  27. from .defaults import DEFAULTS, find_deprecated_settings
  28. from .registry import TaskRegistry
  29. from .utils import AppPickler, Settings, bugreport, _unpickle_app
  30. DEFAULT_FIXUPS = (
  31. 'celery.fixups.django:DjangoFixup',
  32. )
  33. def _unpickle_appattr(reverse_name, args):
  34. """Given an attribute name and a list of args, gets
  35. the attribute from the current app and calls it."""
  36. return get_current_app()._rgetattr(reverse_name)(*args)
  37. class Celery(object):
  38. Pickler = AppPickler
  39. SYSTEM = platforms.SYSTEM
  40. IS_OSX, IS_WINDOWS = platforms.IS_OSX, platforms.IS_WINDOWS
  41. amqp_cls = 'celery.app.amqp:AMQP'
  42. backend_cls = None
  43. events_cls = 'celery.events:Events'
  44. loader_cls = 'celery.loaders.app:AppLoader'
  45. log_cls = 'celery.app.log:Logging'
  46. control_cls = 'celery.app.control:Control'
  47. registry_cls = TaskRegistry
  48. _pool = None
  49. def __init__(self, main=None, loader=None, backend=None,
  50. amqp=None, events=None, log=None, control=None,
  51. set_as_current=True, accept_magic_kwargs=False,
  52. tasks=None, broker=None, include=None, **kwargs):
  53. self.clock = LamportClock()
  54. self.main = main
  55. self.amqp_cls = amqp or self.amqp_cls
  56. self.backend_cls = backend or self.backend_cls
  57. self.events_cls = events or self.events_cls
  58. self.loader_cls = loader or self.loader_cls
  59. self.log_cls = log or self.log_cls
  60. self.control_cls = control or self.control_cls
  61. self.set_as_current = set_as_current
  62. self.registry_cls = symbol_by_name(self.registry_cls)
  63. self.accept_magic_kwargs = accept_magic_kwargs
  64. self.user_options = defaultdict(set)
  65. self.steps = defaultdict(set)
  66. self.configured = False
  67. self._pending_defaults = deque()
  68. self.finalized = False
  69. self._finalize_mutex = threading.Lock()
  70. self._pending = deque()
  71. self._tasks = tasks
  72. if not isinstance(self._tasks, TaskRegistry):
  73. self._tasks = TaskRegistry(self._tasks or {})
  74. # these options are moved to the config to
  75. # simplify pickling of the app object.
  76. self._preconf = {}
  77. if broker:
  78. self._preconf['BROKER_URL'] = broker
  79. if include:
  80. self._preconf['CELERY_IMPORTS'] = include
  81. self.fixups = list(filter(None, (symbol_by_name(f).include(self)
  82. for f in DEFAULT_FIXUPS)))
  83. if self.set_as_current:
  84. self.set_current()
  85. self.on_init()
  86. _register_app(self)
  87. def set_current(self):
  88. _tls.current_app = self
  89. def __enter__(self):
  90. return self
  91. def __exit__(self, *exc_info):
  92. self.close()
  93. def close(self):
  94. self._maybe_close_pool()
  95. def on_init(self):
  96. """Optional callback called at init."""
  97. pass
  98. def start(self, argv=None):
  99. return instantiate('celery.bin.celery:CeleryCommand', app=self) \
  100. .execute_from_commandline(argv)
  101. def worker_main(self, argv=None):
  102. return instantiate('celery.bin.celeryd:WorkerCommand', app=self) \
  103. .execute_from_commandline(argv)
  104. def task(self, *args, **opts):
  105. """Creates new task class from any callable."""
  106. def inner_create_task_cls(shared=True, filter=None, **opts):
  107. def _create_task_cls(fun):
  108. if shared:
  109. cons = lambda app: app._task_from_fun(fun, **opts)
  110. cons.__name__ = fun.__name__
  111. shared_task(cons)
  112. if self.accept_magic_kwargs: # compat mode
  113. task = self._task_from_fun(fun, **opts)
  114. if filter:
  115. task = filter(task)
  116. return task
  117. # return a proxy object that is only evaluated when first used
  118. promise = PromiseProxy(self._task_from_fun, (fun, ), opts)
  119. self._pending.append(promise)
  120. if filter:
  121. return filter(promise)
  122. return promise
  123. return _create_task_cls
  124. if len(args) == 1 and callable(args[0]):
  125. return inner_create_task_cls(**opts)(*args)
  126. return inner_create_task_cls(**opts)
  127. def _task_from_fun(self, fun, **options):
  128. base = options.pop('base', None) or self.Task
  129. T = type(fun.__name__, (base, ), dict({
  130. 'app': self,
  131. 'accept_magic_kwargs': False,
  132. 'run': staticmethod(fun),
  133. '__doc__': fun.__doc__,
  134. '__module__': fun.__module__}, **options))()
  135. task = self._tasks[T.name] # return global instance.
  136. task.bind(self)
  137. return task
  138. def finalize(self):
  139. with self._finalize_mutex:
  140. if not self.finalized:
  141. self.finalized = True
  142. load_shared_tasks(self)
  143. pending = self._pending
  144. while pending:
  145. maybe_evaluate(pending.popleft())
  146. for task in self._tasks.itervalues():
  147. task.bind(self)
  148. def add_defaults(self, fun):
  149. if not callable(fun):
  150. d, fun = fun, lambda: d
  151. if self.configured:
  152. return self.conf.add_defaults(fun())
  153. self._pending_defaults.append(fun)
  154. def config_from_object(self, obj, silent=False):
  155. del(self.conf)
  156. return self.loader.config_from_object(obj, silent=silent)
  157. def config_from_envvar(self, variable_name, silent=False):
  158. del(self.conf)
  159. return self.loader.config_from_envvar(variable_name, silent=silent)
  160. def config_from_cmdline(self, argv, namespace='celery'):
  161. self.conf.update(self.loader.cmdline_config_parser(argv, namespace))
  162. def autodiscover_tasks(self, packages, related_name='tasks'):
  163. self.loader.autodiscover_tasks(packages, related_name)
  164. def send_task(self, name, args=None, kwargs=None, countdown=None,
  165. eta=None, task_id=None, producer=None, connection=None,
  166. result_cls=None, expires=None, queues=None, publisher=None,
  167. **options):
  168. producer = producer or publisher # XXX compat
  169. if self.conf.CELERY_ALWAYS_EAGER: # pragma: no cover
  170. warnings.warn(AlwaysEagerIgnored(
  171. 'CELERY_ALWAYS_EAGER has no effect on send_task'))
  172. result_cls = result_cls or self.AsyncResult
  173. router = self.amqp.Router(queues)
  174. options.setdefault('compression',
  175. self.conf.CELERY_MESSAGE_COMPRESSION)
  176. options = router.route(options, name, args, kwargs)
  177. with self.producer_or_acquire(producer) as producer:
  178. return result_cls(producer.publish_task(name, args, kwargs,
  179. task_id=task_id,
  180. countdown=countdown, eta=eta,
  181. expires=expires, **options))
  182. def connection(self, hostname=None, userid=None, password=None,
  183. virtual_host=None, port=None, ssl=None, connect_timeout=None,
  184. transport=None, transport_options=None, heartbeat=None, **kwargs):
  185. conf = self.conf
  186. return self.amqp.Connection(
  187. hostname or conf.BROKER_HOST,
  188. userid or conf.BROKER_USER,
  189. password or conf.BROKER_PASSWORD,
  190. virtual_host or conf.BROKER_VHOST,
  191. port or conf.BROKER_PORT,
  192. transport=transport or conf.BROKER_TRANSPORT,
  193. ssl=self.either('BROKER_USE_SSL', ssl),
  194. connect_timeout=self.either(
  195. 'BROKER_CONNECTION_TIMEOUT', connect_timeout),
  196. heartbeat=heartbeat,
  197. transport_options=dict(conf.BROKER_TRANSPORT_OPTIONS,
  198. **transport_options or {}))
  199. broker_connection = connection
  200. @contextmanager
  201. def connection_or_acquire(self, connection=None, pool=True,
  202. *args, **kwargs):
  203. if connection:
  204. yield connection
  205. else:
  206. if pool:
  207. with self.pool.acquire(block=True) as connection:
  208. yield connection
  209. else:
  210. with self.connection() as connection:
  211. yield connection
  212. default_connection = connection_or_acquire # XXX compat
  213. @contextmanager
  214. def producer_or_acquire(self, producer=None):
  215. if producer:
  216. yield producer
  217. else:
  218. with self.amqp.producer_pool.acquire(block=True) as producer:
  219. yield producer
  220. default_producer = producer_or_acquire # XXX compat
  221. def with_default_connection(self, fun):
  222. """With any function accepting a `connection`
  223. keyword argument, establishes a default connection if one is
  224. not already passed to it.
  225. Any automatically established connection will be closed after
  226. the function returns.
  227. **Deprecated**
  228. Use ``with app.connection_or_acquire(connection)`` instead.
  229. """
  230. @wraps(fun)
  231. def _inner(*args, **kwargs):
  232. connection = kwargs.pop('connection', None)
  233. with self.connection_or_acquire(connection) as c:
  234. return fun(*args, connection=c, **kwargs)
  235. return _inner
  236. def prepare_config(self, c):
  237. """Prepare configuration before it is merged with the defaults."""
  238. return find_deprecated_settings(c)
  239. def now(self):
  240. return self.loader.now(utc=self.conf.CELERY_ENABLE_UTC)
  241. def mail_admins(self, subject, body, fail_silently=False):
  242. if self.conf.ADMINS:
  243. to = [admin_email for _, admin_email in self.conf.ADMINS]
  244. return self.loader.mail_admins(subject, body, fail_silently, to=to,
  245. sender=self.conf.SERVER_EMAIL,
  246. host=self.conf.EMAIL_HOST,
  247. port=self.conf.EMAIL_PORT,
  248. user=self.conf.EMAIL_HOST_USER,
  249. password=self.conf.EMAIL_HOST_PASSWORD,
  250. timeout=self.conf.EMAIL_TIMEOUT,
  251. use_ssl=self.conf.EMAIL_USE_SSL,
  252. use_tls=self.conf.EMAIL_USE_TLS)
  253. def select_queues(self, queues=None):
  254. return self.amqp.queues.select_subset(queues)
  255. def either(self, default_key, *values):
  256. """Fallback to the value of a configuration key if none of the
  257. `*values` are true."""
  258. return first(None, values) or self.conf.get(default_key)
  259. def bugreport(self):
  260. return bugreport(self)
  261. def _get_backend(self):
  262. from celery.backends import get_backend_by_url
  263. backend, url = get_backend_by_url(
  264. self.backend_cls or self.conf.CELERY_RESULT_BACKEND,
  265. self.loader)
  266. return backend(app=self, url=url)
  267. def _get_config(self):
  268. self.configured = True
  269. s = Settings({}, [self.prepare_config(self.loader.conf),
  270. deepcopy(DEFAULTS)])
  271. # load lazy config dict initializers.
  272. pending = self._pending_defaults
  273. while pending:
  274. s.add_defaults(pending.popleft()())
  275. if self._preconf:
  276. for key, value in self._preconf.iteritems():
  277. setattr(s, key, value)
  278. return s
  279. def _after_fork(self, obj_):
  280. self._maybe_close_pool()
  281. def _maybe_close_pool(self):
  282. if self._pool:
  283. self._pool.force_close_all()
  284. self._pool = None
  285. def create_task_cls(self):
  286. """Creates a base task class using default configuration
  287. taken from this app."""
  288. return self.subclass_with_self('celery.app.task:Task', name='Task',
  289. attribute='_app', abstract=True)
  290. def subclass_with_self(self, Class, name=None, attribute='app',
  291. reverse=None, **kw):
  292. """Subclass an app-compatible class by setting its app attribute
  293. to be this app instance.
  294. App-compatible means that the class has a class attribute that
  295. provides the default app it should use, e.g.
  296. ``class Foo: app = None``.
  297. :param Class: The app-compatible class to subclass.
  298. :keyword name: Custom name for the target class.
  299. :keyword attribute: Name of the attribute holding the app,
  300. default is 'app'.
  301. """
  302. Class = symbol_by_name(Class)
  303. reverse = reverse if reverse else Class.__name__
  304. def __reduce__(self):
  305. return _unpickle_appattr, (reverse, self.__reduce_args__())
  306. attrs = dict({attribute: self}, __module__=Class.__module__,
  307. __doc__=Class.__doc__, __reduce__=__reduce__, **kw)
  308. return type(name or Class.__name__, (Class, ), attrs)
  309. def _rgetattr(self, path):
  310. return attrgetter(path)(self)
  311. def __repr__(self):
  312. return '<{0} {1}:0x{2:x}>'.format(
  313. type(self).__name__, self.main or '__main__', id(self))
  314. def __reduce__(self):
  315. # Reduce only pickles the configuration changes,
  316. # so the default configuration doesn't have to be passed
  317. # between processes.
  318. return (_unpickle_app, (self.__class__, self.Pickler)
  319. + self.__reduce_args__())
  320. def __reduce_args__(self):
  321. return (self.main, self.conf.changes, self.loader_cls,
  322. self.backend_cls, self.amqp_cls, self.events_cls,
  323. self.log_cls, self.control_cls, self.accept_magic_kwargs)
  324. @cached_property
  325. def Worker(self):
  326. return self.subclass_with_self('celery.apps.worker:Worker')
  327. @cached_property
  328. def WorkController(self, **kwargs):
  329. return self.subclass_with_self('celery.worker:WorkController')
  330. @cached_property
  331. def Beat(self, **kwargs):
  332. return self.subclass_with_self('celery.apps.beat:Beat')
  333. @cached_property
  334. def TaskSet(self):
  335. return self.subclass_with_self('celery.task.sets:TaskSet')
  336. @cached_property
  337. def Task(self):
  338. return self.create_task_cls()
  339. @cached_property
  340. def annotations(self):
  341. return prepare_annotations(self.conf.CELERY_ANNOTATIONS)
  342. @cached_property
  343. def AsyncResult(self):
  344. return self.subclass_with_self('celery.result:AsyncResult')
  345. @cached_property
  346. def GroupResult(self):
  347. return self.subclass_with_self('celery.result:GroupResult')
  348. @cached_property
  349. def TaskSetResult(self): # XXX compat
  350. return self.subclass_with_self('celery.result:TaskSetResult')
  351. @property
  352. def pool(self):
  353. if self._pool is None:
  354. register_after_fork(self, self._after_fork)
  355. limit = self.conf.BROKER_POOL_LIMIT
  356. self._pool = self.connection().Pool(limit=limit)
  357. return self._pool
  358. @property
  359. def current_task(self):
  360. return _task_stack.top
  361. @cached_property
  362. def amqp(self):
  363. return instantiate(self.amqp_cls, app=self)
  364. @cached_property
  365. def backend(self):
  366. return self._get_backend()
  367. @cached_property
  368. def conf(self):
  369. return self._get_config()
  370. @cached_property
  371. def control(self):
  372. return instantiate(self.control_cls, app=self)
  373. @cached_property
  374. def events(self):
  375. return instantiate(self.events_cls, app=self)
  376. @cached_property
  377. def loader(self):
  378. return get_loader_cls(self.loader_cls)(app=self)
  379. @cached_property
  380. def log(self):
  381. return instantiate(self.log_cls, app=self)
  382. @cached_property
  383. def tasks(self):
  384. self.finalize()
  385. return self._tasks
  386. App = Celery # compat