base.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app.base
  4. ~~~~~~~~~~~~~~~
  5. Application Base Class.
  6. :copyright: (c) 2009 - 2011 by Ask Solem.
  7. :license: BSD, see LICENSE for more details.
  8. """
  9. from __future__ import absolute_import
  10. from __future__ import with_statement
  11. import os
  12. import platform as _platform
  13. from contextlib import contextmanager
  14. from copy import deepcopy
  15. from functools import wraps
  16. from kombu.clocks import LamportClock
  17. from .. import datastructures
  18. from .. import platforms
  19. from ..utils import cached_property, instantiate, lpmerge
  20. from .defaults import DEFAULTS, find_deprecated_settings, find
  21. import kombu
  22. if kombu.VERSION < (1, 1, 0):
  23. raise ImportError("Celery requires Kombu version 1.1.0 or higher.")
  24. BUGREPORT_INFO = """
  25. platform -> system:%(system)s arch:%(arch)s imp:%(py_i)s
  26. software -> celery:%(celery_v)s kombu:%(kombu_v)s py:%(py_v)s
  27. settings -> transport:%(transport)s results:%(results)s
  28. """
  29. class Settings(datastructures.ConfigurationView):
  30. @property
  31. def CELERY_RESULT_BACKEND(self):
  32. """Resolves deprecated alias ``CELERY_BACKEND``."""
  33. return self.get("CELERY_RESULT_BACKEND") or self.get("CELERY_BACKEND")
  34. @property
  35. def BROKER_TRANSPORT(self):
  36. """Resolves compat aliases :setting:`BROKER_BACKEND`
  37. and :setting:`CARROT_BACKEND`."""
  38. return (self.get("BROKER_TRANSPORT") or
  39. self.get("BROKER_BACKEND") or
  40. self.get("CARROT_BACKEND"))
  41. @property
  42. def BROKER_BACKEND(self):
  43. """Deprecated compat alias to :attr:`BROKER_TRANSPORT`."""
  44. return self.BROKER_TRANSPORT
  45. @property
  46. def BROKER_HOST(self):
  47. return (os.environ.get("CELERY_BROKER_URL") or
  48. self.get("BROKER_URL") or
  49. self.get("BROKER_HOST"))
  50. def find_option(self, name, namespace="celery"):
  51. return find(name, namespace)
  52. def get_by_parts(self, *parts):
  53. return self["_".join(filter(None, parts))]
  54. def find_value_for_key(self, name, namespace="celery"):
  55. ns, key, _ = self.find_option(name, namespace=namespace)
  56. return self.get_by_parts(ns, key)
  57. class BaseApp(object):
  58. """Base class for apps."""
  59. SYSTEM = platforms.SYSTEM
  60. IS_OSX = platforms.IS_OSX
  61. IS_WINDOWS = platforms.IS_WINDOWS
  62. amqp_cls = "celery.app.amqp:AMQP"
  63. backend_cls = None
  64. events_cls = "celery.events:Events"
  65. loader_cls = "celery.loaders.app:AppLoader"
  66. log_cls = "celery.log:Logging"
  67. control_cls = "celery.task.control:Control"
  68. _pool = None
  69. def __init__(self, main=None, loader=None, backend=None,
  70. amqp=None, events=None, log=None, control=None,
  71. set_as_current=True, accept_magic_kwargs=False, **kwargs):
  72. self.main = main
  73. self.amqp_cls = amqp or self.amqp_cls
  74. self.backend_cls = backend or self.backend_cls
  75. self.events_cls = events or self.events_cls
  76. self.loader_cls = loader or self.loader_cls
  77. self.log_cls = log or self.log_cls
  78. self.control_cls = control or self.control_cls
  79. self.set_as_current = set_as_current
  80. self.accept_magic_kwargs = accept_magic_kwargs
  81. self.clock = LamportClock()
  82. self.on_init()
  83. def on_init(self):
  84. """Called at the end of the constructor."""
  85. pass
  86. def config_from_object(self, obj, silent=False):
  87. """Read configuration from object, where object is either
  88. a object, or the name of a module to import.
  89. >>> celery.config_from_object("myapp.celeryconfig")
  90. >>> from myapp import celeryconfig
  91. >>> celery.config_from_object(celeryconfig)
  92. """
  93. del(self.conf)
  94. return self.loader.config_from_object(obj, silent=silent)
  95. def config_from_envvar(self, variable_name, silent=False):
  96. """Read configuration from environment variable.
  97. The value of the environment variable must be the name
  98. of a module to import.
  99. >>> os.environ["CELERY_CONFIG_MODULE"] = "myapp.celeryconfig"
  100. >>> celery.config_from_envvar("CELERY_CONFIG_MODULE")
  101. """
  102. del(self.conf)
  103. return self.loader.config_from_envvar(variable_name, silent=silent)
  104. def config_from_cmdline(self, argv, namespace="celery"):
  105. """Read configuration from argv.
  106. The config
  107. """
  108. self.conf.update(self.loader.cmdline_config_parser(argv, namespace))
  109. def send_task(self, name, args=None, kwargs=None, countdown=None,
  110. eta=None, task_id=None, publisher=None, connection=None,
  111. connect_timeout=None, result_cls=None, expires=None,
  112. queues=None, **options):
  113. """Send task by name.
  114. :param name: Name of task to execute (e.g. `"tasks.add"`).
  115. :keyword result_cls: Specify custom result class. Default is
  116. using :meth:`AsyncResult`.
  117. Supports the same arguments as
  118. :meth:`~celery.app.task.BaseTask.apply_async`.
  119. """
  120. router = self.amqp.Router(queues)
  121. result_cls = result_cls or self.AsyncResult
  122. options.setdefault("compression",
  123. self.conf.CELERY_MESSAGE_COMPRESSION)
  124. options = router.route(options, name, args, kwargs)
  125. exchange = options.get("exchange")
  126. exchange_type = options.get("exchange_type")
  127. with self.default_connection(connection, connect_timeout) as conn:
  128. publish = publisher or self.amqp.TaskPublisher(conn,
  129. exchange=exchange,
  130. exchange_type=exchange_type)
  131. try:
  132. new_id = publish.delay_task(name, args, kwargs,
  133. task_id=task_id,
  134. countdown=countdown, eta=eta,
  135. expires=expires, **options)
  136. finally:
  137. publisher or publish.close()
  138. return result_cls(new_id)
  139. def AsyncResult(self, task_id, backend=None, task_name=None):
  140. """Create :class:`celery.result.BaseAsyncResult` instance."""
  141. from ..result import BaseAsyncResult
  142. return BaseAsyncResult(task_id, app=self, task_name=task_name,
  143. backend=backend or self.backend)
  144. def TaskSetResult(self, taskset_id, results, **kwargs):
  145. """Create :class:`celery.result.TaskSetResult` instance."""
  146. from ..result import TaskSetResult
  147. return TaskSetResult(taskset_id, results, app=self)
  148. def broker_connection(self, hostname=None, userid=None,
  149. password=None, virtual_host=None, port=None, ssl=None,
  150. insist=None, connect_timeout=None, transport=None,
  151. transport_options=None, **kwargs):
  152. """Establish a connection to the message broker.
  153. :keyword hostname: defaults to the :setting:`BROKER_HOST` setting.
  154. :keyword userid: defaults to the :setting:`BROKER_USER` setting.
  155. :keyword password: defaults to the :setting:`BROKER_PASSWORD` setting.
  156. :keyword virtual_host: defaults to the :setting:`BROKER_VHOST` setting.
  157. :keyword port: defaults to the :setting:`BROKER_PORT` setting.
  158. :keyword ssl: defaults to the :setting:`BROKER_USE_SSL` setting.
  159. :keyword insist: defaults to the :setting:`BROKER_INSIST` setting.
  160. :keyword connect_timeout: defaults to the
  161. :setting:`BROKER_CONNECTION_TIMEOUT` setting.
  162. :keyword backend_cls: defaults to the :setting:`BROKER_TRANSPORT`
  163. setting.
  164. :returns :class:`kombu.connection.BrokerConnection`:
  165. """
  166. conf = self.conf
  167. return self.amqp.BrokerConnection(
  168. hostname or conf.BROKER_HOST,
  169. userid or conf.BROKER_USER,
  170. password or conf.BROKER_PASSWORD,
  171. virtual_host or conf.BROKER_VHOST,
  172. port or conf.BROKER_PORT,
  173. transport=transport or conf.BROKER_TRANSPORT,
  174. insist=self.either("BROKER_INSIST", insist),
  175. ssl=self.either("BROKER_USE_SSL", ssl),
  176. connect_timeout=self.either(
  177. "BROKER_CONNECTION_TIMEOUT", connect_timeout),
  178. transport_options=dict(conf.BROKER_TRANSPORT_OPTIONS,
  179. **transport_options or {}))
  180. @contextmanager
  181. def default_connection(self, connection=None, connect_timeout=None):
  182. """For use within a with-statement to get a connection from the pool
  183. if one is not already provided.
  184. :keyword connection: If not provided, then a connection will be
  185. acquired from the connection pool.
  186. :keyword connect_timeout: *No longer used.*
  187. """
  188. if connection:
  189. yield connection
  190. else:
  191. with self.pool.acquire(block=True) as connection:
  192. yield connection
  193. def with_default_connection(self, fun):
  194. """With any function accepting `connection` and `connect_timeout`
  195. keyword arguments, establishes a default connection if one is
  196. not already passed to it.
  197. Any automatically established connection will be closed after
  198. the function returns.
  199. **Deprecated**
  200. Use ``with app.default_connection(connection)`` instead.
  201. """
  202. @wraps(fun)
  203. def _inner(*args, **kwargs):
  204. connection = kwargs.pop("connection", None)
  205. with self.default_connection(connection) as c:
  206. return fun(*args, **dict(kwargs, connection=c))
  207. return _inner
  208. def prepare_config(self, c):
  209. """Prepare configuration before it is merged with the defaults."""
  210. find_deprecated_settings(c)
  211. return c
  212. def mail_admins(self, subject, body, fail_silently=False):
  213. """Send an email to the admins in the :setting:`ADMINS` setting."""
  214. if self.conf.ADMINS:
  215. to = [admin_email for _, admin_email in self.conf.ADMINS]
  216. return self.loader.mail_admins(subject, body, fail_silently, to=to,
  217. sender=self.conf.SERVER_EMAIL,
  218. host=self.conf.EMAIL_HOST,
  219. port=self.conf.EMAIL_PORT,
  220. user=self.conf.EMAIL_HOST_USER,
  221. password=self.conf.EMAIL_HOST_PASSWORD,
  222. timeout=self.conf.EMAIL_TIMEOUT,
  223. use_ssl=self.conf.EMAIL_USE_SSL,
  224. use_tls=self.conf.EMAIL_USE_TLS)
  225. def select_queues(self, queues=None):
  226. if queues:
  227. return self.amqp.queues.select_subset(queues,
  228. self.conf.CELERY_CREATE_MISSING_QUEUES)
  229. def either(self, default_key, *values):
  230. """Fallback to the value of a configuration key if none of the
  231. `*values` are true."""
  232. for value in values:
  233. if value is not None:
  234. return value
  235. return self.conf.get(default_key)
  236. def merge(self, l, r):
  237. """Like `dict(a, **b)` except it will keep values from `a`
  238. if the value in `b` is :const:`None`."""
  239. return lpmerge(l, r)
  240. def _get_backend(self):
  241. from ..backends import get_backend_cls
  242. backend_cls = self.backend_cls or self.conf.CELERY_RESULT_BACKEND
  243. backend_cls = get_backend_cls(backend_cls, loader=self.loader)
  244. return backend_cls(app=self)
  245. def _get_config(self):
  246. return Settings({}, [self.prepare_config(self.loader.conf),
  247. deepcopy(DEFAULTS)])
  248. def _after_fork(self, obj_):
  249. if self._pool:
  250. self._pool.force_close_all()
  251. self._pool = None
  252. def bugreport(self):
  253. import celery
  254. import kombu
  255. return BUGREPORT_INFO % {"system": _platform.system(),
  256. "arch": _platform.architecture(),
  257. "py_i": platforms.pyimplementation(),
  258. "celery_v": celery.__version__,
  259. "kombu_v": kombu.__version__,
  260. "py_v": _platform.python_version(),
  261. "transport": self.conf.BROKER_TRANSPORT,
  262. "results": self.conf.CELERY_RESULT_BACKEND}
  263. @property
  264. def pool(self):
  265. if self._pool is None:
  266. try:
  267. from multiprocessing.util import register_after_fork
  268. register_after_fork(self, self._after_fork)
  269. except ImportError:
  270. pass
  271. limit = self.conf.BROKER_POOL_LIMIT
  272. self._pool = self.broker_connection().Pool(limit)
  273. return self._pool
  274. @cached_property
  275. def amqp(self):
  276. """Sending/receiving messages. See :class:`~celery.app.amqp.AMQP`."""
  277. return instantiate(self.amqp_cls, app=self)
  278. @cached_property
  279. def backend(self):
  280. """Storing/retrieving task state. See
  281. :class:`~celery.backend.base.BaseBackend`."""
  282. return self._get_backend()
  283. @cached_property
  284. def conf(self):
  285. """Current configuration (dict and attribute access)."""
  286. return self._get_config()
  287. @cached_property
  288. def control(self):
  289. """Controlling worker nodes. See
  290. :class:`~celery.task.control.Control`."""
  291. return instantiate(self.control_cls, app=self)
  292. @cached_property
  293. def events(self):
  294. """Sending/receiving events. See :class:`~celery.events.Events`. """
  295. return instantiate(self.events_cls, app=self)
  296. @cached_property
  297. def loader(self):
  298. """Current loader."""
  299. from ..loaders import get_loader_cls
  300. return get_loader_cls(self.loader_cls)(app=self)
  301. @cached_property
  302. def log(self):
  303. """Logging utilities. See :class:`~celery.log.Logging`."""
  304. return instantiate(self.log_cls, app=self)