base.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. """
  2. celery.app.base
  3. ===============
  4. Application Base Class.
  5. :copyright: (c) 2009 - 2010 by Ask Solem.
  6. :license: BSD, see LICENSE for more details.
  7. """
  8. import sys
  9. import platform as _platform
  10. from datetime import timedelta
  11. from celery import routes
  12. from celery.app.defaults import DEFAULTS
  13. from celery.datastructures import ConfigurationView
  14. from celery.utils import noop, isatty
  15. from celery.utils.functional import wraps
  16. class BaseApp(object):
  17. """Base class for apps."""
  18. SYSTEM = _platform.system()
  19. IS_OSX = SYSTEM == "Darwin"
  20. IS_WINDOWS = SYSTEM == "Windows"
  21. def __init__(self, main=None, loader=None, backend=None,
  22. set_as_current=True):
  23. self.main = main
  24. self.loader_cls = loader or "app"
  25. self.backend_cls = backend
  26. self._amqp = None
  27. self._backend = None
  28. self._conf = None
  29. self._control = None
  30. self._loader = None
  31. self._log = None
  32. self._events = None
  33. self.set_as_current = set_as_current
  34. self.on_init()
  35. def on_init(self):
  36. """Called at the end of the constructor."""
  37. pass
  38. def config_from_object(self, obj, silent=False):
  39. """Read configuration from object, where object is either
  40. a real object, or the name of an object to import.
  41. >>> celery.config_from_object("myapp.celeryconfig")
  42. >>> from myapp import celeryconfig
  43. >>> celery.config_from_object(celeryconfig)
  44. """
  45. self._conf = None
  46. return self.loader.config_from_object(obj, silent=silent)
  47. def config_from_envvar(self, variable_name, silent=False):
  48. """Read configuration from environment variable.
  49. The value of the environment variable must be the name
  50. of an object to import.
  51. >>> os.environ["CELERY_CONFIG_MODULE"] = "myapp.celeryconfig"
  52. >>> celery.config_from_envvar("CELERY_CONFIG_MODULE")
  53. """
  54. self._conf = None
  55. return self.loader.config_from_envvar(variable_name, silent=silent)
  56. def config_from_cmdline(self, argv, namespace="celery"):
  57. """Read configuration from argv.
  58. The config
  59. """
  60. config = self.loader.cmdline_config_parser(argv, namespace)
  61. for key, value in config.items():
  62. self.conf[key] = value
  63. def send_task(self, name, args=None, kwargs=None, countdown=None,
  64. eta=None, task_id=None, publisher=None, connection=None,
  65. connect_timeout=None, result_cls=None, expires=None,
  66. **options):
  67. """Send task by name.
  68. :param name: Name of task to execute (e.g. `"tasks.add"`).
  69. :keyword result_cls: Specify custom result class. Default is
  70. using :meth:`AsyncResult`.
  71. Supports the same arguments as
  72. :meth:`~celery.task.base.BaseTask.apply_async`.
  73. """
  74. result_cls = result_cls or self.AsyncResult
  75. exchange = options.get("exchange")
  76. exchange_type = options.get("exchange_type")
  77. def _do_publish(connection=None, **_):
  78. publish = publisher or self.amqp.TaskPublisher(connection,
  79. exchange=exchange,
  80. exchange_type=exchange_type)
  81. try:
  82. new_id = publish.delay_task(name, args, kwargs,
  83. task_id=task_id,
  84. countdown=countdown, eta=eta,
  85. expires=expires, **options)
  86. finally:
  87. publisher or publish.close()
  88. return result_cls(new_id)
  89. return self.with_default_connection(_do_publish)(
  90. connection=connection, connect_timeout=connect_timeout)
  91. def AsyncResult(self, task_id, backend=None, task_name=None):
  92. """Create :class:`celery.result.BaseAsyncResult` instance."""
  93. from celery.result import BaseAsyncResult
  94. return BaseAsyncResult(task_id, app=self,
  95. task_name=task_name,
  96. backend=backend or self.backend)
  97. def TaskSetResult(self, taskset_id, results, **kwargs):
  98. """Create :class:`celery.result.TaskSetResult` instance."""
  99. from celery.result import TaskSetResult
  100. return TaskSetResult(taskset_id, results, app=self)
  101. def broker_connection(self, hostname=None, userid=None,
  102. password=None, virtual_host=None, port=None, ssl=None,
  103. insist=None, connect_timeout=None, transport=None, **kwargs):
  104. """Establish a connection to the message broker.
  105. :keyword hostname: defaults to the :setting:`BROKER_HOST` setting.
  106. :keyword userid: defaults to the :setting:`BROKER_USER` setting.
  107. :keyword password: defaults to the :setting:`BROKER_PASSWORD` setting.
  108. :keyword virtual_host: defaults to the :setting:`BROKER_VHOST` setting.
  109. :keyword port: defaults to the :setting:`BROKER_PORT` setting.
  110. :keyword ssl: defaults to the :setting:`BROKER_USE_SSL` setting.
  111. :keyword insist: defaults to the :setting:`BROKER_INSIST` setting.
  112. :keyword connect_timeout: defaults to the
  113. :setting:`BROKER_CONNECTION_TIMEOUT` setting.
  114. :keyword backend_cls: defaults to the :setting:`BROKER_BACKEND`
  115. setting.
  116. :returns :class:`kombu.connection.BrokerConnection`:
  117. """
  118. return self.amqp.BrokerConnection(
  119. hostname or self.conf.BROKER_HOST,
  120. userid or self.conf.BROKER_USER,
  121. password or self.conf.BROKER_PASSWORD,
  122. virtual_host or self.conf.BROKER_VHOST,
  123. port or self.conf.BROKER_PORT,
  124. transport=transport or self.conf.BROKER_BACKEND,
  125. insist=self.either("BROKER_INSIST", insist),
  126. ssl=self.either("BROKER_USE_SSL", ssl),
  127. connect_timeout=self.either(
  128. "BROKER_CONNECTION_TIMEOUT", connect_timeout))
  129. def with_default_connection(self, fun):
  130. """With any function accepting `connection` and `connect_timeout`
  131. keyword arguments, establishes a default connection if one is
  132. not already passed to it.
  133. Any automatically established connection will be closed after
  134. the function returns.
  135. """
  136. @wraps(fun)
  137. def _inner(*args, **kwargs):
  138. connection = kwargs.get("connection")
  139. timeout = kwargs.get("connect_timeout")
  140. kwargs["connection"] = conn = connection or \
  141. self.broker_connection(connect_timeout=timeout)
  142. close_connection = not connection and conn.close or noop
  143. try:
  144. return fun(*args, **kwargs)
  145. finally:
  146. close_connection()
  147. return _inner
  148. def pre_config_merge(self, c):
  149. """Prepare configuration before it is merged with the defaults."""
  150. if not c.get("CELERY_RESULT_BACKEND"):
  151. rbackend = c.get("CELERY_BACKEND")
  152. if rbackend:
  153. c["CELERY_RESULT_BACKEND"] = rbackend
  154. if not c.get("BROKER_BACKEND"):
  155. cbackend = c.get("BROKER_TRANSPORT") or c.get("CARROT_BACKEND")
  156. if cbackend:
  157. c["BROKER_BACKEND"] = cbackend
  158. return c
  159. def post_config_merge(self, c):
  160. """Prepare configuration after it has been merged with the
  161. defaults."""
  162. if not c.get("CELERY_QUEUES"):
  163. c["CELERY_QUEUES"] = {
  164. c.CELERY_DEFAULT_QUEUE: {
  165. "exchange": c.CELERY_DEFAULT_EXCHANGE,
  166. "exchange_type": c.CELERY_DEFAULT_EXCHANGE_TYPE,
  167. "binding_key": c.CELERY_DEFAULT_ROUTING_KEY}}
  168. c["CELERY_ROUTES"] = routes.prepare(c.get("CELERY_ROUTES") or {})
  169. if c.get("CELERYD_LOG_COLOR") is None:
  170. c["CELERYD_LOG_COLOR"] = not c.CELERYD_LOG_FILE and \
  171. isatty(sys.stderr)
  172. if self.IS_WINDOWS: # windows console doesn't support ANSI colors
  173. c["CELERYD_LOG_COLOR"] = False
  174. if isinstance(c.CELERY_TASK_RESULT_EXPIRES, int):
  175. c["CELERY_TASK_RESULT_EXPIRES"] = timedelta(
  176. seconds=c.CELERY_TASK_RESULT_EXPIRES)
  177. return c
  178. def mail_admins(self, subject, body, fail_silently=False):
  179. """Send an e-mail to the admins in conf.ADMINS."""
  180. if not self.conf.ADMINS:
  181. return
  182. to = [admin_email for _, admin_email in self.conf.ADMINS]
  183. self.loader.mail_admins(subject, body, fail_silently,
  184. to=to, sender=self.conf.SERVER_EMAIL,
  185. host=self.conf.EMAIL_HOST,
  186. port=self.conf.EMAIL_PORT,
  187. user=self.conf.EMAIL_HOST_USER,
  188. password=self.conf.EMAIL_HOST_PASSWORD)
  189. def either(self, default_key, *values):
  190. """Fallback to the value of a configuration key if none of the
  191. `*values` are true."""
  192. for value in values:
  193. if value is not None:
  194. return value
  195. return self.conf.get(default_key)
  196. def merge(self, a, b):
  197. """Like `dict(a, **b)` except it will keep values from `a`
  198. if the value in `b` is :const:`None`."""
  199. b = dict(b)
  200. for key, value in a.items():
  201. if b.get(key) is None:
  202. b[key] = value
  203. return b
  204. def _get_backend(self):
  205. from celery.backends import get_backend_cls
  206. backend_cls = self.backend_cls or self.conf.CELERY_RESULT_BACKEND
  207. backend_cls = get_backend_cls(backend_cls, loader=self.loader)
  208. return backend_cls(app=self)
  209. def _get_config(self):
  210. return self.post_config_merge(ConfigurationView(
  211. self.pre_config_merge(self.loader.conf), DEFAULTS))
  212. @property
  213. def amqp(self):
  214. """Sending/receiving messages.
  215. See :class:`~celery.app.amqp.AMQP`.
  216. """
  217. if self._amqp is None:
  218. from celery.app.amqp import AMQP
  219. self._amqp = AMQP(self)
  220. return self._amqp
  221. @property
  222. def backend(self):
  223. """Storing/retreiving task state.
  224. See :class:`~celery.backend.base.BaseBackend`.
  225. """
  226. if self._backend is None:
  227. self._backend = self._get_backend()
  228. return self._backend
  229. @property
  230. def loader(self):
  231. """Current loader."""
  232. if self._loader is None:
  233. from celery.loaders import get_loader_cls
  234. self._loader = get_loader_cls(self.loader_cls)(app=self)
  235. return self._loader
  236. @property
  237. def conf(self):
  238. """Current configuration (dict and attribute access)."""
  239. if self._conf is None:
  240. self._conf = self._get_config()
  241. return self._conf
  242. @property
  243. def control(self):
  244. """Controlling worker nodes.
  245. See :class:`~celery.task.control.Control`.
  246. """
  247. if self._control is None:
  248. from celery.task.control import Control
  249. self._control = Control(app=self)
  250. return self._control
  251. @property
  252. def log(self):
  253. """Logging utilities.
  254. See :class:`~celery.log.Logging`.
  255. """
  256. if self._log is None:
  257. from celery.log import Logging
  258. self._log = Logging(app=self)
  259. return self._log
  260. @property
  261. def events(self):
  262. if self._events is None:
  263. from celery.events import Events
  264. self._events = Events(app=self)
  265. return self._events