base.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414
  1. """
  2. celery.app.base
  3. ===============
  4. Application Base Class.
  5. :copyright: (c) 2009 - 2011 by Ask Solem.
  6. :license: BSD, see LICENSE for more details.
  7. """
  8. from __future__ import absolute_import
  9. from __future__ import with_statement
  10. import platform as _platform
  11. import sys
  12. from contextlib import contextmanager
  13. from copy import deepcopy
  14. from functools import wraps
  15. from threading import Lock
  16. from kombu.utils import cached_property
  17. from celery import datastructures
  18. from celery.app.defaults import DEFAULTS
  19. from celery.utils import instantiate, lpmerge
  20. import kombu
# Refuse to import this module when the installed Kombu is older than 1.1.0,
# so the incompatibility surfaces immediately as an ImportError.
if kombu.VERSION < (1, 1, 0):
    raise ImportError("Celery requires Kombu version 1.1.0 or higher.")
#: Template rendered by :meth:`BaseApp.bugreport` using ``%`` formatting with
#: a mapping; it expects the keys ``system``, ``arch``, ``py_i``,
#: ``celery_v``, ``kombu_v``, ``py_v``, ``transport`` and ``results``.
BUGREPORT_INFO = """
platform -> system:%(system)s arch:%(arch)s imp:%(py_i)s
software -> celery:%(celery_v)s kombu:%(kombu_v)s py:%(py_v)s
settings -> transport:%(transport)s results:%(results)s
"""
  28. def pyimplementation():
  29. if hasattr(_platform, "python_implementation"):
  30. return _platform.python_implementation()
  31. elif sys.platform.startswith("java"):
  32. return "Jython %s" % (sys.platform, )
  33. elif hasattr(sys, "pypy_version_info"):
  34. v = ".".join(map(str, sys.pypy_version_info[:3]))
  35. if sys.pypy_version_info[3:]:
  36. v += "-" + "".join(map(str, sys.pypy_version_info[3:]))
  37. return "PyPy %s" % (v, )
  38. else:
  39. return "CPython"
  40. class LamportClock(object):
  41. """Lamports logical clock.
  42. From Wikipedia:
  43. "A Lamport logical clock is a monotonically incrementing software counter
  44. maintained in each process. It follows some simple rules:
  45. * A process increments its counter before each event in that process;
  46. * When a process sends a message, it includes its counter value with
  47. the message;
  48. * On receiving a message, the receiver process sets its counter to be
  49. greater than the maximum of its own value and the received value
  50. before it considers the message received.
  51. Conceptually, this logical clock can be thought of as a clock that only
  52. has meaning in relation to messages moving between processes. When a
  53. process receives a message, it resynchronizes its logical clock with
  54. the sender.
  55. .. seealso::
  56. http://en.wikipedia.org/wiki/Lamport_timestamps
  57. http://en.wikipedia.org/wiki/Lamport's_Distributed_
  58. Mutual_Exclusion_Algorithm
  59. *Usage*
  60. When sending a message use :meth:`forward` to increment the clock,
  61. when receiving a message use :meth:`adjust` to sync with
  62. the timestamp of the incoming message.
  63. """
  64. #: The clocks current value.
  65. value = 0
  66. def __init__(self, initial_value=0):
  67. self.value = initial_value
  68. self.mutex = Lock()
  69. def adjust(self, other):
  70. with self.mutex:
  71. self.value = max(self.value, other) + 1
  72. def forward(self):
  73. with self.mutex:
  74. self.value += 1
  75. return self.value
  76. class Settings(datastructures.ConfigurationView):
  77. @property
  78. def CELERY_RESULT_BACKEND(self):
  79. """Resolves deprecated alias ``CELERY_BACKEND``."""
  80. return self.get("CELERY_RESULT_BACKEND") or self.get("CELERY_BACKEND")
  81. @property
  82. def BROKER_TRANSPORT(self):
  83. """Resolves compat aliases :setting:`BROKER_BACKEND`
  84. and :setting:`CARROT_BACKEND`."""
  85. return (self.get("BROKER_TRANSPORT") or
  86. self.get("BROKER_BACKEND") or
  87. self.get("CARROT_BACKEND"))
  88. @property
  89. def BROKER_BACKEND(self):
  90. """Deprecated compat alias to :attr:`BROKER_TRANSPORT`."""
  91. return self.BROKER_TRANSPORT
  92. class BaseApp(object):
  93. """Base class for apps."""
  94. SYSTEM = _platform.system()
  95. IS_OSX = SYSTEM == "Darwin"
  96. IS_WINDOWS = SYSTEM == "Windows"
  97. amqp_cls = "celery.app.amqp.AMQP"
  98. backend_cls = None
  99. events_cls = "celery.events.Events"
  100. loader_cls = "celery.loaders.app.AppLoader"
  101. log_cls = "celery.log.Logging"
  102. control_cls = "celery.task.control.Control"
  103. _pool = None
  104. def __init__(self, main=None, loader=None, backend=None,
  105. amqp=None, events=None, log=None, control=None,
  106. set_as_current=True, accept_magic_kwargs=False):
  107. self.main = main
  108. self.amqp_cls = amqp or self.amqp_cls
  109. self.backend_cls = backend or self.backend_cls
  110. self.events_cls = events or self.events_cls
  111. self.loader_cls = loader or self.loader_cls
  112. self.log_cls = log or self.log_cls
  113. self.control_cls = control or self.control_cls
  114. self.set_as_current = set_as_current
  115. self.accept_magic_kwargs = accept_magic_kwargs
  116. self.clock = LamportClock()
  117. self.on_init()
  118. def on_init(self):
  119. """Called at the end of the constructor."""
  120. pass
  121. def config_from_object(self, obj, silent=False):
  122. """Read configuration from object, where object is either
  123. a object, or the name of a module to import.
  124. >>> celery.config_from_object("myapp.celeryconfig")
  125. >>> from myapp import celeryconfig
  126. >>> celery.config_from_object(celeryconfig)
  127. """
  128. del(self.conf)
  129. return self.loader.config_from_object(obj, silent=silent)
  130. def config_from_envvar(self, variable_name, silent=False):
  131. """Read configuration from environment variable.
  132. The value of the environment variable must be the name
  133. of a module to import.
  134. >>> os.environ["CELERY_CONFIG_MODULE"] = "myapp.celeryconfig"
  135. >>> celery.config_from_envvar("CELERY_CONFIG_MODULE")
  136. """
  137. del(self.conf)
  138. return self.loader.config_from_envvar(variable_name, silent=silent)
  139. def config_from_cmdline(self, argv, namespace="celery"):
  140. """Read configuration from argv.
  141. The config
  142. """
  143. self.conf.update(self.loader.cmdline_config_parser(argv, namespace))
  144. def send_task(self, name, args=None, kwargs=None, countdown=None,
  145. eta=None, task_id=None, publisher=None, connection=None,
  146. connect_timeout=None, result_cls=None, expires=None,
  147. queues=None, **options):
  148. """Send task by name.
  149. :param name: Name of task to execute (e.g. `"tasks.add"`).
  150. :keyword result_cls: Specify custom result class. Default is
  151. using :meth:`AsyncResult`.
  152. Supports the same arguments as
  153. :meth:`~celery.app.task.BaseTask.apply_async`.
  154. """
  155. router = self.amqp.Router(queues)
  156. result_cls = result_cls or self.AsyncResult
  157. options.setdefault("compression",
  158. self.conf.CELERY_MESSAGE_COMPRESSION)
  159. options = router.route(options, name, args, kwargs)
  160. exchange = options.get("exchange")
  161. exchange_type = options.get("exchange_type")
  162. with self.default_connection(connection, connect_timeout) as conn:
  163. publish = publisher or self.amqp.TaskPublisher(conn,
  164. exchange=exchange,
  165. exchange_type=exchange_type)
  166. try:
  167. new_id = publish.delay_task(name, args, kwargs,
  168. task_id=task_id,
  169. countdown=countdown, eta=eta,
  170. expires=expires, **options)
  171. finally:
  172. publisher or publish.close()
  173. return result_cls(new_id)
  174. def AsyncResult(self, task_id, backend=None, task_name=None):
  175. """Create :class:`celery.result.BaseAsyncResult` instance."""
  176. from celery.result import BaseAsyncResult
  177. return BaseAsyncResult(task_id, app=self, task_name=task_name,
  178. backend=backend or self.backend)
  179. def TaskSetResult(self, taskset_id, results, **kwargs):
  180. """Create :class:`celery.result.TaskSetResult` instance."""
  181. from celery.result import TaskSetResult
  182. return TaskSetResult(taskset_id, results, app=self)
  183. def broker_connection(self, hostname=None, userid=None,
  184. password=None, virtual_host=None, port=None, ssl=None,
  185. insist=None, connect_timeout=None, transport=None, **kwargs):
  186. """Establish a connection to the message broker.
  187. :keyword hostname: defaults to the :setting:`BROKER_HOST` setting.
  188. :keyword userid: defaults to the :setting:`BROKER_USER` setting.
  189. :keyword password: defaults to the :setting:`BROKER_PASSWORD` setting.
  190. :keyword virtual_host: defaults to the :setting:`BROKER_VHOST` setting.
  191. :keyword port: defaults to the :setting:`BROKER_PORT` setting.
  192. :keyword ssl: defaults to the :setting:`BROKER_USE_SSL` setting.
  193. :keyword insist: defaults to the :setting:`BROKER_INSIST` setting.
  194. :keyword connect_timeout: defaults to the
  195. :setting:`BROKER_CONNECTION_TIMEOUT` setting.
  196. :keyword backend_cls: defaults to the :setting:`BROKER_TRANSPORT`
  197. setting.
  198. :returns :class:`kombu.connection.BrokerConnection`:
  199. """
  200. return self.amqp.BrokerConnection(
  201. hostname or self.conf.BROKER_HOST,
  202. userid or self.conf.BROKER_USER,
  203. password or self.conf.BROKER_PASSWORD,
  204. virtual_host or self.conf.BROKER_VHOST,
  205. port or self.conf.BROKER_PORT,
  206. transport=transport or self.conf.BROKER_TRANSPORT,
  207. insist=self.either("BROKER_INSIST", insist),
  208. ssl=self.either("BROKER_USE_SSL", ssl),
  209. connect_timeout=self.either(
  210. "BROKER_CONNECTION_TIMEOUT", connect_timeout),
  211. transport_options=self.conf.BROKER_TRANSPORT_OPTIONS)
  212. @contextmanager
  213. def default_connection(self, connection=None, connect_timeout=None):
  214. """For use within a with-statement to get a connection from the pool
  215. if one is not already provided.
  216. :keyword connection: If not provided, then a connection will be
  217. acquired from the connection pool.
  218. :keyword connect_timeout: *No longer used.*
  219. """
  220. if connection:
  221. yield connection
  222. else:
  223. with self.pool.acquire(block=True) as connection:
  224. yield connection
  225. def with_default_connection(self, fun):
  226. """With any function accepting `connection` and `connect_timeout`
  227. keyword arguments, establishes a default connection if one is
  228. not already passed to it.
  229. Any automatically established connection will be closed after
  230. the function returns.
  231. **Deprecated**
  232. Use ``with app.default_connection(connection)`` instead.
  233. """
  234. @wraps(fun)
  235. def _inner(*args, **kwargs):
  236. connection = kwargs.pop("connection", None)
  237. with self.default_connection(connection) as c:
  238. return fun(*args, **dict(kwargs, connection=c))
  239. return _inner
  240. def prepare_config(self, c):
  241. """Prepare configuration before it is merged with the defaults."""
  242. return c
  243. def mail_admins(self, subject, body, fail_silently=False):
  244. """Send an email to the admins in the :setting:`ADMINS` setting."""
  245. if self.conf.ADMINS:
  246. to = [admin_email for _, admin_email in self.conf.ADMINS]
  247. return self.loader.mail_admins(subject, body, fail_silently, to=to,
  248. sender=self.conf.SERVER_EMAIL,
  249. host=self.conf.EMAIL_HOST,
  250. port=self.conf.EMAIL_PORT,
  251. user=self.conf.EMAIL_HOST_USER,
  252. password=self.conf.EMAIL_HOST_PASSWORD,
  253. timeout=self.conf.EMAIL_TIMEOUT,
  254. use_ssl=self.conf.EMAIL_USE_SSL)
  255. def either(self, default_key, *values):
  256. """Fallback to the value of a configuration key if none of the
  257. `*values` are true."""
  258. for value in values:
  259. if value is not None:
  260. return value
  261. return self.conf.get(default_key)
  262. def merge(self, l, r):
  263. """Like `dict(a, **b)` except it will keep values from `a`
  264. if the value in `b` is :const:`None`."""
  265. return lpmerge(l, r)
  266. def _get_backend(self):
  267. from celery.backends import get_backend_cls
  268. backend_cls = self.backend_cls or self.conf.CELERY_RESULT_BACKEND
  269. backend_cls = get_backend_cls(backend_cls, loader=self.loader)
  270. return backend_cls(app=self)
  271. def _get_config(self):
  272. return Settings({}, [self.prepare_config(self.loader.conf),
  273. deepcopy(DEFAULTS)])
  274. def _after_fork(self, obj_):
  275. if self._pool:
  276. self._pool.force_close_all()
  277. self._pool = None
  278. def bugreport(self):
  279. import celery
  280. import kombu
  281. return BUGREPORT_INFO % {"system": _platform.system(),
  282. "arch": _platform.architecture(),
  283. "py_i": pyimplementation(),
  284. "celery_v": celery.__version__,
  285. "kombu_v": kombu.__version__,
  286. "py_v": _platform.python_version(),
  287. "transport": self.conf.BROKER_TRANSPORT,
  288. "results": self.conf.CELERY_RESULT_BACKEND}
  289. @property
  290. def pool(self):
  291. if self._pool is None:
  292. try:
  293. from multiprocessing.util import register_after_fork
  294. register_after_fork(self, self._after_fork)
  295. except ImportError:
  296. pass
  297. limit = self.conf.BROKER_POOL_LIMIT
  298. self._pool = self.broker_connection().Pool(limit)
  299. return self._pool
  300. @cached_property
  301. def amqp(self):
  302. """Sending/receiving messages. See :class:`~celery.app.amqp.AMQP`."""
  303. return instantiate(self.amqp_cls, app=self)
  304. @cached_property
  305. def backend(self):
  306. """Storing/retreiving task state. See
  307. :class:`~celery.backend.base.BaseBackend`."""
  308. return self._get_backend()
  309. @cached_property
  310. def conf(self):
  311. """Current configuration (dict and attribute access)."""
  312. return self._get_config()
  313. @cached_property
  314. def control(self):
  315. """Controlling worker nodes. See
  316. :class:`~celery.task.control.Control`."""
  317. return instantiate(self.control_cls, app=self)
  318. @cached_property
  319. def events(self):
  320. """Sending/receiving events. See :class:`~celery.events.Events`. """
  321. return instantiate(self.events_cls, app=self)
  322. @cached_property
  323. def loader(self):
  324. """Current loader."""
  325. from celery.loaders import get_loader_cls
  326. return get_loader_cls(self.loader_cls)(app=self)
  327. @cached_property
  328. def log(self):
  329. """Logging utilities. See :class:`~celery.log.Logging`."""
  330. return instantiate(self.log_cls, app=self)