base.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app.base
  4. ~~~~~~~~~~~~~~~
  5. Application Base Class.
  6. :copyright: (c) 2009 - 2012 by Ask Solem.
  7. :license: BSD, see LICENSE for more details.
  8. """
  9. from __future__ import absolute_import
  10. from __future__ import with_statement
  11. import warnings
  12. from collections import deque
  13. from contextlib import contextmanager
  14. from copy import deepcopy
  15. from functools import wraps
  16. from kombu.clocks import LamportClock
  17. from celery import platforms
  18. from celery.exceptions import AlwaysEagerIgnored
  19. from celery.loaders import get_loader_cls
  20. from celery.local import PromiseProxy, maybe_evaluate
  21. from celery.utils import cached_property, register_after_fork
  22. from celery.utils.functional import first
  23. from celery.utils.imports import instantiate, symbol_by_name
  24. from .annotations import prepare as prepare_annotations
  25. from .builtins import load_builtin_tasks
  26. from .defaults import DEFAULTS, find_deprecated_settings
  27. from .state import _tls, get_current_app
  28. from .utils import AppPickler, Settings, bugreport, _unpickle_app
  29. def _unpickle_appattr(reverse_name, args):
  30. """Given an attribute name and a list of args, gets
  31. the attribute from the current app and calls it."""
  32. return getattr(get_current_app(), reverse_name)(*args)
  33. class Celery(object):
  34. Pickler = AppPickler
  35. SYSTEM = platforms.SYSTEM
  36. IS_OSX, IS_WINDOWS = platforms.IS_OSX, platforms.IS_WINDOWS
  37. amqp_cls = "celery.app.amqp:AMQP"
  38. backend_cls = None
  39. events_cls = "celery.events:Events"
  40. loader_cls = "celery.loaders.app:AppLoader"
  41. log_cls = "celery.app.log:Logging"
  42. control_cls = "celery.app.control:Control"
  43. registry_cls = "celery.app.registry:TaskRegistry"
  44. _pool = None
  45. def __init__(self, main=None, loader=None, backend=None,
  46. amqp=None, events=None, log=None, control=None,
  47. set_as_current=True, accept_magic_kwargs=False,
  48. tasks=None, broker=None, **kwargs):
  49. self.clock = LamportClock()
  50. self.main = main
  51. self.amqp_cls = amqp or self.amqp_cls
  52. self.backend_cls = backend or self.backend_cls
  53. self.events_cls = events or self.events_cls
  54. self.loader_cls = loader or self.loader_cls
  55. self.log_cls = log or self.log_cls
  56. self.control_cls = control or self.control_cls
  57. self.set_as_current = set_as_current
  58. self.registry_cls = self.registry_cls if tasks is None else tasks
  59. self.accept_magic_kwargs = accept_magic_kwargs
  60. self.finalized = False
  61. self._pending = deque()
  62. self._tasks = instantiate(self.registry_cls)
  63. # these options are moved to the config to
  64. # simplify pickling of the app object.
  65. self._preconf = {}
  66. if broker:
  67. self._preconf["BROKER_URL"] = broker
  68. if self.set_as_current:
  69. self.set_current()
  70. self.on_init()
  71. def set_current(self):
  72. _tls.current_app = self
  73. def on_init(self):
  74. """Optional callback called at init."""
  75. pass
  76. def start(self, argv=None):
  77. return instantiate("celery.bin.celery:CeleryCommand", app=self) \
  78. .execute_from_commandline(argv)
  79. def worker_main(self, argv=None):
  80. return instantiate("celery.bin.celeryd:WorkerCommand", app=self) \
  81. .execute_from_commandline(argv)
  82. def task(self, *args, **options):
  83. """Creates new task class from any callable."""
  84. def inner_create_task_cls(**options):
  85. def _create_task_cls(fun):
  86. if self.accept_magic_kwargs: # compat mode
  87. return self._task_from_fun(fun, **options)
  88. # return a proxy object that is only evaluated when first used
  89. promise = PromiseProxy(self._task_from_fun, (fun, ), options)
  90. self._pending.append(promise)
  91. return promise
  92. return _create_task_cls
  93. if len(args) == 1 and callable(args[0]):
  94. return inner_create_task_cls(**options)(*args)
  95. return inner_create_task_cls(**options)
  96. def _task_from_fun(self, fun, **options):
  97. base = options.pop("base", None) or self.Task
  98. T = type(fun.__name__, (base, ), dict({
  99. "app": self,
  100. "accept_magic_kwargs": False,
  101. "run": staticmethod(fun),
  102. "__doc__": fun.__doc__,
  103. "__module__": fun.__module__}, **options))()
  104. task = self._tasks[T.name] # return global instance.
  105. task.bind(self)
  106. return task
  107. def finalize(self):
  108. if not self.finalized:
  109. load_builtin_tasks(self)
  110. pending = self._pending
  111. while pending:
  112. maybe_evaluate(pending.pop())
  113. self.finalized = True
  114. def config_from_object(self, obj, silent=False):
  115. del(self.conf)
  116. return self.loader.config_from_object(obj, silent=silent)
  117. def config_from_envvar(self, variable_name, silent=False):
  118. del(self.conf)
  119. return self.loader.config_from_envvar(variable_name, silent=silent)
  120. def config_from_cmdline(self, argv, namespace="celery"):
  121. self.conf.update(self.loader.cmdline_config_parser(argv, namespace))
  122. def send_task(self, name, args=None, kwargs=None, countdown=None,
  123. eta=None, task_id=None, publisher=None, connection=None,
  124. connect_timeout=None, result_cls=None, expires=None,
  125. queues=None, **options):
  126. if self.conf.CELERY_ALWAYS_EAGER:
  127. warnings.warn(AlwaysEagerIgnored(
  128. "CELERY_ALWAYS_EAGER has no effect on send_task"))
  129. router = self.amqp.Router(queues)
  130. result_cls = result_cls or self.AsyncResult
  131. options.setdefault("compression",
  132. self.conf.CELERY_MESSAGE_COMPRESSION)
  133. options = router.route(options, name, args, kwargs)
  134. exchange = options.get("exchange")
  135. exchange_type = options.get("exchange_type")
  136. with self.default_connection(connection, connect_timeout) as conn:
  137. publish = publisher or self.amqp.TaskPublisher(conn,
  138. exchange=exchange,
  139. exchange_type=exchange_type)
  140. try:
  141. new_id = publish.delay_task(name, args, kwargs,
  142. task_id=task_id,
  143. countdown=countdown, eta=eta,
  144. expires=expires, **options)
  145. finally:
  146. publisher or publish.close()
  147. return result_cls(new_id)
  148. def broker_connection(self, hostname=None, userid=None,
  149. password=None, virtual_host=None, port=None, ssl=None,
  150. insist=None, connect_timeout=None, transport=None,
  151. transport_options=None, **kwargs):
  152. conf = self.conf
  153. return self.amqp.BrokerConnection(
  154. hostname or conf.BROKER_HOST,
  155. userid or conf.BROKER_USER,
  156. password or conf.BROKER_PASSWORD,
  157. virtual_host or conf.BROKER_VHOST,
  158. port or conf.BROKER_PORT,
  159. transport=transport or conf.BROKER_TRANSPORT,
  160. insist=self.either("BROKER_INSIST", insist),
  161. ssl=self.either("BROKER_USE_SSL", ssl),
  162. connect_timeout=self.either(
  163. "BROKER_CONNECTION_TIMEOUT", connect_timeout),
  164. transport_options=dict(conf.BROKER_TRANSPORT_OPTIONS,
  165. **transport_options or {}))
  166. @contextmanager
  167. def default_connection(self, connection=None, connect_timeout=None):
  168. if connection:
  169. yield connection
  170. else:
  171. with self.pool.acquire(block=True) as connection:
  172. yield connection
  173. def with_default_connection(self, fun):
  174. """With any function accepting `connection` and `connect_timeout`
  175. keyword arguments, establishes a default connection if one is
  176. not already passed to it.
  177. Any automatically established connection will be closed after
  178. the function returns.
  179. **Deprecated**
  180. Use ``with app.default_connection(connection)`` instead.
  181. """
  182. @wraps(fun)
  183. def _inner(*args, **kwargs):
  184. connection = kwargs.pop("connection", None)
  185. with self.default_connection(connection) as c:
  186. return fun(*args, **dict(kwargs, connection=c))
  187. return _inner
  188. def prepare_config(self, c):
  189. """Prepare configuration before it is merged with the defaults."""
  190. if self._preconf:
  191. for key, value in self._preconf.iteritems():
  192. setattr(c, key, value)
  193. return find_deprecated_settings(c)
  194. def now(self):
  195. return self.loader.now(utc=self.conf.CELERY_ENABLE_UTC)
  196. def mail_admins(self, subject, body, fail_silently=False):
  197. if self.conf.ADMINS:
  198. to = [admin_email for _, admin_email in self.conf.ADMINS]
  199. return self.loader.mail_admins(subject, body, fail_silently, to=to,
  200. sender=self.conf.SERVER_EMAIL,
  201. host=self.conf.EMAIL_HOST,
  202. port=self.conf.EMAIL_PORT,
  203. user=self.conf.EMAIL_HOST_USER,
  204. password=self.conf.EMAIL_HOST_PASSWORD,
  205. timeout=self.conf.EMAIL_TIMEOUT,
  206. use_ssl=self.conf.EMAIL_USE_SSL,
  207. use_tls=self.conf.EMAIL_USE_TLS)
  208. def select_queues(self, queues=None):
  209. return self.amqp.queues.select_subset(queues,
  210. self.conf.CELERY_CREATE_MISSING_QUEUES)
  211. def either(self, default_key, *values):
  212. """Fallback to the value of a configuration key if none of the
  213. `*values` are true."""
  214. return first(None, values) or self.conf.get(default_key)
  215. def bugreport(self):
  216. return bugreport(self)
  217. def _get_backend(self):
  218. from celery.backends import get_backend_by_url
  219. backend, url = get_backend_by_url(
  220. self.backend_cls or self.conf.CELERY_RESULT_BACKEND,
  221. self.loader)
  222. return backend(app=self, url=url)
  223. def _get_config(self):
  224. return Settings({}, [self.prepare_config(self.loader.conf),
  225. deepcopy(DEFAULTS)])
  226. def _after_fork(self, obj_):
  227. if self._pool:
  228. self._pool.force_close_all()
  229. self._pool = None
  230. def create_task_cls(self):
  231. """Creates a base task class using default configuration
  232. taken from this app."""
  233. return self.subclass_with_self("celery.app.task:BaseTask", name="Task",
  234. attribute="_app", abstract=True)
  235. def subclass_with_self(self, Class, name=None, attribute="app",
  236. reverse=None, **kw):
  237. """Subclass an app-compatible class by setting its app attribute
  238. to be this app instance.
  239. App-compatible means that the class has a class attribute that
  240. provides the default app it should use, e.g.
  241. ``class Foo: app = None``.
  242. :param Class: The app-compatible class to subclass.
  243. :keyword name: Custom name for the target class.
  244. :keyword attribute: Name of the attribute holding the app,
  245. default is "app".
  246. """
  247. Class = symbol_by_name(Class)
  248. reverse = reverse if reverse else Class.__name__
  249. def __reduce__(self):
  250. return _unpickle_appattr, (reverse, self.__reduce_args__())
  251. attrs = dict({attribute: self}, __module__=Class.__module__,
  252. __doc__=Class.__doc__, __reduce__=__reduce__, **kw)
  253. return type(name or Class.__name__, (Class, ), attrs)
  254. def __repr__(self):
  255. return "<%s %s:0x%x>" % (self.__class__.__name__,
  256. self.main or "__main__", id(self), )
  257. def __reduce__(self):
  258. # Reduce only pickles the configuration changes,
  259. # so the default configuration doesn't have to be passed
  260. # between processes.
  261. return (_unpickle_app, (self.__class__, self.Pickler)
  262. + self.__reduce_args__())
  263. def __reduce_args__(self):
  264. return (self.main, self.conf.changes, self.loader_cls,
  265. self.backend_cls, self.amqp_cls, self.events_cls,
  266. self.log_cls, self.control_cls, self.accept_magic_kwargs)
  267. @cached_property
  268. def Worker(self):
  269. """Create new :class:`~celery.apps.worker.Worker` instance."""
  270. return self.subclass_with_self("celery.apps.worker:Worker")
  271. @cached_property
  272. def WorkController(self, **kwargs):
  273. return self.subclass_with_self("celery.worker:WorkController")
  274. @cached_property
  275. def Beat(self, **kwargs):
  276. """Create new :class:`~celery.apps.beat.Beat` instance."""
  277. return self.subclass_with_self("celery.apps.beat:Beat")
  278. @cached_property
  279. def TaskSet(self):
  280. return self.subclass_with_self("celery.task.sets:TaskSet")
  281. @cached_property
  282. def Task(self):
  283. """Default Task base class for this application."""
  284. return self.create_task_cls()
  285. @cached_property
  286. def annotations(self):
  287. return prepare_annotations(self.conf.CELERY_ANNOTATIONS)
  288. @cached_property
  289. def AsyncResult(self):
  290. return self.subclass_with_self("celery.result:AsyncResult")
  291. @cached_property
  292. def TaskSetResult(self):
  293. return self.subclass_with_self("celery.result:TaskSetResult")
  294. @property
  295. def pool(self):
  296. if self._pool is None:
  297. register_after_fork(self, self._after_fork)
  298. self._pool = self.broker_connection().Pool(
  299. limit=self.conf.BROKER_POOL_LIMIT)
  300. return self._pool
  301. @cached_property
  302. def amqp(self):
  303. """Sending/receiving messages. See :class:`~celery.app.amqp.AMQP`."""
  304. return instantiate(self.amqp_cls, app=self)
  305. @cached_property
  306. def backend(self):
  307. """Storing/retrieving task state. See
  308. :class:`~celery.backend.base.BaseBackend`."""
  309. return self._get_backend()
  310. @cached_property
  311. def conf(self):
  312. """Current configuration (dict and attribute access)."""
  313. return self._get_config()
  314. @cached_property
  315. def control(self):
  316. """Controlling worker nodes. See
  317. :class:`~celery.app.control.Control`."""
  318. return instantiate(self.control_cls, app=self)
  319. @cached_property
  320. def events(self):
  321. """Sending/receiving events. See :class:`~celery.events.Events`. """
  322. return instantiate(self.events_cls, app=self)
  323. @cached_property
  324. def loader(self):
  325. """Current loader."""
  326. return get_loader_cls(self.loader_cls)(app=self)
  327. @cached_property
  328. def log(self):
  329. """Logging utilities. See :class:`~celery.app.log.Logging`."""
  330. return instantiate(self.log_cls, app=self)
  331. @cached_property
  332. def tasks(self):
  333. """Registry of available tasks.
  334. Accessing this attribute will also finalize the app.
  335. """
  336. self.finalize()
  337. return self._tasks
  338. App = Celery # compat