base.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. """
  2. celery.app.base
  3. ===============
  4. Application Base Class.
  5. :copyright: (c) 2009 - 2010 by Ask Solem.
  6. :license: BSD, see LICENSE for more details.
  7. """
  8. import sys
  9. import platform as _platform
  10. from datetime import timedelta
  11. from celery import routes
  12. from celery.app.defaults import DEFAULTS
  13. from celery.datastructures import ConfigurationView
  14. from celery.utils import noop, isatty
  15. from celery.utils.functional import wraps
  16. class BaseApp(object):
  17. """Base class for apps."""
  18. SYSTEM = _platform.system()
  19. IS_OSX = SYSTEM == "Darwin"
  20. IS_WINDOWS = SYSTEM == "Windows"
  21. def __init__(self, main=None, loader=None, backend=None,
  22. set_as_current=True):
  23. self.main = main
  24. self.loader_cls = loader or "app"
  25. self.backend_cls = backend
  26. self._amqp = None
  27. self._backend = None
  28. self._conf = None
  29. self._control = None
  30. self._loader = None
  31. self._log = None
  32. self._events = None
  33. self.set_as_current = set_as_current
  34. self.on_init()
  35. def on_init(self):
  36. """Called at the end of the constructor."""
  37. pass
  38. def config_from_object(self, obj, silent=False):
  39. """Read configuration from object, where object is either
  40. a real object, or the name of an object to import.
  41. >>> celery.config_from_object("myapp.celeryconfig")
  42. >>> from myapp import celeryconfig
  43. >>> celery.config_from_object(celeryconfig)
  44. """
  45. self._conf = None
  46. return self.loader.config_from_object(obj, silent=silent)
  47. def config_from_envvar(self, variable_name, silent=False):
  48. """Read configuration from environment variable.
  49. The value of the environment variable must be the name
  50. of an object to import.
  51. >>> os.environ["CELERY_CONFIG_MODULE"] = "myapp.celeryconfig"
  52. >>> celery.config_from_envvar("CELERY_CONFIG_MODULE")
  53. """
  54. self._conf = None
  55. return self.loader.config_from_envvar(variable_name, silent=silent)
  56. def config_from_cmdline(self, argv, namespace="celery"):
  57. """Read configuration from argv.
  58. The config
  59. """
  60. config = self.loader.cmdline_config_parser(argv, namespace)
  61. for key, value in config.items():
  62. self.conf[key] = value
  63. def send_task(self, name, args=None, kwargs=None, countdown=None,
  64. eta=None, task_id=None, publisher=None, connection=None,
  65. connect_timeout=None, result_cls=None, expires=None,
  66. queues=None, **options):
  67. """Send task by name.
  68. :param name: Name of task to execute (e.g. `"tasks.add"`).
  69. :keyword result_cls: Specify custom result class. Default is
  70. using :meth:`AsyncResult`.
  71. Supports the same arguments as
  72. :meth:`~celery.task.base.BaseTask.apply_async`.
  73. """
  74. router = self.amqp.Router(queues)
  75. result_cls = result_cls or self.AsyncResult
  76. options.setdefault("compression",
  77. self.conf.CELERY_MESSAGE_COMPRESSION)
  78. options = router.route(options, name, args, kwargs)
  79. exchange = options.get("exchange")
  80. exchange_type = options.get("exchange_type")
  81. def _do_publish(connection=None, **_):
  82. publish = publisher or self.amqp.TaskPublisher(connection,
  83. exchange=exchange,
  84. exchange_type=exchange_type)
  85. try:
  86. new_id = publish.delay_task(name, args, kwargs,
  87. task_id=task_id,
  88. countdown=countdown, eta=eta,
  89. expires=expires, **options)
  90. finally:
  91. publisher or publish.close()
  92. return result_cls(new_id)
  93. return self.with_default_connection(_do_publish)(
  94. connection=connection, connect_timeout=connect_timeout)
  95. def AsyncResult(self, task_id, backend=None, task_name=None):
  96. """Create :class:`celery.result.BaseAsyncResult` instance."""
  97. from celery.result import BaseAsyncResult
  98. return BaseAsyncResult(task_id, app=self,
  99. task_name=task_name,
  100. backend=backend or self.backend)
  101. def TaskSetResult(self, taskset_id, results, **kwargs):
  102. """Create :class:`celery.result.TaskSetResult` instance."""
  103. from celery.result import TaskSetResult
  104. return TaskSetResult(taskset_id, results, app=self)
  105. def broker_connection(self, hostname=None, userid=None,
  106. password=None, virtual_host=None, port=None, ssl=None,
  107. insist=None, connect_timeout=None, transport=None, **kwargs):
  108. """Establish a connection to the message broker.
  109. :keyword hostname: defaults to the :setting:`BROKER_HOST` setting.
  110. :keyword userid: defaults to the :setting:`BROKER_USER` setting.
  111. :keyword password: defaults to the :setting:`BROKER_PASSWORD` setting.
  112. :keyword virtual_host: defaults to the :setting:`BROKER_VHOST` setting.
  113. :keyword port: defaults to the :setting:`BROKER_PORT` setting.
  114. :keyword ssl: defaults to the :setting:`BROKER_USE_SSL` setting.
  115. :keyword insist: defaults to the :setting:`BROKER_INSIST` setting.
  116. :keyword connect_timeout: defaults to the
  117. :setting:`BROKER_CONNECTION_TIMEOUT` setting.
  118. :keyword backend_cls: defaults to the :setting:`BROKER_BACKEND`
  119. setting.
  120. :returns :class:`kombu.connection.BrokerConnection`:
  121. """
  122. return self.amqp.BrokerConnection(
  123. hostname or self.conf.BROKER_HOST,
  124. userid or self.conf.BROKER_USER,
  125. password or self.conf.BROKER_PASSWORD,
  126. virtual_host or self.conf.BROKER_VHOST,
  127. port or self.conf.BROKER_PORT,
  128. transport=transport or self.conf.BROKER_BACKEND,
  129. insist=self.either("BROKER_INSIST", insist),
  130. ssl=self.either("BROKER_USE_SSL", ssl),
  131. connect_timeout=self.either(
  132. "BROKER_CONNECTION_TIMEOUT", connect_timeout))
  133. def with_default_connection(self, fun):
  134. """With any function accepting `connection` and `connect_timeout`
  135. keyword arguments, establishes a default connection if one is
  136. not already passed to it.
  137. Any automatically established connection will be closed after
  138. the function returns.
  139. """
  140. @wraps(fun)
  141. def _inner(*args, **kwargs):
  142. connection = kwargs.get("connection")
  143. timeout = kwargs.get("connect_timeout")
  144. kwargs["connection"] = conn = connection or \
  145. self.broker_connection(connect_timeout=timeout)
  146. close_connection = not connection and conn.close or noop
  147. try:
  148. return fun(*args, **kwargs)
  149. finally:
  150. close_connection()
  151. return _inner
  152. def pre_config_merge(self, c):
  153. """Prepare configuration before it is merged with the defaults."""
  154. if not c.get("CELERY_RESULT_BACKEND"):
  155. rbackend = c.get("CELERY_BACKEND")
  156. if rbackend:
  157. c["CELERY_RESULT_BACKEND"] = rbackend
  158. if not c.get("BROKER_BACKEND"):
  159. cbackend = c.get("BROKER_TRANSPORT") or c.get("CARROT_BACKEND")
  160. if cbackend:
  161. c["BROKER_BACKEND"] = cbackend
  162. return c
  163. def post_config_merge(self, c):
  164. """Prepare configuration after it has been merged with the
  165. defaults."""
  166. if not c.get("CELERY_QUEUES"):
  167. c["CELERY_QUEUES"] = {
  168. c.CELERY_DEFAULT_QUEUE: {
  169. "exchange": c.CELERY_DEFAULT_EXCHANGE,
  170. "exchange_type": c.CELERY_DEFAULT_EXCHANGE_TYPE,
  171. "binding_key": c.CELERY_DEFAULT_ROUTING_KEY}}
  172. c["CELERY_ROUTES"] = routes.prepare(c.get("CELERY_ROUTES") or {})
  173. if c.get("CELERYD_LOG_COLOR") is None:
  174. c["CELERYD_LOG_COLOR"] = not c.CELERYD_LOG_FILE and \
  175. isatty(sys.stderr)
  176. if self.IS_WINDOWS: # windows console doesn't support ANSI colors
  177. c["CELERYD_LOG_COLOR"] = False
  178. if isinstance(c.CELERY_TASK_RESULT_EXPIRES, int):
  179. c["CELERY_TASK_RESULT_EXPIRES"] = timedelta(
  180. seconds=c.CELERY_TASK_RESULT_EXPIRES)
  181. return c
  182. def mail_admins(self, subject, body, fail_silently=False):
  183. """Send an e-mail to the admins in conf.ADMINS."""
  184. if not self.conf.ADMINS:
  185. return
  186. to = [admin_email for _, admin_email in self.conf.ADMINS]
  187. self.loader.mail_admins(subject, body, fail_silently,
  188. to=to, sender=self.conf.SERVER_EMAIL,
  189. host=self.conf.EMAIL_HOST,
  190. port=self.conf.EMAIL_PORT,
  191. user=self.conf.EMAIL_HOST_USER,
  192. password=self.conf.EMAIL_HOST_PASSWORD,
  193. timeout=self.conf.EMAIL_TIMEOUT)
  194. def either(self, default_key, *values):
  195. """Fallback to the value of a configuration key if none of the
  196. `*values` are true."""
  197. for value in values:
  198. if value is not None:
  199. return value
  200. return self.conf.get(default_key)
  201. def merge(self, a, b):
  202. """Like `dict(a, **b)` except it will keep values from `a`
  203. if the value in `b` is :const:`None`."""
  204. b = dict(b)
  205. for key, value in a.items():
  206. if b.get(key) is None:
  207. b[key] = value
  208. return b
  209. def _get_backend(self):
  210. from celery.backends import get_backend_cls
  211. backend_cls = self.backend_cls or self.conf.CELERY_RESULT_BACKEND
  212. backend_cls = get_backend_cls(backend_cls, loader=self.loader)
  213. return backend_cls(app=self)
  214. def _get_config(self):
  215. return self.post_config_merge(ConfigurationView(
  216. self.pre_config_merge(self.loader.conf), DEFAULTS))
  217. @property
  218. def amqp(self):
  219. """Sending/receiving messages.
  220. See :class:`~celery.app.amqp.AMQP`.
  221. """
  222. if self._amqp is None:
  223. from celery.app.amqp import AMQP
  224. self._amqp = AMQP(self)
  225. return self._amqp
  226. @property
  227. def backend(self):
  228. """Storing/retreiving task state.
  229. See :class:`~celery.backend.base.BaseBackend`.
  230. """
  231. if self._backend is None:
  232. self._backend = self._get_backend()
  233. return self._backend
  234. @property
  235. def loader(self):
  236. """Current loader."""
  237. if self._loader is None:
  238. from celery.loaders import get_loader_cls
  239. self._loader = get_loader_cls(self.loader_cls)(app=self)
  240. return self._loader
  241. @property
  242. def conf(self):
  243. """Current configuration (dict and attribute access)."""
  244. if self._conf is None:
  245. self._conf = self._get_config()
  246. return self._conf
  247. @property
  248. def control(self):
  249. """Controlling worker nodes.
  250. See :class:`~celery.task.control.Control`.
  251. """
  252. if self._control is None:
  253. from celery.task.control import Control
  254. self._control = Control(app=self)
  255. return self._control
  256. @property
  257. def log(self):
  258. """Logging utilities.
  259. See :class:`~celery.log.Logging`.
  260. """
  261. if self._log is None:
  262. from celery.log import Logging
  263. self._log = Logging(app=self)
  264. return self._log
  265. @property
  266. def events(self):
  267. if self._events is None:
  268. from celery.events import Events
  269. self._events = Events(app=self)
  270. return self._events