base.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406
  1. """
  2. celery.app.base
  3. ===============
  4. Application Base Class.
  5. :copyright: (c) 2009 - 2011 by Ask Solem.
  6. :license: BSD, see LICENSE for more details.
  7. """
  8. from __future__ import absolute_import, with_statement
  9. import platform as _platform
  10. import sys
  11. from contextlib import contextmanager
  12. from copy import deepcopy
  13. from functools import wraps
  14. from threading import Lock
  15. from kombu.utils import cached_property
  16. from celery import datastructures
  17. from celery.app.defaults import DEFAULTS
  18. from celery.utils import instantiate, lpmerge
  19. import kombu
  20. if kombu.VERSION < (1, 1, 0):
  21. raise ImportError("Celery requires Kombu version 1.1.0 or higher.")
  22. BUGREPORT_INFO = """
  23. platform -> system:%(system)s arch:%(arch)s imp:%(py_i)s
  24. software -> celery:%(celery_v)s kombu:%(kombu_v)s py:%(py_v)s
  25. settings -> transport:%(transport)s results:%(results)s
  26. """
  27. def pyimplementation():
  28. if hasattr(_platform, "python_implementation"):
  29. return _platform.python_implementation()
  30. elif sys.platform.startswith("java"):
  31. return "Jython %s" % (sys.platform, )
  32. elif hasattr(sys, "pypy_version_info"):
  33. v = ".".join(map(str, sys.pypy_version_info[:3]))
  34. if sys.pypy_version_info[3:]:
  35. v += "-" + "".join(map(str, sys.pypy_version_info[3:]))
  36. return "PyPy %s" % (v, )
  37. else:
  38. return "CPython"
  39. class LamportClock(object):
  40. """Lamports logical clock.
  41. From Wikipedia:
  42. "A Lamport logical clock is a monotonically incrementing software counter
  43. maintained in each process. It follows some simple rules:
  44. * A process increments its counter before each event in that process;
  45. * When a process sends a message, it includes its counter value with
  46. the message;
  47. * On receiving a message, the receiver process sets its counter to be
  48. greater than the maximum of its own value and the received value
  49. before it considers the message received.
  50. Conceptually, this logical clock can be thought of as a clock that only
  51. has meaning in relation to messages moving between processes. When a
  52. process receives a message, it resynchronizes its logical clock with
  53. the sender.
  54. .. seealso::
  55. http://en.wikipedia.org/wiki/Lamport_timestamps
  56. http://en.wikipedia.org/wiki/Lamport's_Distributed_
  57. Mutual_Exclusion_Algorithm
  58. *Usage*
  59. When sending a message use :meth:`forward` to increment the clock,
  60. when receiving a message use :meth:`adjust` to sync with
  61. the timestamp of the incoming message.
  62. """
  63. #: The clocks current value.
  64. value = 0
  65. def __init__(self, initial_value=0):
  66. self.value = initial_value
  67. self.mutex = Lock()
  68. def adjust(self, other):
  69. with self.mutex:
  70. self.value = max(self.value, other) + 1
  71. def forward(self):
  72. with self.mutex:
  73. self.value += 1
  74. return self.value
  75. class Settings(datastructures.ConfigurationView):
  76. @property
  77. def CELERY_RESULT_BACKEND(self):
  78. """Resolves deprecated alias ``CELERY_BACKEND``."""
  79. return self.get("CELERY_RESULT_BACKEND") or self.get("CELERY_BACKEND")
  80. @property
  81. def BROKER_TRANSPORT(self):
  82. """Resolves compat aliases ``BROKER_BACKEND`` and ``CARROT_BACKEND``."""
  83. return (self.get("BROKER_TRANSPORT") or
  84. self.get("BROKER_BACKEND") or
  85. self.get("CARROT_BACKEND"))
  86. @property
  87. def BROKER_BACKEND(self):
  88. """Deprecated compat alias to :attr:`BROKER_TRANSPORT`."""
  89. return self.BROKER_TRANSPORT
  90. class BaseApp(object):
  91. """Base class for apps."""
  92. SYSTEM = _platform.system()
  93. IS_OSX = SYSTEM == "Darwin"
  94. IS_WINDOWS = SYSTEM == "Windows"
  95. amqp_cls = "celery.app.amqp.AMQP"
  96. backend_cls = None
  97. events_cls = "celery.events.Events"
  98. loader_cls = "app"
  99. log_cls = "celery.log.Logging"
  100. control_cls = "celery.task.control.Control"
  101. _pool = None
  102. def __init__(self, main=None, loader=None, backend=None,
  103. amqp=None, events=None, log=None, control=None,
  104. set_as_current=True, accept_magic_kwargs=False):
  105. self.main = main
  106. self.amqp_cls = amqp or self.amqp_cls
  107. self.backend_cls = backend or self.backend_cls
  108. self.events_cls = events or self.events_cls
  109. self.loader_cls = loader or self.loader_cls
  110. self.log_cls = log or self.log_cls
  111. self.control_cls = control or self.control_cls
  112. self.set_as_current = set_as_current
  113. self.accept_magic_kwargs = accept_magic_kwargs
  114. self.on_init()
  115. self.clock = LamportClock()
  116. def on_init(self):
  117. """Called at the end of the constructor."""
  118. pass
  119. def config_from_object(self, obj, silent=False):
  120. """Read configuration from object, where object is either
  121. a real object, or the name of an object to import.
  122. >>> celery.config_from_object("myapp.celeryconfig")
  123. >>> from myapp import celeryconfig
  124. >>> celery.config_from_object(celeryconfig)
  125. """
  126. del(self.conf)
  127. return self.loader.config_from_object(obj, silent=silent)
  128. def config_from_envvar(self, variable_name, silent=False):
  129. """Read configuration from environment variable.
  130. The value of the environment variable must be the name
  131. of an object to import.
  132. >>> os.environ["CELERY_CONFIG_MODULE"] = "myapp.celeryconfig"
  133. >>> celery.config_from_envvar("CELERY_CONFIG_MODULE")
  134. """
  135. del(self.conf)
  136. return self.loader.config_from_envvar(variable_name, silent=silent)
  137. def config_from_cmdline(self, argv, namespace="celery"):
  138. """Read configuration from argv.
  139. The config
  140. """
  141. self.conf.update(self.loader.cmdline_config_parser(argv, namespace))
  142. def send_task(self, name, args=None, kwargs=None, countdown=None,
  143. eta=None, task_id=None, publisher=None, connection=None,
  144. connect_timeout=None, result_cls=None, expires=None,
  145. queues=None, **options):
  146. """Send task by name.
  147. :param name: Name of task to execute (e.g. `"tasks.add"`).
  148. :keyword result_cls: Specify custom result class. Default is
  149. using :meth:`AsyncResult`.
  150. Supports the same arguments as
  151. :meth:`~celery.task.base.BaseTask.apply_async`.
  152. """
  153. router = self.amqp.Router(queues)
  154. result_cls = result_cls or self.AsyncResult
  155. options.setdefault("compression",
  156. self.conf.CELERY_MESSAGE_COMPRESSION)
  157. options = router.route(options, name, args, kwargs)
  158. exchange = options.get("exchange")
  159. exchange_type = options.get("exchange_type")
  160. with self.default_connection(connection, connect_timeout) as conn:
  161. publish = publisher or self.amqp.TaskPublisher(conn,
  162. exchange=exchange,
  163. exchange_type=exchange_type)
  164. try:
  165. new_id = publish.delay_task(name, args, kwargs,
  166. task_id=task_id,
  167. countdown=countdown, eta=eta,
  168. expires=expires, **options)
  169. finally:
  170. publisher or publish.close()
  171. return result_cls(new_id)
  172. def AsyncResult(self, task_id, backend=None, task_name=None):
  173. """Create :class:`celery.result.BaseAsyncResult` instance."""
  174. from celery.result import BaseAsyncResult
  175. return BaseAsyncResult(task_id, app=self,
  176. task_name=task_name,
  177. backend=backend or self.backend)
  178. def TaskSetResult(self, taskset_id, results, **kwargs):
  179. """Create :class:`celery.result.TaskSetResult` instance."""
  180. from celery.result import TaskSetResult
  181. return TaskSetResult(taskset_id, results, app=self)
  182. def broker_connection(self, hostname=None, userid=None,
  183. password=None, virtual_host=None, port=None, ssl=None,
  184. insist=None, connect_timeout=None, transport=None, **kwargs):
  185. """Establish a connection to the message broker.
  186. :keyword hostname: defaults to the :setting:`BROKER_HOST` setting.
  187. :keyword userid: defaults to the :setting:`BROKER_USER` setting.
  188. :keyword password: defaults to the :setting:`BROKER_PASSWORD` setting.
  189. :keyword virtual_host: defaults to the :setting:`BROKER_VHOST` setting.
  190. :keyword port: defaults to the :setting:`BROKER_PORT` setting.
  191. :keyword ssl: defaults to the :setting:`BROKER_USE_SSL` setting.
  192. :keyword insist: defaults to the :setting:`BROKER_INSIST` setting.
  193. :keyword connect_timeout: defaults to the
  194. :setting:`BROKER_CONNECTION_TIMEOUT` setting.
  195. :keyword backend_cls: defaults to the :setting:`BROKER_TRANSPORT`
  196. setting.
  197. :returns :class:`kombu.connection.BrokerConnection`:
  198. """
  199. return self.amqp.BrokerConnection(
  200. hostname or self.conf.BROKER_HOST,
  201. userid or self.conf.BROKER_USER,
  202. password or self.conf.BROKER_PASSWORD,
  203. virtual_host or self.conf.BROKER_VHOST,
  204. port or self.conf.BROKER_PORT,
  205. transport=transport or self.conf.BROKER_TRANSPORT,
  206. insist=self.either("BROKER_INSIST", insist),
  207. ssl=self.either("BROKER_USE_SSL", ssl),
  208. connect_timeout=self.either(
  209. "BROKER_CONNECTION_TIMEOUT", connect_timeout),
  210. transport_options=self.conf.BROKER_TRANSPORT_OPTIONS)
  211. @contextmanager
  212. def default_connection(self, connection=None, connect_timeout=None):
  213. """For use within a with-statement to get a connection from the pool
  214. if one is not already provided."""
  215. if connection:
  216. yield connection
  217. else:
  218. with self.pool.acquire(block=True) as connection:
  219. yield connection
  220. def with_default_connection(self, fun):
  221. """With any function accepting `connection` and `connect_timeout`
  222. keyword arguments, establishes a default connection if one is
  223. not already passed to it.
  224. Any automatically established connection will be closed after
  225. the function returns.
  226. **Deprecated**
  227. Use ``with app.default_connection(connection)`` instead.
  228. """
  229. @wraps(fun)
  230. def _inner(*args, **kwargs):
  231. connection = kwargs.pop("connection", None)
  232. connect_timeout = kwargs.get("connect_timeout")
  233. with self.default_connection(connection, connect_timeout) as c:
  234. return fun(*args, **dict(kwargs, connection=c))
  235. return _inner
  236. def prepare_config(self, c):
  237. """Prepare configuration before it is merged with the defaults."""
  238. return c
  239. def mail_admins(self, subject, body, fail_silently=False):
  240. """Send an email to the admins in the :setting:`ADMINS` setting."""
  241. if self.conf.ADMINS:
  242. to = [admin_email for _, admin_email in self.conf.ADMINS]
  243. return self.loader.mail_admins(subject, body, fail_silently, to=to,
  244. sender=self.conf.SERVER_EMAIL,
  245. host=self.conf.EMAIL_HOST,
  246. port=self.conf.EMAIL_PORT,
  247. user=self.conf.EMAIL_HOST_USER,
  248. password=self.conf.EMAIL_HOST_PASSWORD,
  249. timeout=self.conf.EMAIL_TIMEOUT)
  250. def either(self, default_key, *values):
  251. """Fallback to the value of a configuration key if none of the
  252. `*values` are true."""
  253. for value in values:
  254. if value is not None:
  255. return value
  256. return self.conf.get(default_key)
  257. def merge(self, l, r):
  258. """Like `dict(a, **b)` except it will keep values from `a`
  259. if the value in `b` is :const:`None`."""
  260. return lpmerge(l, r)
  261. def _get_backend(self):
  262. from celery.backends import get_backend_cls
  263. backend_cls = self.backend_cls or self.conf.CELERY_RESULT_BACKEND
  264. backend_cls = get_backend_cls(backend_cls, loader=self.loader)
  265. return backend_cls(app=self)
  266. def _get_config(self):
  267. return Settings({}, [self.prepare_config(self.loader.conf),
  268. deepcopy(DEFAULTS)])
  269. def _after_fork(self, obj_):
  270. if self._pool:
  271. self._pool.force_close_all()
  272. self._pool = None
  273. def bugreport(self):
  274. import celery
  275. import kombu
  276. return BUGREPORT_INFO % {"system": _platform.system(),
  277. "arch": _platform.architecture(),
  278. "py_i": pyimplementation(),
  279. "celery_v": celery.__version__,
  280. "kombu_v": kombu.__version__,
  281. "py_v": _platform.python_version(),
  282. "transport": self.conf.BROKER_TRANSPORT,
  283. "results": self.conf.CELERY_RESULT_BACKEND}
  284. @property
  285. def pool(self):
  286. if self._pool is None:
  287. try:
  288. from multiprocessing.util import register_after_fork
  289. register_after_fork(self, self._after_fork)
  290. except ImportError:
  291. pass
  292. self._pool = self.broker_connection().Pool(
  293. self.conf.BROKER_POOL_LIMIT)
  294. return self._pool
  295. @cached_property
  296. def amqp(self):
  297. """Sending/receiving messages. See :class:`~celery.app.amqp.AMQP`."""
  298. return instantiate(self.amqp_cls, app=self)
  299. @cached_property
  300. def backend(self):
  301. """Storing/retreiving task state. See
  302. :class:`~celery.backend.base.BaseBackend`."""
  303. return self._get_backend()
  304. @cached_property
  305. def conf(self):
  306. """Current configuration (dict and attribute access)."""
  307. return self._get_config()
  308. @cached_property
  309. def control(self):
  310. """Controlling worker nodes. See
  311. :class:`~celery.task.control.Control`."""
  312. return instantiate(self.control_cls, app=self)
  313. @cached_property
  314. def events(self):
  315. """Sending/receiving events. See :class:`~celery.events.Events`. """
  316. return instantiate(self.events_cls, app=self)
  317. @cached_property
  318. def loader(self):
  319. """Current loader."""
  320. from celery.loaders import get_loader_cls
  321. return get_loader_cls(self.loader_cls)(app=self)
  322. @cached_property
  323. def log(self):
  324. """Logging utilities. See :class:`~celery.log.Logging`."""
  325. return instantiate(self.log_cls, app=self)