base.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app.base
  4. ~~~~~~~~~~~~~~~
  5. Actual App instance implementation.
  6. """
  7. from __future__ import absolute_import
  8. from __future__ import with_statement
  9. import os
  10. import threading
  11. import warnings
  12. from collections import deque
  13. from contextlib import contextmanager
  14. from copy import deepcopy
  15. from functools import wraps
  16. from billiard.util import register_after_fork
  17. from kombu.clocks import LamportClock
  18. from kombu.utils import cached_property
  19. from celery import platforms
  20. from celery.exceptions import AlwaysEagerIgnored, ImproperlyConfigured
  21. from celery.loaders import get_loader_cls
  22. from celery.local import PromiseProxy, maybe_evaluate
  23. from celery._state import _task_stack, _tls, get_current_app, _register_app
  24. from celery.utils.functional import first, maybe_list
  25. from celery.utils.imports import instantiate, symbol_by_name
  26. from .annotations import prepare as prepare_annotations
  27. from .builtins import shared_task, load_shared_tasks
  28. from .defaults import DEFAULTS, find_deprecated_settings
  29. from .registry import TaskRegistry
  30. from .utils import AppPickler, Settings, bugreport, _unpickle_app
  31. _EXECV = os.environ.get('FORKED_BY_MULTIPROCESSING')
  32. def _unpickle_appattr(reverse_name, args):
  33. """Given an attribute name and a list of args, gets
  34. the attribute from the current app and calls it."""
  35. return get_current_app()._rgetattr(reverse_name)(*args)
  36. class Celery(object):
  37. Pickler = AppPickler
  38. SYSTEM = platforms.SYSTEM
  39. IS_OSX, IS_WINDOWS = platforms.IS_OSX, platforms.IS_WINDOWS
  40. amqp_cls = 'celery.app.amqp:AMQP'
  41. backend_cls = None
  42. events_cls = 'celery.events:Events'
  43. loader_cls = 'celery.loaders.app:AppLoader'
  44. log_cls = 'celery.app.log:Logging'
  45. control_cls = 'celery.app.control:Control'
  46. registry_cls = TaskRegistry
  47. _pool = None
  48. def __init__(self, main=None, loader=None, backend=None,
  49. amqp=None, events=None, log=None, control=None,
  50. set_as_current=True, accept_magic_kwargs=False,
  51. tasks=None, broker=None, include=None, changes=None,
  52. config_source=None,
  53. **kwargs):
  54. self.clock = LamportClock()
  55. self.main = main
  56. self.amqp_cls = amqp or self.amqp_cls
  57. self.backend_cls = backend or self.backend_cls
  58. self.events_cls = events or self.events_cls
  59. self.loader_cls = loader or self.loader_cls
  60. self.log_cls = log or self.log_cls
  61. self.control_cls = control or self.control_cls
  62. self.set_as_current = set_as_current
  63. self.registry_cls = symbol_by_name(self.registry_cls)
  64. self.accept_magic_kwargs = accept_magic_kwargs
  65. self._config_source = config_source
  66. self.configured = False
  67. self._pending_defaults = deque()
  68. self.finalized = False
  69. self._finalize_mutex = threading.Lock()
  70. self._pending = deque()
  71. self._tasks = tasks
  72. if not isinstance(self._tasks, TaskRegistry):
  73. self._tasks = TaskRegistry(self._tasks or {})
  74. # these options are moved to the config to
  75. # simplify pickling of the app object.
  76. self._preconf = changes or {}
  77. if broker:
  78. self._preconf['BROKER_URL'] = broker
  79. if include:
  80. self._preconf['CELERY_IMPORTS'] = include
  81. if self.set_as_current:
  82. self.set_current()
  83. # See Issue #1126
  84. # this is used when pickling the app object so that configuration
  85. # is reread without having to pickle the contents
  86. # (which is often unpickleable anyway)
  87. if self._config_source:
  88. self.config_from_object(self._config_source)
  89. self.on_init()
  90. _register_app(self)
  91. def set_current(self):
  92. _tls.current_app = self
  93. def __enter__(self):
  94. return self
  95. def __exit__(self, *exc_info):
  96. self.close()
  97. def close(self):
  98. self._maybe_close_pool()
  99. def on_init(self):
  100. """Optional callback called at init."""
  101. pass
  102. def start(self, argv=None):
  103. return instantiate(
  104. 'celery.bin.celery:CeleryCommand',
  105. app=self).execute_from_commandline(argv)
  106. def worker_main(self, argv=None):
  107. return instantiate(
  108. 'celery.bin.celeryd:WorkerCommand',
  109. app=self).execute_from_commandline(argv)
  110. def task(self, *args, **opts):
  111. """Creates new task class from any callable."""
  112. if _EXECV and not opts.get('_force_evaluate'):
  113. # When using execv the task in the original module will point to a
  114. # different app, so doing things like 'add.request' will point to
  115. # a differnt task instance. This makes sure it will always use
  116. # the task instance from the current app.
  117. # Really need a better solution for this :(
  118. from . import shared_task as proxies_to_curapp
  119. opts['_force_evaluate'] = True # XXX Py2.5
  120. return proxies_to_curapp(*args, **opts)
  121. def inner_create_task_cls(shared=True, filter=None, **opts):
  122. _filt = filter # stupid 2to3
  123. def _create_task_cls(fun):
  124. if shared:
  125. cons = lambda app: app._task_from_fun(fun, **opts)
  126. cons.__name__ = fun.__name__
  127. shared_task(cons)
  128. if self.accept_magic_kwargs: # compat mode
  129. task = self._task_from_fun(fun, **opts)
  130. if filter:
  131. task = filter(task)
  132. return task
  133. # return a proxy object that is only evaluated when first used
  134. promise = PromiseProxy(self._task_from_fun, (fun, ), opts)
  135. self._pending.append(promise)
  136. if _filt:
  137. return _filt(promise)
  138. return promise
  139. return _create_task_cls
  140. if len(args) == 1 and callable(args[0]):
  141. return inner_create_task_cls(**opts)(*args)
  142. if args:
  143. raise TypeError(
  144. 'task() takes no arguments (%s given)' % (len(args, )))
  145. return inner_create_task_cls(**opts)
  146. def _task_from_fun(self, fun, **options):
  147. base = options.pop('base', None) or self.Task
  148. T = type(fun.__name__, (base, ), dict({
  149. 'app': self,
  150. 'accept_magic_kwargs': False,
  151. 'run': staticmethod(fun),
  152. '__doc__': fun.__doc__,
  153. '__module__': fun.__module__}, **options))()
  154. task = self._tasks[T.name] # return global instance.
  155. task.bind(self)
  156. return task
  157. def finalize(self):
  158. with self._finalize_mutex:
  159. if not self.finalized:
  160. self.finalized = True
  161. load_shared_tasks(self)
  162. pending = self._pending
  163. while pending:
  164. maybe_evaluate(pending.popleft())
  165. for task in self._tasks.itervalues():
  166. task.bind(self)
  167. def add_defaults(self, fun):
  168. if not callable(fun):
  169. d, fun = fun, lambda: d
  170. if self.configured:
  171. return self.conf.add_defaults(fun())
  172. self._pending_defaults.append(fun)
  173. def config_from_object(self, obj, silent=False):
  174. del(self.conf)
  175. self._config_source = obj
  176. return self.loader.config_from_object(obj, silent=silent)
  177. def config_from_envvar(self, variable_name, silent=False):
  178. module_name = os.environ.get(variable_name)
  179. if not module_name:
  180. if silent:
  181. return False
  182. raise ImproperlyConfigured(self.error_envvar_not_set % module_name)
  183. return self.config_from_object(module_name, silent=silent)
  184. def config_from_cmdline(self, argv, namespace='celery'):
  185. self.conf.update(self.loader.cmdline_config_parser(argv, namespace))
  186. def send_task(self, name, args=None, kwargs=None, countdown=None,
  187. eta=None, task_id=None, producer=None, connection=None,
  188. result_cls=None, expires=None, queues=None, publisher=None,
  189. link=None, link_error=None,
  190. **options):
  191. producer = producer or publisher # XXX compat
  192. if self.conf.CELERY_ALWAYS_EAGER: # pragma: no cover
  193. warnings.warn(AlwaysEagerIgnored(
  194. 'CELERY_ALWAYS_EAGER has no effect on send_task'))
  195. result_cls = result_cls or self.AsyncResult
  196. router = self.amqp.Router(queues)
  197. options.setdefault('compression',
  198. self.conf.CELERY_MESSAGE_COMPRESSION)
  199. options = router.route(options, name, args, kwargs)
  200. with self.producer_or_acquire(producer) as producer:
  201. return result_cls(producer.publish_task(
  202. name, args, kwargs,
  203. task_id=task_id,
  204. countdown=countdown, eta=eta,
  205. callbacks=maybe_list(link),
  206. errbacks=maybe_list(link_error),
  207. expires=expires, **options
  208. ))
  209. def connection(self, hostname=None, userid=None,
  210. password=None, virtual_host=None, port=None, ssl=None,
  211. insist=None, connect_timeout=None, transport=None,
  212. transport_options=None, heartbeat=None, **kwargs):
  213. conf = self.conf
  214. return self.amqp.Connection(
  215. hostname or conf.BROKER_HOST,
  216. userid or conf.BROKER_USER,
  217. password or conf.BROKER_PASSWORD,
  218. virtual_host or conf.BROKER_VHOST,
  219. port or conf.BROKER_PORT,
  220. transport=transport or conf.BROKER_TRANSPORT,
  221. insist=self.either('BROKER_INSIST', insist),
  222. ssl=self.either('BROKER_USE_SSL', ssl),
  223. connect_timeout=self.either(
  224. 'BROKER_CONNECTION_TIMEOUT', connect_timeout),
  225. heartbeat=heartbeat,
  226. transport_options=dict(conf.BROKER_TRANSPORT_OPTIONS,
  227. **transport_options or {}))
  228. broker_connection = connection
  229. @contextmanager
  230. def connection_or_acquire(self, connection=None, pool=True,
  231. *args, **kwargs):
  232. if connection:
  233. yield connection
  234. else:
  235. if pool:
  236. with self.pool.acquire(block=True) as connection:
  237. yield connection
  238. else:
  239. with self.connection() as connection:
  240. yield connection
  241. default_connection = connection_or_acquire # XXX compat
  242. @contextmanager
  243. def producer_or_acquire(self, producer=None):
  244. if producer:
  245. yield producer
  246. else:
  247. with self.amqp.producer_pool.acquire(block=True) as producer:
  248. yield producer
  249. default_producer = producer_or_acquire # XXX compat
  250. def with_default_connection(self, fun):
  251. """With any function accepting a `connection`
  252. keyword argument, establishes a default connection if one is
  253. not already passed to it.
  254. Any automatically established connection will be closed after
  255. the function returns.
  256. **Deprecated**
  257. Use ``with app.connection_or_acquire(connection)`` instead.
  258. """
  259. @wraps(fun)
  260. def _inner(*args, **kwargs):
  261. connection = kwargs.pop('connection', None)
  262. with self.connection_or_acquire(connection) as c:
  263. return fun(*args, **dict(kwargs, connection=c))
  264. return _inner
  265. def prepare_config(self, c):
  266. """Prepare configuration before it is merged with the defaults."""
  267. return find_deprecated_settings(c)
  268. def now(self):
  269. return self.loader.now(utc=self.conf.CELERY_ENABLE_UTC)
  270. def mail_admins(self, subject, body, fail_silently=False):
  271. if self.conf.ADMINS:
  272. to = [admin_email for _, admin_email in self.conf.ADMINS]
  273. return self.loader.mail_admins(
  274. subject, body, fail_silently, to=to,
  275. sender=self.conf.SERVER_EMAIL,
  276. host=self.conf.EMAIL_HOST,
  277. port=self.conf.EMAIL_PORT,
  278. user=self.conf.EMAIL_HOST_USER,
  279. password=self.conf.EMAIL_HOST_PASSWORD,
  280. timeout=self.conf.EMAIL_TIMEOUT,
  281. use_ssl=self.conf.EMAIL_USE_SSL,
  282. use_tls=self.conf.EMAIL_USE_TLS,
  283. )
  284. def select_queues(self, queues=None):
  285. return self.amqp.queues.select_subset(queues)
  286. def either(self, default_key, *values):
  287. """Fallback to the value of a configuration key if none of the
  288. `*values` are true."""
  289. return first(None, values) or self.conf.get(default_key)
  290. def bugreport(self):
  291. return bugreport(self)
  292. def _get_backend(self):
  293. from celery.backends import get_backend_by_url
  294. backend, url = get_backend_by_url(
  295. self.backend_cls or self.conf.CELERY_RESULT_BACKEND,
  296. self.loader)
  297. return backend(app=self, url=url)
  298. def _get_config(self):
  299. self.configured = True
  300. s = Settings({}, [self.prepare_config(self.loader.conf),
  301. deepcopy(DEFAULTS)])
  302. # load lazy config dict initializers.
  303. pending = self._pending_defaults
  304. while pending:
  305. s.add_defaults(pending.popleft()())
  306. if self._preconf:
  307. for key, value in self._preconf.iteritems():
  308. setattr(s, key, value)
  309. return s
  310. def _after_fork(self, obj_):
  311. self._maybe_close_pool()
  312. def _maybe_close_pool(self):
  313. if self._pool:
  314. self._pool.force_close_all()
  315. self._pool = None
  316. amqp = self.amqp
  317. if amqp._producer_pool:
  318. amqp._producer_pool.force_close_all()
  319. amqp._producer_pool = None
  320. def create_task_cls(self):
  321. """Creates a base task class using default configuration
  322. taken from this app."""
  323. return self.subclass_with_self('celery.app.task:Task', name='Task',
  324. attribute='_app', abstract=True)
  325. def subclass_with_self(self, Class, name=None, attribute='app',
  326. reverse=None, **kw):
  327. """Subclass an app-compatible class by setting its app attribute
  328. to be this app instance.
  329. App-compatible means that the class has a class attribute that
  330. provides the default app it should use, e.g.
  331. ``class Foo: app = None``.
  332. :param Class: The app-compatible class to subclass.
  333. :keyword name: Custom name for the target class.
  334. :keyword attribute: Name of the attribute holding the app,
  335. default is 'app'.
  336. """
  337. Class = symbol_by_name(Class)
  338. reverse = reverse if reverse else Class.__name__
  339. def __reduce__(self):
  340. return _unpickle_appattr, (reverse, self.__reduce_args__())
  341. attrs = dict({attribute: self}, __module__=Class.__module__,
  342. __doc__=Class.__doc__, __reduce__=__reduce__, **kw)
  343. return type(name or Class.__name__, (Class, ), attrs)
  344. def _rgetattr(self, path):
  345. return reduce(getattr, [self] + path.split('.'))
  346. def __repr__(self):
  347. return '<%s %s:0x%x>' % (self.__class__.__name__,
  348. self.main or '__main__', id(self), )
  349. def __reduce__(self):
  350. # Reduce only pickles the configuration changes,
  351. # so the default configuration doesn't have to be passed
  352. # between processes.
  353. return (
  354. _unpickle_app,
  355. (self.__class__, self.Pickler) + self.__reduce_args__(),
  356. )
  357. def __reduce_args__(self):
  358. return (self.main, self.conf.changes, self.loader_cls,
  359. self.backend_cls, self.amqp_cls, self.events_cls,
  360. self.log_cls, self.control_cls, self.accept_magic_kwargs,
  361. self._config_source)
  362. @cached_property
  363. def Worker(self):
  364. return self.subclass_with_self('celery.apps.worker:Worker')
  365. @cached_property
  366. def WorkController(self, **kwargs):
  367. return self.subclass_with_self('celery.worker:WorkController')
  368. @cached_property
  369. def Beat(self, **kwargs):
  370. return self.subclass_with_self('celery.apps.beat:Beat')
  371. @cached_property
  372. def TaskSet(self):
  373. return self.subclass_with_self('celery.task.sets:TaskSet')
  374. @cached_property
  375. def Task(self):
  376. return self.create_task_cls()
  377. @cached_property
  378. def annotations(self):
  379. return prepare_annotations(self.conf.CELERY_ANNOTATIONS)
  380. @cached_property
  381. def AsyncResult(self):
  382. return self.subclass_with_self('celery.result:AsyncResult')
  383. @cached_property
  384. def GroupResult(self):
  385. return self.subclass_with_self('celery.result:GroupResult')
  386. @cached_property
  387. def TaskSetResult(self): # XXX compat
  388. return self.subclass_with_self('celery.result:TaskSetResult')
  389. @property
  390. def pool(self):
  391. if self._pool is None:
  392. register_after_fork(self, self._after_fork)
  393. limit = self.conf.BROKER_POOL_LIMIT
  394. self._pool = self.connection().Pool(limit=limit)
  395. return self._pool
  396. @property
  397. def current_task(self):
  398. return _task_stack.top
  399. @cached_property
  400. def amqp(self):
  401. return instantiate(self.amqp_cls, app=self)
  402. @cached_property
  403. def backend(self):
  404. return self._get_backend()
  405. @cached_property
  406. def conf(self):
  407. return self._get_config()
  408. @cached_property
  409. def control(self):
  410. return instantiate(self.control_cls, app=self)
  411. @cached_property
  412. def events(self):
  413. return instantiate(self.events_cls, app=self)
  414. @cached_property
  415. def loader(self):
  416. return get_loader_cls(self.loader_cls)(app=self)
  417. @cached_property
  418. def log(self):
  419. return instantiate(self.log_cls, app=self)
  420. @cached_property
  421. def tasks(self):
  422. self.finalize()
  423. return self._tasks
  424. App = Celery # compat