base.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app.base
  4. ~~~~~~~~~~~~~~~
  5. Application Base Class.
  6. :copyright: (c) 2009 - 2012 by Ask Solem.
  7. :license: BSD, see LICENSE for more details.
  8. """
  9. from __future__ import absolute_import
  10. from __future__ import with_statement
  11. import os
  12. import warnings
  13. import platform as _platform
  14. from contextlib import contextmanager
  15. from copy import deepcopy
  16. from functools import wraps
  17. from kombu.clocks import LamportClock
  18. from .. import datastructures
  19. from .. import platforms
  20. from ..exceptions import AlwaysEagerIgnored
  21. from ..utils import cached_property, instantiate, lpmerge
  22. from .defaults import DEFAULTS, find_deprecated_settings, find
  23. import kombu
  24. if kombu.VERSION < (2, 0):
  25. raise ImportError("Celery requires Kombu version 1.1.0 or higher.")
  26. BUGREPORT_INFO = """
  27. platform -> system:%(system)s arch:%(arch)s imp:%(py_i)s
  28. software -> celery:%(celery_v)s kombu:%(kombu_v)s py:%(py_v)s
  29. settings -> transport:%(transport)s results:%(results)s
  30. """
  31. class Settings(datastructures.ConfigurationView):
  32. @property
  33. def CELERY_RESULT_BACKEND(self):
  34. """Resolves deprecated alias ``CELERY_BACKEND``."""
  35. return self.get("CELERY_RESULT_BACKEND") or self.get("CELERY_BACKEND")
  36. @property
  37. def BROKER_TRANSPORT(self):
  38. """Resolves compat aliases :setting:`BROKER_BACKEND`
  39. and :setting:`CARROT_BACKEND`."""
  40. return (self.get("BROKER_TRANSPORT") or
  41. self.get("BROKER_BACKEND") or
  42. self.get("CARROT_BACKEND"))
  43. @property
  44. def BROKER_BACKEND(self):
  45. """Deprecated compat alias to :attr:`BROKER_TRANSPORT`."""
  46. return self.BROKER_TRANSPORT
  47. @property
  48. def BROKER_HOST(self):
  49. return (os.environ.get("CELERY_BROKER_URL") or
  50. self.get("BROKER_URL") or
  51. self.get("BROKER_HOST"))
  52. def find_option(self, name, namespace="celery"):
  53. return find(name, namespace)
  54. def get_by_parts(self, *parts):
  55. return self["_".join(filter(None, parts))]
  56. def find_value_for_key(self, name, namespace="celery"):
  57. ns, key, _ = self.find_option(name, namespace=namespace)
  58. return self.get_by_parts(ns, key)
  59. class BaseApp(object):
  60. """Base class for apps."""
  61. SYSTEM = platforms.SYSTEM
  62. IS_OSX = platforms.IS_OSX
  63. IS_WINDOWS = platforms.IS_WINDOWS
  64. amqp_cls = "celery.app.amqp:AMQP"
  65. backend_cls = None
  66. events_cls = "celery.events:Events"
  67. loader_cls = "celery.loaders.app:AppLoader"
  68. log_cls = "celery.log:Logging"
  69. control_cls = "celery.task.control:Control"
  70. _pool = None
  71. def __init__(self, main=None, loader=None, backend=None,
  72. amqp=None, events=None, log=None, control=None,
  73. set_as_current=True, accept_magic_kwargs=False, **kwargs):
  74. self.main = main
  75. self.amqp_cls = amqp or self.amqp_cls
  76. self.backend_cls = backend or self.backend_cls
  77. self.events_cls = events or self.events_cls
  78. self.loader_cls = loader or self.loader_cls
  79. self.log_cls = log or self.log_cls
  80. self.control_cls = control or self.control_cls
  81. self.set_as_current = set_as_current
  82. self.accept_magic_kwargs = accept_magic_kwargs
  83. self.clock = LamportClock()
  84. self.on_init()
  85. def on_init(self):
  86. """Called at the end of the constructor."""
  87. pass
  88. def config_from_object(self, obj, silent=False):
  89. """Read configuration from object, where object is either
  90. a object, or the name of a module to import.
  91. >>> celery.config_from_object("myapp.celeryconfig")
  92. >>> from myapp import celeryconfig
  93. >>> celery.config_from_object(celeryconfig)
  94. """
  95. del(self.conf)
  96. return self.loader.config_from_object(obj, silent=silent)
  97. def config_from_envvar(self, variable_name, silent=False):
  98. """Read configuration from environment variable.
  99. The value of the environment variable must be the name
  100. of a module to import.
  101. >>> os.environ["CELERY_CONFIG_MODULE"] = "myapp.celeryconfig"
  102. >>> celery.config_from_envvar("CELERY_CONFIG_MODULE")
  103. """
  104. del(self.conf)
  105. return self.loader.config_from_envvar(variable_name, silent=silent)
  106. def config_from_cmdline(self, argv, namespace="celery"):
  107. """Read configuration from argv.
  108. The config
  109. """
  110. self.conf.update(self.loader.cmdline_config_parser(argv, namespace))
  111. def send_task(self, name, args=None, kwargs=None, countdown=None,
  112. eta=None, task_id=None, publisher=None, connection=None,
  113. connect_timeout=None, result_cls=None, expires=None,
  114. queues=None, **options):
  115. """Send task by name.
  116. :param name: Name of task to execute (e.g. `"tasks.add"`).
  117. :keyword result_cls: Specify custom result class. Default is
  118. using :meth:`AsyncResult`.
  119. Supports the same arguments as
  120. :meth:`~celery.app.task.BaseTask.apply_async`.
  121. """
  122. if self.conf.CELERY_ALWAYS_EAGER:
  123. warnings.warn(AlwaysEagerIgnored(
  124. "CELERY_ALWAYS_EAGER has no effect on send_task"))
  125. router = self.amqp.Router(queues)
  126. result_cls = result_cls or self.AsyncResult
  127. options.setdefault("compression",
  128. self.conf.CELERY_MESSAGE_COMPRESSION)
  129. options = router.route(options, name, args, kwargs)
  130. exchange = options.get("exchange")
  131. exchange_type = options.get("exchange_type")
  132. with self.default_connection(connection, connect_timeout) as conn:
  133. publish = publisher or self.amqp.TaskPublisher(conn,
  134. exchange=exchange,
  135. exchange_type=exchange_type)
  136. try:
  137. new_id = publish.delay_task(name, args, kwargs,
  138. task_id=task_id,
  139. countdown=countdown, eta=eta,
  140. expires=expires, **options)
  141. finally:
  142. publisher or publish.close()
  143. return result_cls(new_id)
  144. def AsyncResult(self, task_id, backend=None, task_name=None):
  145. """Create :class:`celery.result.BaseAsyncResult` instance."""
  146. from ..result import BaseAsyncResult
  147. return BaseAsyncResult(task_id, app=self, task_name=task_name,
  148. backend=backend or self.backend)
  149. def TaskSetResult(self, taskset_id, results, **kwargs):
  150. """Create :class:`celery.result.TaskSetResult` instance."""
  151. from ..result import TaskSetResult
  152. return TaskSetResult(taskset_id, results, app=self)
  153. def broker_connection(self, hostname=None, userid=None,
  154. password=None, virtual_host=None, port=None, ssl=None,
  155. insist=None, connect_timeout=None, transport=None,
  156. transport_options=None, **kwargs):
  157. """Establish a connection to the message broker.
  158. :keyword hostname: defaults to the :setting:`BROKER_HOST` setting.
  159. :keyword userid: defaults to the :setting:`BROKER_USER` setting.
  160. :keyword password: defaults to the :setting:`BROKER_PASSWORD` setting.
  161. :keyword virtual_host: defaults to the :setting:`BROKER_VHOST` setting.
  162. :keyword port: defaults to the :setting:`BROKER_PORT` setting.
  163. :keyword ssl: defaults to the :setting:`BROKER_USE_SSL` setting.
  164. :keyword insist: defaults to the :setting:`BROKER_INSIST` setting.
  165. :keyword connect_timeout: defaults to the
  166. :setting:`BROKER_CONNECTION_TIMEOUT` setting.
  167. :keyword backend_cls: defaults to the :setting:`BROKER_TRANSPORT`
  168. setting.
  169. :returns :class:`kombu.connection.BrokerConnection`:
  170. """
  171. conf = self.conf
  172. return self.amqp.BrokerConnection(
  173. hostname or conf.BROKER_HOST,
  174. userid or conf.BROKER_USER,
  175. password or conf.BROKER_PASSWORD,
  176. virtual_host or conf.BROKER_VHOST,
  177. port or conf.BROKER_PORT,
  178. transport=transport or conf.BROKER_TRANSPORT,
  179. insist=self.either("BROKER_INSIST", insist),
  180. ssl=self.either("BROKER_USE_SSL", ssl),
  181. connect_timeout=self.either(
  182. "BROKER_CONNECTION_TIMEOUT", connect_timeout),
  183. transport_options=dict(conf.BROKER_TRANSPORT_OPTIONS,
  184. **transport_options or {}))
  185. @contextmanager
  186. def default_connection(self, connection=None, connect_timeout=None):
  187. """For use within a with-statement to get a connection from the pool
  188. if one is not already provided.
  189. :keyword connection: If not provided, then a connection will be
  190. acquired from the connection pool.
  191. :keyword connect_timeout: *No longer used.*
  192. """
  193. if connection:
  194. yield connection
  195. else:
  196. with self.pool.acquire(block=True) as connection:
  197. yield connection
  198. def with_default_connection(self, fun):
  199. """With any function accepting `connection` and `connect_timeout`
  200. keyword arguments, establishes a default connection if one is
  201. not already passed to it.
  202. Any automatically established connection will be closed after
  203. the function returns.
  204. **Deprecated**
  205. Use ``with app.default_connection(connection)`` instead.
  206. """
  207. @wraps(fun)
  208. def _inner(*args, **kwargs):
  209. connection = kwargs.pop("connection", None)
  210. with self.default_connection(connection) as c:
  211. return fun(*args, **dict(kwargs, connection=c))
  212. return _inner
  213. def prepare_config(self, c):
  214. """Prepare configuration before it is merged with the defaults."""
  215. find_deprecated_settings(c)
  216. return c
  217. def now(self):
  218. return self.loader.now(utc=self.conf.CELERY_ENABLE_UTC)
  219. def mail_admins(self, subject, body, fail_silently=False):
  220. """Send an email to the admins in the :setting:`ADMINS` setting."""
  221. if self.conf.ADMINS:
  222. to = [admin_email for _, admin_email in self.conf.ADMINS]
  223. return self.loader.mail_admins(subject, body, fail_silently, to=to,
  224. sender=self.conf.SERVER_EMAIL,
  225. host=self.conf.EMAIL_HOST,
  226. port=self.conf.EMAIL_PORT,
  227. user=self.conf.EMAIL_HOST_USER,
  228. password=self.conf.EMAIL_HOST_PASSWORD,
  229. timeout=self.conf.EMAIL_TIMEOUT,
  230. use_ssl=self.conf.EMAIL_USE_SSL,
  231. use_tls=self.conf.EMAIL_USE_TLS)
  232. def select_queues(self, queues=None):
  233. if queues:
  234. return self.amqp.queues.select_subset(queues,
  235. self.conf.CELERY_CREATE_MISSING_QUEUES)
  236. def either(self, default_key, *values):
  237. """Fallback to the value of a configuration key if none of the
  238. `*values` are true."""
  239. for value in values:
  240. if value is not None:
  241. return value
  242. return self.conf.get(default_key)
  243. def merge(self, l, r):
  244. """Like `dict(a, **b)` except it will keep values from `a`
  245. if the value in `b` is :const:`None`."""
  246. return lpmerge(l, r)
  247. def _get_backend(self):
  248. from ..backends import get_backend_cls
  249. return get_backend_cls(
  250. self.backend_cls or self.conf.CELERY_RESULT_BACKEND,
  251. loader=self.loader)(app=self)
  252. def _get_config(self):
  253. return Settings({}, [self.prepare_config(self.loader.conf),
  254. deepcopy(DEFAULTS)])
  255. def _after_fork(self, obj_):
  256. if self._pool:
  257. self._pool.force_close_all()
  258. self._pool = None
  259. def bugreport(self):
  260. import celery
  261. import kombu
  262. return BUGREPORT_INFO % {"system": _platform.system(),
  263. "arch": _platform.architecture(),
  264. "py_i": platforms.pyimplementation(),
  265. "celery_v": celery.__version__,
  266. "kombu_v": kombu.__version__,
  267. "py_v": _platform.python_version(),
  268. "transport": self.conf.BROKER_TRANSPORT,
  269. "results": self.conf.CELERY_RESULT_BACKEND}
  270. @property
  271. def pool(self):
  272. if self._pool is None:
  273. try:
  274. from multiprocessing.util import register_after_fork
  275. register_after_fork(self, self._after_fork)
  276. except ImportError:
  277. pass
  278. self._pool = self.broker_connection().Pool(
  279. limit=self.conf.BROKER_POOL_LIMIT)
  280. return self._pool
  281. @cached_property
  282. def amqp(self):
  283. """Sending/receiving messages. See :class:`~celery.app.amqp.AMQP`."""
  284. return instantiate(self.amqp_cls, app=self)
  285. @cached_property
  286. def backend(self):
  287. """Storing/retrieving task state. See
  288. :class:`~celery.backend.base.BaseBackend`."""
  289. return self._get_backend()
  290. @cached_property
  291. def conf(self):
  292. """Current configuration (dict and attribute access)."""
  293. return self._get_config()
  294. @cached_property
  295. def control(self):
  296. """Controlling worker nodes. See
  297. :class:`~celery.task.control.Control`."""
  298. return instantiate(self.control_cls, app=self)
  299. @cached_property
  300. def events(self):
  301. """Sending/receiving events. See :class:`~celery.events.Events`. """
  302. return instantiate(self.events_cls, app=self)
  303. @cached_property
  304. def loader(self):
  305. """Current loader."""
  306. from ..loaders import get_loader_cls
  307. return get_loader_cls(self.loader_cls)(app=self)
  308. @cached_property
  309. def log(self):
  310. """Logging utilities. See :class:`~celery.log.Logging`."""
  311. return instantiate(self.log_cls, app=self)
  312. @cached_property
  313. def tasks(self):
  314. from ..registry import tasks
  315. return tasks