base.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app.base
  4. ~~~~~~~~~~~~~~~
  5. Application Base Class.
  6. :copyright: (c) 2009 - 2012 by Ask Solem.
  7. :license: BSD, see LICENSE for more details.
  8. """
  9. from __future__ import absolute_import
  10. from __future__ import with_statement
  11. import warnings
  12. from collections import deque
  13. from contextlib import contextmanager
  14. from copy import deepcopy
  15. from functools import wraps
  16. from billiard.util import register_after_fork
  17. from kombu.clocks import LamportClock
  18. from kombu.utils import cached_property
  19. from celery import platforms
  20. from celery.exceptions import AlwaysEagerIgnored
  21. from celery.loaders import get_loader_cls
  22. from celery.local import PromiseProxy, maybe_evaluate
  23. from celery.utils.functional import first
  24. from celery.utils.imports import instantiate, symbol_by_name
  25. from .annotations import prepare as prepare_annotations
  26. from .builtins import load_builtin_tasks
  27. from .defaults import DEFAULTS, find_deprecated_settings
  28. from .state import _tls, get_current_app
  29. from .utils import AppPickler, Settings, bugreport, _unpickle_app
  30. def _unpickle_appattr(reverse_name, args):
  31. """Given an attribute name and a list of args, gets
  32. the attribute from the current app and calls it."""
  33. return get_current_app()._rgetattr(reverse_name)(*args)
  34. class Celery(object):
  35. Pickler = AppPickler
  36. SYSTEM = platforms.SYSTEM
  37. IS_OSX, IS_WINDOWS = platforms.IS_OSX, platforms.IS_WINDOWS
  38. amqp_cls = "celery.app.amqp:AMQP"
  39. backend_cls = None
  40. events_cls = "celery.events:Events"
  41. loader_cls = "celery.loaders.app:AppLoader"
  42. log_cls = "celery.app.log:Logging"
  43. control_cls = "celery.app.control:Control"
  44. registry_cls = "celery.app.registry:TaskRegistry"
  45. _pool = None
  46. def __init__(self, main=None, loader=None, backend=None,
  47. amqp=None, events=None, log=None, control=None,
  48. set_as_current=True, accept_magic_kwargs=False,
  49. tasks=None, broker=None, **kwargs):
  50. self.clock = LamportClock()
  51. self.main = main
  52. self.amqp_cls = amqp or self.amqp_cls
  53. self.backend_cls = backend or self.backend_cls
  54. self.events_cls = events or self.events_cls
  55. self.loader_cls = loader or self.loader_cls
  56. self.log_cls = log or self.log_cls
  57. self.control_cls = control or self.control_cls
  58. self.set_as_current = set_as_current
  59. self.registry_cls = self.registry_cls if tasks is None else tasks
  60. self.accept_magic_kwargs = accept_magic_kwargs
  61. self.finalized = False
  62. self._pending = deque()
  63. self._tasks = instantiate(self.registry_cls)
  64. # these options are moved to the config to
  65. # simplify pickling of the app object.
  66. self._preconf = {}
  67. if broker:
  68. self._preconf["BROKER_URL"] = broker
  69. if self.set_as_current:
  70. self.set_current()
  71. self.on_init()
  72. def set_current(self):
  73. _tls.current_app = self
  74. def on_init(self):
  75. """Optional callback called at init."""
  76. pass
  77. def start(self, argv=None):
  78. return instantiate("celery.bin.celery:CeleryCommand", app=self) \
  79. .execute_from_commandline(argv)
  80. def worker_main(self, argv=None):
  81. return instantiate("celery.bin.celeryd:WorkerCommand", app=self) \
  82. .execute_from_commandline(argv)
  83. def task(self, *args, **options):
  84. """Creates new task class from any callable."""
  85. def inner_create_task_cls(**options):
  86. def _create_task_cls(fun):
  87. if self.accept_magic_kwargs: # compat mode
  88. return self._task_from_fun(fun, **options)
  89. # return a proxy object that is only evaluated when first used
  90. promise = PromiseProxy(self._task_from_fun, (fun, ), options)
  91. self._pending.append(promise)
  92. return promise
  93. return _create_task_cls
  94. if len(args) == 1 and callable(args[0]):
  95. return inner_create_task_cls(**options)(*args)
  96. return inner_create_task_cls(**options)
  97. def _task_from_fun(self, fun, **options):
  98. base = options.pop("base", None) or self.Task
  99. T = type(fun.__name__, (base, ), dict({
  100. "app": self,
  101. "accept_magic_kwargs": False,
  102. "run": staticmethod(fun),
  103. "__doc__": fun.__doc__,
  104. "__module__": fun.__module__}, **options))()
  105. task = self._tasks[T.name] # return global instance.
  106. task.bind(self)
  107. return task
  108. def finalize(self):
  109. if not self.finalized:
  110. load_builtin_tasks(self)
  111. pending = self._pending
  112. while pending:
  113. maybe_evaluate(pending.pop())
  114. self.finalized = True
  115. def config_from_object(self, obj, silent=False):
  116. del(self.conf)
  117. return self.loader.config_from_object(obj, silent=silent)
  118. def config_from_envvar(self, variable_name, silent=False):
  119. del(self.conf)
  120. return self.loader.config_from_envvar(variable_name, silent=silent)
  121. def config_from_cmdline(self, argv, namespace="celery"):
  122. self.conf.update(self.loader.cmdline_config_parser(argv, namespace))
  123. def send_task(self, name, args=None, kwargs=None, countdown=None,
  124. eta=None, task_id=None, publisher=None, connection=None,
  125. result_cls=None, expires=None, queues=None, **options):
  126. if self.conf.CELERY_ALWAYS_EAGER: # pragma: no cover
  127. warnings.warn(AlwaysEagerIgnored(
  128. "CELERY_ALWAYS_EAGER has no effect on send_task"))
  129. result_cls = result_cls or self.AsyncResult
  130. router = self.amqp.Router(queues)
  131. options.setdefault("compression",
  132. self.conf.CELERY_MESSAGE_COMPRESSION)
  133. options = router.route(options, name, args, kwargs)
  134. with self.default_producer(publisher) as producer:
  135. return result_cls(producer.delay_task(name, args, kwargs,
  136. task_id=task_id,
  137. countdown=countdown, eta=eta,
  138. expires=expires, **options))
  139. def broker_connection(self, hostname=None, userid=None,
  140. password=None, virtual_host=None, port=None, ssl=None,
  141. insist=None, connect_timeout=None, transport=None,
  142. transport_options=None, **kwargs):
  143. conf = self.conf
  144. return self.amqp.BrokerConnection(
  145. hostname or conf.BROKER_HOST,
  146. userid or conf.BROKER_USER,
  147. password or conf.BROKER_PASSWORD,
  148. virtual_host or conf.BROKER_VHOST,
  149. port or conf.BROKER_PORT,
  150. transport=transport or conf.BROKER_TRANSPORT,
  151. insist=self.either("BROKER_INSIST", insist),
  152. ssl=self.either("BROKER_USE_SSL", ssl),
  153. connect_timeout=self.either(
  154. "BROKER_CONNECTION_TIMEOUT", connect_timeout),
  155. transport_options=dict(conf.BROKER_TRANSPORT_OPTIONS,
  156. **transport_options or {}))
  157. @contextmanager
  158. def default_connection(self, connection=None, *args, **kwargs):
  159. if connection:
  160. yield connection
  161. else:
  162. with self.pool.acquire(block=True) as connection:
  163. yield connection
  164. @contextmanager
  165. def default_producer(self, producer=None):
  166. if producer:
  167. yield producer
  168. else:
  169. with self.amqp.publisher_pool.acquire(block=True) as producer:
  170. yield producer
  171. def with_default_connection(self, fun):
  172. """With any function accepting a `connection`
  173. keyword argument, establishes a default connection if one is
  174. not already passed to it.
  175. Any automatically established connection will be closed after
  176. the function returns.
  177. **Deprecated**
  178. Use ``with app.default_connection(connection)`` instead.
  179. """
  180. @wraps(fun)
  181. def _inner(*args, **kwargs):
  182. connection = kwargs.pop("connection", None)
  183. with self.default_connection(connection) as c:
  184. return fun(*args, **dict(kwargs, connection=c))
  185. return _inner
  186. def prepare_config(self, c):
  187. """Prepare configuration before it is merged with the defaults."""
  188. return find_deprecated_settings(c)
  189. def now(self):
  190. return self.loader.now(utc=self.conf.CELERY_ENABLE_UTC)
  191. def mail_admins(self, subject, body, fail_silently=False):
  192. if self.conf.ADMINS:
  193. to = [admin_email for _, admin_email in self.conf.ADMINS]
  194. return self.loader.mail_admins(subject, body, fail_silently, to=to,
  195. sender=self.conf.SERVER_EMAIL,
  196. host=self.conf.EMAIL_HOST,
  197. port=self.conf.EMAIL_PORT,
  198. user=self.conf.EMAIL_HOST_USER,
  199. password=self.conf.EMAIL_HOST_PASSWORD,
  200. timeout=self.conf.EMAIL_TIMEOUT,
  201. use_ssl=self.conf.EMAIL_USE_SSL,
  202. use_tls=self.conf.EMAIL_USE_TLS)
  203. def select_queues(self, queues=None):
  204. return self.amqp.queues.select_subset(queues)
  205. def either(self, default_key, *values):
  206. """Fallback to the value of a configuration key if none of the
  207. `*values` are true."""
  208. return first(None, values) or self.conf.get(default_key)
  209. def bugreport(self):
  210. return bugreport(self)
  211. def _get_backend(self):
  212. from celery.backends import get_backend_by_url
  213. backend, url = get_backend_by_url(
  214. self.backend_cls or self.conf.CELERY_RESULT_BACKEND,
  215. self.loader)
  216. return backend(app=self, url=url)
  217. def _get_config(self):
  218. s = Settings({}, [self.prepare_config(self.loader.conf),
  219. deepcopy(DEFAULTS)])
  220. if self._preconf:
  221. for key, value in self._preconf.iteritems():
  222. setattr(s, key, value)
  223. return s
  224. def _after_fork(self, obj_):
  225. if self._pool:
  226. self._pool.force_close_all()
  227. self._pool = None
  228. def create_task_cls(self):
  229. """Creates a base task class using default configuration
  230. taken from this app."""
  231. return self.subclass_with_self("celery.app.task:BaseTask", name="Task",
  232. attribute="_app", abstract=True)
  233. def subclass_with_self(self, Class, name=None, attribute="app",
  234. reverse=None, **kw):
  235. """Subclass an app-compatible class by setting its app attribute
  236. to be this app instance.
  237. App-compatible means that the class has a class attribute that
  238. provides the default app it should use, e.g.
  239. ``class Foo: app = None``.
  240. :param Class: The app-compatible class to subclass.
  241. :keyword name: Custom name for the target class.
  242. :keyword attribute: Name of the attribute holding the app,
  243. default is "app".
  244. """
  245. Class = symbol_by_name(Class)
  246. reverse = reverse if reverse else Class.__name__
  247. def __reduce__(self):
  248. return _unpickle_appattr, (reverse, self.__reduce_args__())
  249. attrs = dict({attribute: self}, __module__=Class.__module__,
  250. __doc__=Class.__doc__, __reduce__=__reduce__, **kw)
  251. return type(name or Class.__name__, (Class, ), attrs)
  252. def _rgetattr(self, path):
  253. return reduce(getattr, [self] + path.split('.'))
  254. def __repr__(self):
  255. return "<%s %s:0x%x>" % (self.__class__.__name__,
  256. self.main or "__main__", id(self), )
  257. def __reduce__(self):
  258. # Reduce only pickles the configuration changes,
  259. # so the default configuration doesn't have to be passed
  260. # between processes.
  261. return (_unpickle_app, (self.__class__, self.Pickler)
  262. + self.__reduce_args__())
  263. def __reduce_args__(self):
  264. return (self.main, self.conf.changes, self.loader_cls,
  265. self.backend_cls, self.amqp_cls, self.events_cls,
  266. self.log_cls, self.control_cls, self.accept_magic_kwargs)
  267. @cached_property
  268. def Worker(self):
  269. """Create new :class:`~celery.apps.worker.Worker` instance."""
  270. return self.subclass_with_self("celery.apps.worker:Worker")
  271. @cached_property
  272. def WorkController(self, **kwargs):
  273. return self.subclass_with_self("celery.worker:WorkController")
  274. @cached_property
  275. def Beat(self, **kwargs):
  276. """Create new :class:`~celery.apps.beat.Beat` instance."""
  277. return self.subclass_with_self("celery.apps.beat:Beat")
  278. @cached_property
  279. def TaskSet(self):
  280. return self.subclass_with_self("celery.task.sets:TaskSet")
  281. @cached_property
  282. def Task(self):
  283. """Default Task base class for this application."""
  284. return self.create_task_cls()
  285. @cached_property
  286. def annotations(self):
  287. return prepare_annotations(self.conf.CELERY_ANNOTATIONS)
  288. @cached_property
  289. def AsyncResult(self):
  290. return self.subclass_with_self("celery.result:AsyncResult")
  291. @cached_property
  292. def TaskSetResult(self):
  293. return self.subclass_with_self("celery.result:TaskSetResult")
  294. @property
  295. def pool(self):
  296. if self._pool is None:
  297. register_after_fork(self, self._after_fork)
  298. self._pool = self.broker_connection().Pool(
  299. limit=self.conf.BROKER_POOL_LIMIT)
  300. return self._pool
  301. @cached_property
  302. def amqp(self):
  303. """Sending/receiving messages. See :class:`~celery.app.amqp.AMQP`."""
  304. return instantiate(self.amqp_cls, app=self)
  305. @cached_property
  306. def backend(self):
  307. """Storing/retrieving task state. See
  308. :class:`~celery.backend.base.BaseBackend`."""
  309. return self._get_backend()
  310. @cached_property
  311. def conf(self):
  312. """Current configuration (dict and attribute access)."""
  313. return self._get_config()
  314. @cached_property
  315. def control(self):
  316. """Controlling worker nodes. See
  317. :class:`~celery.app.control.Control`."""
  318. return instantiate(self.control_cls, app=self)
  319. @cached_property
  320. def events(self):
  321. """Sending/receiving events. See :class:`~celery.events.Events`. """
  322. return instantiate(self.events_cls, app=self)
  323. @cached_property
  324. def loader(self):
  325. """Current loader."""
  326. return get_loader_cls(self.loader_cls)(app=self)
  327. @cached_property
  328. def log(self):
  329. """Logging utilities. See :class:`~celery.app.log.Logging`."""
  330. return instantiate(self.log_cls, app=self)
  331. @cached_property
  332. def tasks(self):
  333. """Registry of available tasks.
  334. Accessing this attribute will also finalize the app.
  335. """
  336. self.finalize()
  337. return self._tasks
  338. App = Celery # compat