base.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app.base
  4. ~~~~~~~~~~~~~~~
  5. Application Base Class.
  6. :copyright: (c) 2009 - 2012 by Ask Solem.
  7. :license: BSD, see LICENSE for more details.
  8. """
  9. from __future__ import absolute_import
  10. from __future__ import with_statement
  11. import warnings
  12. from collections import deque
  13. from contextlib import contextmanager
  14. from copy import deepcopy
  15. from functools import wraps
  16. from kombu.clocks import LamportClock
  17. from .. import platforms
  18. from ..backends import get_backend_by_url
  19. from ..exceptions import AlwaysEagerIgnored
  20. from ..loaders import get_loader_cls
  21. from ..local import PromiseProxy, maybe_evaluate
  22. from ..utils import cached_property, register_after_fork
  23. from ..utils.functional import first
  24. from ..utils.imports import instantiate
  25. from . import annotations
  26. from .builtins import load_builtin_tasks
  27. from .defaults import DEFAULTS, find_deprecated_settings
  28. from .state import _tls
  29. from .utils import AppPickler, Settings, bugreport, _unpickle_app
  30. class App(object):
  31. """Celery Application.
  32. :param main: Name of the main module if running as `__main__`.
  33. :keyword loader: The loader class, or the name of the loader class to use.
  34. Default is :class:`celery.loaders.app.AppLoader`.
  35. :keyword backend: The result store backend class, or the name of the
  36. backend class to use. Default is the value of the
  37. :setting:`CELERY_RESULT_BACKEND` setting.
  38. :keyword amqp: AMQP object or class name.
  39. :keyword events: Events object or class name.
  40. :keyword log: Log object or class name.
  41. :keyword control: Control object or class name.
  42. :keyword set_as_current: Make this the global current app.
  43. """
  44. Pickler = AppPickler
  45. SYSTEM = platforms.SYSTEM
  46. IS_OSX, IS_WINDOWS = platforms.IS_OSX, platforms.IS_WINDOWS
  47. amqp_cls = "celery.app.amqp:AMQP"
  48. backend_cls = None
  49. events_cls = "celery.events:Events"
  50. loader_cls = "celery.loaders.app:AppLoader"
  51. log_cls = "celery.app.log:Logging"
  52. control_cls = "celery.app.control:Control"
  53. registry_cls = "celery.app.registry:TaskRegistry"
  54. _pool = None
  55. def __init__(self, main=None, loader=None, backend=None,
  56. amqp=None, events=None, log=None, control=None,
  57. set_as_current=True, accept_magic_kwargs=False,
  58. tasks=None, broker=None, **kwargs):
  59. self.clock = LamportClock()
  60. self.main = main
  61. self.amqp_cls = amqp or self.amqp_cls
  62. self.backend_cls = backend or self.backend_cls
  63. self.events_cls = events or self.events_cls
  64. self.loader_cls = loader or self.loader_cls
  65. self.log_cls = log or self.log_cls
  66. self.control_cls = control or self.control_cls
  67. self.set_as_current = set_as_current
  68. self.registry_cls = self.registry_cls if tasks is None else tasks
  69. self.accept_magic_kwargs = accept_magic_kwargs
  70. self.finalized = False
  71. self._pending = deque()
  72. self._tasks = instantiate(self.registry_cls)
  73. # these options are moved to the config to
  74. # simplify pickling of the app object.
  75. self._preconf = {}
  76. if broker:
  77. self._preconf["BROKER_URL"] = broker
  78. if self.set_as_current:
  79. self.set_current()
  80. self.on_init()
  81. def set_current(self):
  82. """Make this the current app for this thread."""
  83. _tls.current_app = self
  84. def on_init(self):
  85. pass
  86. def create_task_cls(self):
  87. """Creates a base task class using default configuration
  88. taken from this app."""
  89. from .task import BaseTask
  90. class Task(BaseTask):
  91. app = self
  92. abstract = True
  93. Task.__doc__ = BaseTask.__doc__
  94. Task.bind(self)
  95. return Task
  96. def Worker(self, **kwargs):
  97. """Create new :class:`~celery.apps.worker.Worker` instance."""
  98. return instantiate("celery.apps.worker:Worker", app=self, **kwargs)
  99. def WorkController(self, **kwargs):
  100. return instantiate("celery.worker:WorkController", app=self, **kwargs)
  101. def Beat(self, **kwargs):
  102. """Create new :class:`~celery.apps.beat.Beat` instance."""
  103. return instantiate("celery.apps.beat:Beat", app=self, **kwargs)
  104. def TaskSet(self, *args, **kwargs):
  105. """Create new :class:`~celery.task.sets.TaskSet`."""
  106. return instantiate("celery.task.sets:TaskSet",
  107. app=self, *args, **kwargs)
  108. def start(self, argv=None):
  109. """Run :program:`celery` using `argv`. Uses :data:`sys.argv`
  110. if `argv` is not specified."""
  111. return instantiate("celery.bin.celery:CeleryCommand", app=self) \
  112. .execute_from_commandline(argv)
  113. def worker_main(self, argv=None):
  114. """Run :program:`celeryd` using `argv`. Uses :data:`sys.argv`
  115. if `argv` is not specified."""
  116. return instantiate("celery.bin.celeryd:WorkerCommand", app=self) \
  117. .execute_from_commandline(argv)
  118. def task(self, *args, **options):
  119. """Decorator to create a task class out of any callable.
  120. **Examples:**
  121. .. code-block:: python
  122. @task
  123. def refresh_feed(url):
  124. return ...
  125. with setting extra options:
  126. .. code-block:: python
  127. @task(exchange="feeds")
  128. def refresh_feed(url):
  129. return ...
  130. .. admonition:: App Binding
  131. For custom apps the task decorator returns proxy
  132. objects, so that the act of creating the task is not performed
  133. until the task is used or the task registry is accessed.
  134. If you are depending on binding to be deferred, then you must
  135. not access any attributes on the returned object until the
  136. application is fully set up (finalized).
  137. """
  138. def inner_create_task_cls(**options):
  139. def _create_task_cls(fun):
  140. if self.accept_magic_kwargs: # compat mode
  141. return self._task_from_fun(fun, **options)
  142. # return a proxy object that is only evaluated when first used
  143. promise = PromiseProxy(self._task_from_fun, (fun, ), options)
  144. self._pending.append(promise)
  145. return promise
  146. return _create_task_cls
  147. if len(args) == 1 and callable(args[0]):
  148. return inner_create_task_cls(**options)(*args)
  149. return inner_create_task_cls(**options)
  150. def _task_from_fun(self, fun, **options):
  151. base = options.pop("base", None) or self.Task
  152. T = type(fun.__name__, (base, ), dict({
  153. "app": self,
  154. "accept_magic_kwargs": False,
  155. "run": staticmethod(fun),
  156. "__doc__": fun.__doc__,
  157. "__module__": fun.__module__}, **options))()
  158. return self._tasks[T.name] # return global instance.
  159. def annotate_task(self, task):
  160. if self.annotations:
  161. match = annotations._first_match(self.annotations, task)
  162. for attr, value in (match or {}).iteritems():
  163. setattr(task, attr, value)
  164. match_any = annotations._first_match_any(self.annotations)
  165. for attr, value in (match_any or {}).iteritems():
  166. setattr(task, attr, value)
  167. @cached_property
  168. def Task(self):
  169. """Default Task base class for this application."""
  170. return self.create_task_cls()
  171. @cached_property
  172. def annotations(self):
  173. return annotations.prepare(self.conf.CELERY_ANNOTATIONS)
  174. def __repr__(self):
  175. return "<Celery: %s:0x%x>" % (self.main or "__main__", id(self), )
  176. def __reduce__(self):
  177. # Reduce only pickles the configuration changes,
  178. # so the default configuration doesn't have to be passed
  179. # between processes.
  180. return (_unpickle_app, (self.__class__, self.Pickler)
  181. + self.__reduce_args__())
  182. def __reduce_args__(self):
  183. return (self.main,
  184. self.conf.changes,
  185. self.loader_cls,
  186. self.backend_cls,
  187. self.amqp_cls,
  188. self.events_cls,
  189. self.log_cls,
  190. self.control_cls,
  191. self.accept_magic_kwargs)
  192. def finalize(self):
  193. if not self.finalized:
  194. load_builtin_tasks(self)
  195. pending = self._pending
  196. while pending:
  197. maybe_evaluate(pending.pop())
  198. self.finalized = True
  199. def config_from_object(self, obj, silent=False):
  200. """Read configuration from object, where object is either
  201. a object, or the name of a module to import.
  202. >>> celery.config_from_object("myapp.celeryconfig")
  203. >>> from myapp import celeryconfig
  204. >>> celery.config_from_object(celeryconfig)
  205. """
  206. del(self.conf)
  207. return self.loader.config_from_object(obj, silent=silent)
  208. def config_from_envvar(self, variable_name, silent=False):
  209. """Read configuration from environment variable.
  210. The value of the environment variable must be the name
  211. of a module to import.
  212. >>> os.environ["CELERY_CONFIG_MODULE"] = "myapp.celeryconfig"
  213. >>> celery.config_from_envvar("CELERY_CONFIG_MODULE")
  214. """
  215. del(self.conf)
  216. return self.loader.config_from_envvar(variable_name, silent=silent)
  217. def config_from_cmdline(self, argv, namespace="celery"):
  218. """Read configuration from argv."""
  219. self.conf.update(self.loader.cmdline_config_parser(argv, namespace))
  220. def send_task(self, name, args=None, kwargs=None, countdown=None,
  221. eta=None, task_id=None, publisher=None, connection=None,
  222. connect_timeout=None, result_cls=None, expires=None,
  223. queues=None, **options):
  224. """Send task by name.
  225. :param name: Name of task to execute (e.g. `"tasks.add"`).
  226. :keyword result_cls: Specify custom result class. Default is
  227. using :meth:`AsyncResult`.
  228. Supports the same arguments as
  229. :meth:`~celery.app.task.BaseTask.apply_async`.
  230. """
  231. if self.conf.CELERY_ALWAYS_EAGER:
  232. warnings.warn(AlwaysEagerIgnored(
  233. "CELERY_ALWAYS_EAGER has no effect on send_task"))
  234. router = self.amqp.Router(queues)
  235. result_cls = result_cls or self.AsyncResult
  236. options.setdefault("compression",
  237. self.conf.CELERY_MESSAGE_COMPRESSION)
  238. options = router.route(options, name, args, kwargs)
  239. exchange = options.get("exchange")
  240. exchange_type = options.get("exchange_type")
  241. with self.default_connection(connection, connect_timeout) as conn:
  242. publish = publisher or self.amqp.TaskPublisher(conn,
  243. exchange=exchange,
  244. exchange_type=exchange_type)
  245. try:
  246. new_id = publish.delay_task(name, args, kwargs,
  247. task_id=task_id,
  248. countdown=countdown, eta=eta,
  249. expires=expires, **options)
  250. finally:
  251. publisher or publish.close()
  252. return result_cls(new_id)
  253. def AsyncResult(self, task_id, backend=None, task_name=None):
  254. """Create :class:`celery.result.BaseAsyncResult` instance."""
  255. from ..result import AsyncResult
  256. return AsyncResult(task_id, app=self, task_name=task_name,
  257. backend=backend or self.backend)
  258. def TaskSetResult(self, taskset_id, results, **kwargs):
  259. """Create :class:`celery.result.TaskSetResult` instance."""
  260. from ..result import TaskSetResult
  261. return TaskSetResult(taskset_id, results, app=self)
  262. def broker_connection(self, hostname=None, userid=None,
  263. password=None, virtual_host=None, port=None, ssl=None,
  264. insist=None, connect_timeout=None, transport=None,
  265. transport_options=None, **kwargs):
  266. """Establish a connection to the message broker.
  267. :keyword hostname: defaults to the :setting:`BROKER_HOST` setting.
  268. :keyword userid: defaults to the :setting:`BROKER_USER` setting.
  269. :keyword password: defaults to the :setting:`BROKER_PASSWORD` setting.
  270. :keyword virtual_host: defaults to the :setting:`BROKER_VHOST` setting.
  271. :keyword port: defaults to the :setting:`BROKER_PORT` setting.
  272. :keyword ssl: defaults to the :setting:`BROKER_USE_SSL` setting.
  273. :keyword insist: defaults to the :setting:`BROKER_INSIST` setting.
  274. :keyword connect_timeout: defaults to the
  275. :setting:`BROKER_CONNECTION_TIMEOUT` setting.
  276. :keyword backend_cls: defaults to the :setting:`BROKER_TRANSPORT`
  277. setting.
  278. :returns :class:`kombu.connection.BrokerConnection`:
  279. """
  280. conf = self.conf
  281. return self.amqp.BrokerConnection(
  282. hostname or conf.BROKER_HOST,
  283. userid or conf.BROKER_USER,
  284. password or conf.BROKER_PASSWORD,
  285. virtual_host or conf.BROKER_VHOST,
  286. port or conf.BROKER_PORT,
  287. transport=transport or conf.BROKER_TRANSPORT,
  288. insist=self.either("BROKER_INSIST", insist),
  289. ssl=self.either("BROKER_USE_SSL", ssl),
  290. connect_timeout=self.either(
  291. "BROKER_CONNECTION_TIMEOUT", connect_timeout),
  292. transport_options=dict(conf.BROKER_TRANSPORT_OPTIONS,
  293. **transport_options or {}))
  294. @contextmanager
  295. def default_connection(self, connection=None, connect_timeout=None):
  296. """For use within a with-statement to get a connection from the pool
  297. if one is not already provided.
  298. :keyword connection: If not provided, then a connection will be
  299. acquired from the connection pool.
  300. :keyword connect_timeout: *No longer used.*
  301. """
  302. if connection:
  303. yield connection
  304. else:
  305. with self.pool.acquire(block=True) as connection:
  306. yield connection
  307. def with_default_connection(self, fun):
  308. """With any function accepting `connection` and `connect_timeout`
  309. keyword arguments, establishes a default connection if one is
  310. not already passed to it.
  311. Any automatically established connection will be closed after
  312. the function returns.
  313. **Deprecated**
  314. Use ``with app.default_connection(connection)`` instead.
  315. """
  316. @wraps(fun)
  317. def _inner(*args, **kwargs):
  318. connection = kwargs.pop("connection", None)
  319. with self.default_connection(connection) as c:
  320. return fun(*args, **dict(kwargs, connection=c))
  321. return _inner
  322. def prepare_config(self, c):
  323. """Prepare configuration before it is merged with the defaults."""
  324. if self._preconf:
  325. for key, value in self._preconf.iteritems():
  326. setattr(c, key, value)
  327. return find_deprecated_settings(c)
  328. def now(self):
  329. return self.loader.now()
  330. def mail_admins(self, subject, body, fail_silently=False):
  331. """Send an email to the admins in the :setting:`ADMINS` setting."""
  332. if self.conf.ADMINS:
  333. to = [admin_email for _, admin_email in self.conf.ADMINS]
  334. return self.loader.mail_admins(subject, body, fail_silently, to=to,
  335. sender=self.conf.SERVER_EMAIL,
  336. host=self.conf.EMAIL_HOST,
  337. port=self.conf.EMAIL_PORT,
  338. user=self.conf.EMAIL_HOST_USER,
  339. password=self.conf.EMAIL_HOST_PASSWORD,
  340. timeout=self.conf.EMAIL_TIMEOUT,
  341. use_ssl=self.conf.EMAIL_USE_SSL,
  342. use_tls=self.conf.EMAIL_USE_TLS)
  343. def select_queues(self, queues=None):
  344. return self.amqp.queues.select_subset(queues,
  345. self.conf.CELERY_CREATE_MISSING_QUEUES)
  346. def either(self, default_key, *values):
  347. """Fallback to the value of a configuration key if none of the
  348. `*values` are true."""
  349. return first(None, values) or self.conf.get(default_key)
  350. def bugreport(self):
  351. return bugreport(self)
  352. def _get_backend(self):
  353. backend, url = get_backend_by_url(
  354. self.backend_cls or self.conf.CELERY_RESULT_BACKEND,
  355. self.loader)
  356. return backend(app=self, url=url)
  357. def _get_config(self):
  358. return Settings({}, [self.prepare_config(self.loader.conf),
  359. deepcopy(DEFAULTS)])
  360. def _after_fork(self, obj_):
  361. if self._pool:
  362. self._pool.force_close_all()
  363. self._pool = None
  364. @property
  365. def pool(self):
  366. if self._pool is None:
  367. register_after_fork(self, self._after_fork)
  368. self._pool = self.broker_connection().Pool(
  369. limit=self.conf.BROKER_POOL_LIMIT)
  370. return self._pool
  371. @cached_property
  372. def amqp(self):
  373. """Sending/receiving messages. See :class:`~celery.app.amqp.AMQP`."""
  374. return instantiate(self.amqp_cls, app=self)
  375. @cached_property
  376. def backend(self):
  377. """Storing/retrieving task state. See
  378. :class:`~celery.backend.base.BaseBackend`."""
  379. return self._get_backend()
  380. @cached_property
  381. def conf(self):
  382. """Current configuration (dict and attribute access)."""
  383. return self._get_config()
  384. @cached_property
  385. def control(self):
  386. """Controlling worker nodes. See
  387. :class:`~celery.app.control.Control`."""
  388. return instantiate(self.control_cls, app=self)
  389. @cached_property
  390. def events(self):
  391. """Sending/receiving events. See :class:`~celery.events.Events`. """
  392. return instantiate(self.events_cls, app=self)
  393. @cached_property
  394. def loader(self):
  395. """Current loader."""
  396. return get_loader_cls(self.loader_cls)(app=self)
  397. @cached_property
  398. def log(self):
  399. """Logging utilities. See :class:`~celery.log.Logging`."""
  400. return instantiate(self.log_cls, app=self)
  401. @cached_property
  402. def tasks(self):
  403. """Registry of available tasks.
  404. Accessing this attribute will also finalize the app.
  405. """
  406. self.finalize()
  407. return self._tasks