base.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. """
  2. celery.app.base
  3. ===============
  4. Application Base Class.
  5. :copyright: (c) 2009 - 2011 by Ask Solem.
  6. :license: BSD, see LICENSE for more details.
  7. """
  8. import os
  9. import platform as _platform
  10. from copy import deepcopy
  11. from kombu.utils import cached_property
  12. from celery.app.defaults import DEFAULTS
  13. from celery.datastructures import ConfigurationView
  14. from celery.utils import instantiate, lpmerge
  15. from celery.utils.functional import wraps
  16. class BaseApp(object):
  17. """Base class for apps."""
  18. SYSTEM = _platform.system()
  19. IS_OSX = SYSTEM == "Darwin"
  20. IS_WINDOWS = SYSTEM == "Windows"
  21. amqp_cls = "celery.app.amqp.AMQP"
  22. backend_cls = None
  23. events_cls = "celery.events.Events"
  24. loader_cls = "app"
  25. log_cls = "celery.log.Logging"
  26. control_cls = "celery.task.control.Control"
  27. _pool = None
  28. def __init__(self, main=None, loader=None, backend=None,
  29. amqp=None, events=None, log=None, control=None,
  30. set_as_current=True, accept_magic_kwargs=False):
  31. self.main = main
  32. self.amqp_cls = amqp or self.amqp_cls
  33. self.backend_cls = backend or self.backend_cls
  34. self.events_cls = events or self.events_cls
  35. self.loader_cls = loader or self.loader_cls
  36. self.log_cls = log or self.log_cls
  37. self.control_cls = control or self.control_cls
  38. self.set_as_current = set_as_current
  39. self.accept_magic_kwargs = accept_magic_kwargs
  40. self.on_init()
  41. def on_init(self):
  42. """Called at the end of the constructor."""
  43. pass
  44. def config_from_object(self, obj, silent=False):
  45. """Read configuration from object, where object is either
  46. a real object, or the name of an object to import.
  47. >>> celery.config_from_object("myapp.celeryconfig")
  48. >>> from myapp import celeryconfig
  49. >>> celery.config_from_object(celeryconfig)
  50. """
  51. del(self.conf)
  52. return self.loader.config_from_object(obj, silent=silent)
  53. def config_from_envvar(self, variable_name, silent=False):
  54. """Read configuration from environment variable.
  55. The value of the environment variable must be the name
  56. of an object to import.
  57. >>> os.environ["CELERY_CONFIG_MODULE"] = "myapp.celeryconfig"
  58. >>> celery.config_from_envvar("CELERY_CONFIG_MODULE")
  59. """
  60. del(self.conf)
  61. return self.loader.config_from_envvar(variable_name, silent=silent)
  62. def config_from_cmdline(self, argv, namespace="celery"):
  63. """Read configuration from argv.
  64. The config
  65. """
  66. self.conf.update(self.loader.cmdline_config_parser(argv, namespace))
  67. def send_task(self, name, args=None, kwargs=None, countdown=None,
  68. eta=None, task_id=None, publisher=None, connection=None,
  69. connect_timeout=None, result_cls=None, expires=None,
  70. queues=None, **options):
  71. """Send task by name.
  72. :param name: Name of task to execute (e.g. `"tasks.add"`).
  73. :keyword result_cls: Specify custom result class. Default is
  74. using :meth:`AsyncResult`.
  75. Supports the same arguments as
  76. :meth:`~celery.task.base.BaseTask.apply_async`.
  77. """
  78. router = self.amqp.Router(queues)
  79. result_cls = result_cls or self.AsyncResult
  80. options.setdefault("compression",
  81. self.conf.CELERY_MESSAGE_COMPRESSION)
  82. options = router.route(options, name, args, kwargs)
  83. exchange = options.get("exchange")
  84. exchange_type = options.get("exchange_type")
  85. def _do_publish(connection=None, **_):
  86. publish = publisher or self.amqp.TaskPublisher(connection,
  87. exchange=exchange,
  88. exchange_type=exchange_type)
  89. try:
  90. new_id = publish.delay_task(name, args, kwargs,
  91. task_id=task_id,
  92. countdown=countdown, eta=eta,
  93. expires=expires, **options)
  94. finally:
  95. publisher or publish.close()
  96. return result_cls(new_id)
  97. return self.with_default_connection(_do_publish)(
  98. connection=connection, connect_timeout=connect_timeout)
  99. def AsyncResult(self, task_id, backend=None, task_name=None):
  100. """Create :class:`celery.result.BaseAsyncResult` instance."""
  101. from celery.result import BaseAsyncResult
  102. return BaseAsyncResult(task_id, app=self,
  103. task_name=task_name,
  104. backend=backend or self.backend)
  105. def TaskSetResult(self, taskset_id, results, **kwargs):
  106. """Create :class:`celery.result.TaskSetResult` instance."""
  107. from celery.result import TaskSetResult
  108. return TaskSetResult(taskset_id, results, app=self)
  109. def broker_connection(self, hostname=None, userid=None,
  110. password=None, virtual_host=None, port=None, ssl=None,
  111. insist=None, connect_timeout=None, transport=None, **kwargs):
  112. """Establish a connection to the message broker.
  113. :keyword hostname: defaults to the :setting:`BROKER_HOST` setting.
  114. :keyword userid: defaults to the :setting:`BROKER_USER` setting.
  115. :keyword password: defaults to the :setting:`BROKER_PASSWORD` setting.
  116. :keyword virtual_host: defaults to the :setting:`BROKER_VHOST` setting.
  117. :keyword port: defaults to the :setting:`BROKER_PORT` setting.
  118. :keyword ssl: defaults to the :setting:`BROKER_USE_SSL` setting.
  119. :keyword insist: defaults to the :setting:`BROKER_INSIST` setting.
  120. :keyword connect_timeout: defaults to the
  121. :setting:`BROKER_CONNECTION_TIMEOUT` setting.
  122. :keyword backend_cls: defaults to the :setting:`BROKER_BACKEND`
  123. setting.
  124. :returns :class:`kombu.connection.BrokerConnection`:
  125. """
  126. return self.amqp.BrokerConnection(
  127. hostname or self.conf.BROKER_HOST,
  128. userid or self.conf.BROKER_USER,
  129. password or self.conf.BROKER_PASSWORD,
  130. virtual_host or self.conf.BROKER_VHOST,
  131. port or self.conf.BROKER_PORT,
  132. transport=transport or self.conf.BROKER_BACKEND,
  133. insist=self.either("BROKER_INSIST", insist),
  134. ssl=self.either("BROKER_USE_SSL", ssl),
  135. connect_timeout=self.either(
  136. "BROKER_CONNECTION_TIMEOUT", connect_timeout))
  137. def with_default_connection(self, fun):
  138. """With any function accepting `connection` and `connect_timeout`
  139. keyword arguments, establishes a default connection if one is
  140. not already passed to it.
  141. Any automatically established connection will be closed after
  142. the function returns.
  143. """
  144. @wraps(fun)
  145. def _inner(*args, **kwargs):
  146. connection = kwargs.get("connection")
  147. timeout = kwargs.get("connect_timeout")
  148. kwargs["connection"] = conn = connection or \
  149. self.pool.acquire(block=True)
  150. close_connection = not connection and conn.release or None
  151. try:
  152. return fun(*args, **kwargs)
  153. finally:
  154. if close_connection:
  155. close_connection()
  156. return _inner
  157. def prepare_config(self, c):
  158. """Prepare configuration before it is merged with the defaults."""
  159. if not c.get("CELERY_RESULT_BACKEND"):
  160. rbackend = c.get("CELERY_BACKEND")
  161. if rbackend:
  162. c["CELERY_RESULT_BACKEND"] = rbackend
  163. if not c.get("BROKER_BACKEND"):
  164. cbackend = c.get("BROKER_TRANSPORT") or c.get("CARROT_BACKEND")
  165. if cbackend:
  166. c["BROKER_BACKEND"] = cbackend
  167. return c
  168. def mail_admins(self, subject, body, fail_silently=False):
  169. """Send an e-mail to the admins in the :setting:`ADMINS` setting."""
  170. if self.conf.ADMINS:
  171. to = [admin_email for _, admin_email in self.conf.ADMINS]
  172. return self.loader.mail_admins(subject, body, fail_silently, to=to,
  173. sender=self.conf.SERVER_EMAIL,
  174. host=self.conf.EMAIL_HOST,
  175. port=self.conf.EMAIL_PORT,
  176. user=self.conf.EMAIL_HOST_USER,
  177. password=self.conf.EMAIL_HOST_PASSWORD,
  178. timeout=self.conf.EMAIL_TIMEOUT)
  179. def either(self, default_key, *values):
  180. """Fallback to the value of a configuration key if none of the
  181. `*values` are true."""
  182. for value in values:
  183. if value is not None:
  184. return value
  185. return self.conf.get(default_key)
  186. def merge(self, l, r):
  187. """Like `dict(a, **b)` except it will keep values from `a`
  188. if the value in `b` is :const:`None`."""
  189. return lpmerge(l, r)
  190. def _get_backend(self):
  191. from celery.backends import get_backend_cls
  192. backend_cls = self.backend_cls or self.conf.CELERY_RESULT_BACKEND
  193. backend_cls = get_backend_cls(backend_cls, loader=self.loader)
  194. return backend_cls(app=self)
  195. def _get_config(self):
  196. return ConfigurationView({},
  197. [self.prepare_config(self.loader.conf), deepcopy(DEFAULTS)])
  198. def _after_fork(self, obj_):
  199. if self._pool:
  200. self._pool.force_close_all()
  201. self._pool = None
  202. @property
  203. def pool(self):
  204. if self._pool is None:
  205. try:
  206. from multiprocessing.util import register_after_fork
  207. register_after_fork(self, self._after_fork)
  208. except ImportError:
  209. pass
  210. self._pool = self.broker_connection().Pool(self.conf.BROKER_POOL_LIMIT)
  211. return self._pool
  212. @cached_property
  213. def amqp(self):
  214. """Sending/receiving messages. See :class:`~celery.app.amqp.AMQP`."""
  215. return instantiate(self.amqp_cls, app=self)
  216. @cached_property
  217. def backend(self):
  218. """Storing/retreiving task state. See
  219. :class:`~celery.backend.base.BaseBackend`."""
  220. return self._get_backend()
  221. @cached_property
  222. def conf(self):
  223. """Current configuration (dict and attribute access)."""
  224. return self._get_config()
  225. @cached_property
  226. def control(self):
  227. """Controlling worker nodes. See
  228. :class:`~celery.task.control.Control`."""
  229. return instantiate(self.control_cls, app=self)
  230. @cached_property
  231. def events(self):
  232. """Sending/receiving events. See :class:`~celery.events.Events`. """
  233. return instantiate(self.events_cls, app=self)
  234. @cached_property
  235. def loader(self):
  236. """Current loader."""
  237. from celery.loaders import get_loader_cls
  238. return get_loader_cls(self.loader_cls)(app=self)
  239. @cached_property
  240. def log(self):
  241. """Logging utilities. See :class:`~celery.log.Logging`."""
  242. return instantiate(self.log_cls, app=self)