base.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app.base
  4. ~~~~~~~~~~~~~~~
  5. Application Base Class.
  6. :copyright: (c) 2009 - 2011 by Ask Solem.
  7. :license: BSD, see LICENSE for more details.
  8. """
  9. from __future__ import absolute_import
  10. from __future__ import with_statement
  11. import os
  12. import platform as _platform
  13. from contextlib import contextmanager
  14. from copy import deepcopy
  15. from functools import wraps
  16. from kombu.clocks import LamportClock
  17. from .. import datastructures
  18. from .. import platforms
  19. from ..utils import cached_property, instantiate, lpmerge
  20. from .defaults import DEFAULTS, find_deprecated_settings
  21. import kombu
  22. if kombu.VERSION < (1, 1, 0):
  23. raise ImportError("Celery requires Kombu version 1.1.0 or higher.")
  24. BUGREPORT_INFO = """
  25. platform -> system:%(system)s arch:%(arch)s imp:%(py_i)s
  26. software -> celery:%(celery_v)s kombu:%(kombu_v)s py:%(py_v)s
  27. settings -> transport:%(transport)s results:%(results)s
  28. """
  29. class Settings(datastructures.ConfigurationView):
  30. @property
  31. def CELERY_RESULT_BACKEND(self):
  32. """Resolves deprecated alias ``CELERY_BACKEND``."""
  33. return self.get("CELERY_RESULT_BACKEND") or self.get("CELERY_BACKEND")
  34. @property
  35. def BROKER_TRANSPORT(self):
  36. """Resolves compat aliases :setting:`BROKER_BACKEND`
  37. and :setting:`CARROT_BACKEND`."""
  38. return (self.get("BROKER_TRANSPORT") or
  39. self.get("BROKER_BACKEND") or
  40. self.get("CARROT_BACKEND"))
  41. @property
  42. def BROKER_BACKEND(self):
  43. """Deprecated compat alias to :attr:`BROKER_TRANSPORT`."""
  44. return self.BROKER_TRANSPORT
  45. @property
  46. def BROKER_HOST(self):
  47. return (os.environ.get("CELERY_BROKER_URL") or
  48. self.get("BROKER_URL") or
  49. self.get("BROKER_HOST"))
  50. class BaseApp(object):
  51. """Base class for apps."""
  52. SYSTEM = platforms.SYSTEM
  53. IS_OSX = platforms.IS_OSX
  54. IS_WINDOWS = platforms.IS_WINDOWS
  55. amqp_cls = "celery.app.amqp.AMQP"
  56. backend_cls = None
  57. events_cls = "celery.events.Events"
  58. loader_cls = "celery.loaders.app.AppLoader"
  59. log_cls = "celery.log.Logging"
  60. control_cls = "celery.task.control.Control"
  61. _pool = None
  62. def __init__(self, main=None, loader=None, backend=None,
  63. amqp=None, events=None, log=None, control=None,
  64. set_as_current=True, accept_magic_kwargs=False, **kwargs):
  65. self.main = main
  66. self.amqp_cls = amqp or self.amqp_cls
  67. self.backend_cls = backend or self.backend_cls
  68. self.events_cls = events or self.events_cls
  69. self.loader_cls = loader or self.loader_cls
  70. self.log_cls = log or self.log_cls
  71. self.control_cls = control or self.control_cls
  72. self.set_as_current = set_as_current
  73. self.accept_magic_kwargs = accept_magic_kwargs
  74. self.clock = LamportClock()
  75. self.on_init()
  76. def on_init(self):
  77. """Called at the end of the constructor."""
  78. pass
  79. def config_from_object(self, obj, silent=False):
  80. """Read configuration from object, where object is either
  81. a object, or the name of a module to import.
  82. >>> celery.config_from_object("myapp.celeryconfig")
  83. >>> from myapp import celeryconfig
  84. >>> celery.config_from_object(celeryconfig)
  85. """
  86. del(self.conf)
  87. return self.loader.config_from_object(obj, silent=silent)
  88. def config_from_envvar(self, variable_name, silent=False):
  89. """Read configuration from environment variable.
  90. The value of the environment variable must be the name
  91. of a module to import.
  92. >>> os.environ["CELERY_CONFIG_MODULE"] = "myapp.celeryconfig"
  93. >>> celery.config_from_envvar("CELERY_CONFIG_MODULE")
  94. """
  95. del(self.conf)
  96. return self.loader.config_from_envvar(variable_name, silent=silent)
  97. def config_from_cmdline(self, argv, namespace="celery"):
  98. """Read configuration from argv.
  99. The config
  100. """
  101. self.conf.update(self.loader.cmdline_config_parser(argv, namespace))
  102. def send_task(self, name, args=None, kwargs=None, countdown=None,
  103. eta=None, task_id=None, publisher=None, connection=None,
  104. connect_timeout=None, result_cls=None, expires=None,
  105. queues=None, **options):
  106. """Send task by name.
  107. :param name: Name of task to execute (e.g. `"tasks.add"`).
  108. :keyword result_cls: Specify custom result class. Default is
  109. using :meth:`AsyncResult`.
  110. Supports the same arguments as
  111. :meth:`~celery.app.task.BaseTask.apply_async`.
  112. """
  113. router = self.amqp.Router(queues)
  114. result_cls = result_cls or self.AsyncResult
  115. options.setdefault("compression",
  116. self.conf.CELERY_MESSAGE_COMPRESSION)
  117. options = router.route(options, name, args, kwargs)
  118. exchange = options.get("exchange")
  119. exchange_type = options.get("exchange_type")
  120. with self.default_connection(connection, connect_timeout) as conn:
  121. publish = publisher or self.amqp.TaskPublisher(conn,
  122. exchange=exchange,
  123. exchange_type=exchange_type)
  124. try:
  125. new_id = publish.delay_task(name, args, kwargs,
  126. task_id=task_id,
  127. countdown=countdown, eta=eta,
  128. expires=expires, **options)
  129. finally:
  130. publisher or publish.close()
  131. return result_cls(new_id)
  132. def AsyncResult(self, task_id, backend=None, task_name=None):
  133. """Create :class:`celery.result.BaseAsyncResult` instance."""
  134. from ..result import BaseAsyncResult
  135. return BaseAsyncResult(task_id, app=self, task_name=task_name,
  136. backend=backend or self.backend)
  137. def TaskSetResult(self, taskset_id, results, **kwargs):
  138. """Create :class:`celery.result.TaskSetResult` instance."""
  139. from ..result import TaskSetResult
  140. return TaskSetResult(taskset_id, results, app=self)
  141. def broker_connection(self, hostname=None, userid=None,
  142. password=None, virtual_host=None, port=None, ssl=None,
  143. insist=None, connect_timeout=None, transport=None,
  144. transport_options=None, **kwargs):
  145. """Establish a connection to the message broker.
  146. :keyword hostname: defaults to the :setting:`BROKER_HOST` setting.
  147. :keyword userid: defaults to the :setting:`BROKER_USER` setting.
  148. :keyword password: defaults to the :setting:`BROKER_PASSWORD` setting.
  149. :keyword virtual_host: defaults to the :setting:`BROKER_VHOST` setting.
  150. :keyword port: defaults to the :setting:`BROKER_PORT` setting.
  151. :keyword ssl: defaults to the :setting:`BROKER_USE_SSL` setting.
  152. :keyword insist: defaults to the :setting:`BROKER_INSIST` setting.
  153. :keyword connect_timeout: defaults to the
  154. :setting:`BROKER_CONNECTION_TIMEOUT` setting.
  155. :keyword backend_cls: defaults to the :setting:`BROKER_TRANSPORT`
  156. setting.
  157. :returns :class:`kombu.connection.BrokerConnection`:
  158. """
  159. conf = self.conf
  160. return self.amqp.BrokerConnection(
  161. hostname or conf.BROKER_HOST,
  162. userid or conf.BROKER_USER,
  163. password or conf.BROKER_PASSWORD,
  164. virtual_host or conf.BROKER_VHOST,
  165. port or conf.BROKER_PORT,
  166. transport=transport or conf.BROKER_TRANSPORT,
  167. insist=self.either("BROKER_INSIST", insist),
  168. ssl=self.either("BROKER_USE_SSL", ssl),
  169. connect_timeout=self.either(
  170. "BROKER_CONNECTION_TIMEOUT", connect_timeout),
  171. transport_options=dict(conf.BROKER_TRANSPORT_OPTIONS,
  172. **transport_options or {}))
  173. @contextmanager
  174. def default_connection(self, connection=None, connect_timeout=None):
  175. """For use within a with-statement to get a connection from the pool
  176. if one is not already provided.
  177. :keyword connection: If not provided, then a connection will be
  178. acquired from the connection pool.
  179. :keyword connect_timeout: *No longer used.*
  180. """
  181. if connection:
  182. yield connection
  183. else:
  184. with self.pool.acquire(block=True) as connection:
  185. yield connection
  186. def with_default_connection(self, fun):
  187. """With any function accepting `connection` and `connect_timeout`
  188. keyword arguments, establishes a default connection if one is
  189. not already passed to it.
  190. Any automatically established connection will be closed after
  191. the function returns.
  192. **Deprecated**
  193. Use ``with app.default_connection(connection)`` instead.
  194. """
  195. @wraps(fun)
  196. def _inner(*args, **kwargs):
  197. connection = kwargs.pop("connection", None)
  198. with self.default_connection(connection) as c:
  199. return fun(*args, **dict(kwargs, connection=c))
  200. return _inner
  201. def prepare_config(self, c):
  202. """Prepare configuration before it is merged with the defaults."""
  203. find_deprecated_settings(c)
  204. return c
  205. def mail_admins(self, subject, body, fail_silently=False):
  206. """Send an email to the admins in the :setting:`ADMINS` setting."""
  207. if self.conf.ADMINS:
  208. to = [admin_email for _, admin_email in self.conf.ADMINS]
  209. return self.loader.mail_admins(subject, body, fail_silently, to=to,
  210. sender=self.conf.SERVER_EMAIL,
  211. host=self.conf.EMAIL_HOST,
  212. port=self.conf.EMAIL_PORT,
  213. user=self.conf.EMAIL_HOST_USER,
  214. password=self.conf.EMAIL_HOST_PASSWORD,
  215. timeout=self.conf.EMAIL_TIMEOUT,
  216. use_ssl=self.conf.EMAIL_USE_SSL,
  217. use_tls=self.conf.EMAIL_USE_TLS)
  218. def select_queues(self, queues=None):
  219. if queues:
  220. return self.amqp.queues.select_subset(queues,
  221. self.conf.CELERY_CREATE_MISSING_QUEUES)
  222. def either(self, default_key, *values):
  223. """Fallback to the value of a configuration key if none of the
  224. `*values` are true."""
  225. for value in values:
  226. if value is not None:
  227. return value
  228. return self.conf.get(default_key)
  229. def merge(self, l, r):
  230. """Like `dict(a, **b)` except it will keep values from `a`
  231. if the value in `b` is :const:`None`."""
  232. return lpmerge(l, r)
  233. def _get_backend(self):
  234. from ..backends import get_backend_cls
  235. backend_cls = self.backend_cls or self.conf.CELERY_RESULT_BACKEND
  236. backend_cls = get_backend_cls(backend_cls, loader=self.loader)
  237. return backend_cls(app=self)
  238. def _get_config(self):
  239. return Settings({}, [self.prepare_config(self.loader.conf),
  240. deepcopy(DEFAULTS)])
  241. def _after_fork(self, obj_):
  242. if self._pool:
  243. self._pool.force_close_all()
  244. self._pool = None
  245. def bugreport(self):
  246. import celery
  247. import kombu
  248. return BUGREPORT_INFO % {"system": _platform.system(),
  249. "arch": _platform.architecture(),
  250. "py_i": platforms.pyimplementation(),
  251. "celery_v": celery.__version__,
  252. "kombu_v": kombu.__version__,
  253. "py_v": _platform.python_version(),
  254. "transport": self.conf.BROKER_TRANSPORT,
  255. "results": self.conf.CELERY_RESULT_BACKEND}
  256. @property
  257. def pool(self):
  258. if self._pool is None:
  259. try:
  260. from multiprocessing.util import register_after_fork
  261. register_after_fork(self, self._after_fork)
  262. except ImportError:
  263. pass
  264. limit = self.conf.BROKER_POOL_LIMIT
  265. self._pool = self.broker_connection().Pool(limit)
  266. return self._pool
  267. @cached_property
  268. def amqp(self):
  269. """Sending/receiving messages. See :class:`~celery.app.amqp.AMQP`."""
  270. return instantiate(self.amqp_cls, app=self)
  271. @cached_property
  272. def backend(self):
  273. """Storing/retrieving task state. See
  274. :class:`~celery.backend.base.BaseBackend`."""
  275. return self._get_backend()
  276. @cached_property
  277. def conf(self):
  278. """Current configuration (dict and attribute access)."""
  279. return self._get_config()
  280. @cached_property
  281. def control(self):
  282. """Controlling worker nodes. See
  283. :class:`~celery.task.control.Control`."""
  284. return instantiate(self.control_cls, app=self)
  285. @cached_property
  286. def events(self):
  287. """Sending/receiving events. See :class:`~celery.events.Events`. """
  288. return instantiate(self.events_cls, app=self)
  289. @cached_property
  290. def loader(self):
  291. """Current loader."""
  292. from ..loaders import get_loader_cls
  293. return get_loader_cls(self.loader_cls)(app=self)
  294. @cached_property
  295. def log(self):
  296. """Logging utilities. See :class:`~celery.log.Logging`."""
  297. return instantiate(self.log_cls, app=self)