__init__.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app
  4. ~~~~~~~~~~
  5. Celery Application.
  6. :copyright: (c) 2009 - 2012 by Ask Solem.
  7. :license: BSD, see LICENSE for more details.
  8. """
  9. from __future__ import absolute_import
  10. import os
  11. from ..local import PromiseProxy
  12. from ..utils import cached_property
  13. from ..utils.imports import instantiate
  14. from . import annotations
  15. from . import base
  16. from .state import _tls
  17. from .state import current_task # noqa
  18. class AppPickler(object):
  19. """Default application pickler/unpickler."""
  20. def __call__(self, cls, *args):
  21. kwargs = self.build_kwargs(*args)
  22. app = self.construct(cls, **kwargs)
  23. self.prepare(app, **kwargs)
  24. return app
  25. def prepare(self, app, **kwargs):
  26. app.conf.update(kwargs["changes"])
  27. def build_kwargs(self, *args):
  28. return self.build_standard_kwargs(*args)
  29. def build_standard_kwargs(self, main, changes, loader, backend, amqp,
  30. events, log, control, accept_magic_kwargs):
  31. return dict(main=main, loader=loader, backend=backend, amqp=amqp,
  32. changes=changes, events=events, log=log, control=control,
  33. set_as_current=False,
  34. accept_magic_kwargs=accept_magic_kwargs)
  35. def construct(self, cls, **kwargs):
  36. return cls(**kwargs)
  37. def _unpickle_app(cls, pickler, *args):
  38. return pickler()(cls, *args)
  39. class App(base.BaseApp):
  40. """Celery Application.
  41. :param main: Name of the main module if running as `__main__`.
  42. :keyword loader: The loader class, or the name of the loader class to use.
  43. Default is :class:`celery.loaders.app.AppLoader`.
  44. :keyword backend: The result store backend class, or the name of the
  45. backend class to use. Default is the value of the
  46. :setting:`CELERY_RESULT_BACKEND` setting.
  47. :keyword amqp: AMQP object or class name.
  48. :keyword events: Events object or class name.
  49. :keyword log: Log object or class name.
  50. :keyword control: Control object or class name.
  51. :keyword set_as_current: Make this the global current app.
  52. """
  53. Pickler = AppPickler
  54. def set_current(self):
  55. """Make this the current app for this thread."""
  56. _tls.current_app = self
  57. def on_init(self):
  58. if self.set_as_current:
  59. self.set_current()
  60. def create_task_cls(self):
  61. """Creates a base task class using default configuration
  62. taken from this app."""
  63. from .task import BaseTask
  64. class Task(BaseTask):
  65. app = self
  66. abstract = True
  67. Task.__doc__ = BaseTask.__doc__
  68. Task.bind(self)
  69. return Task
  70. def Worker(self, **kwargs):
  71. """Create new :class:`~celery.apps.worker.Worker` instance."""
  72. return instantiate("celery.apps.worker:Worker", app=self, **kwargs)
  73. def WorkController(self, **kwargs):
  74. return instantiate("celery.worker:WorkController", app=self, **kwargs)
  75. def Beat(self, **kwargs):
  76. """Create new :class:`~celery.apps.beat.Beat` instance."""
  77. return instantiate("celery.apps.beat:Beat", app=self, **kwargs)
  78. def TaskSet(self, *args, **kwargs):
  79. """Create new :class:`~celery.task.sets.TaskSet`."""
  80. return instantiate("celery.task.sets:TaskSet",
  81. app=self, *args, **kwargs)
  82. def celery_main(self, argv=None):
  83. """Run :program:`celery` using `argv`. Uses :data:`sys.argv`
  84. if `argv` is not specified."""
  85. return instantiate("celery.bin.celery:CeleryCommand", app=self) \
  86. .execute_from_commandline(argv)
  87. def worker_main(self, argv=None):
  88. """Run :program:`celeryd` using `argv`. Uses :data:`sys.argv`
  89. if `argv` is not specified."""
  90. return instantiate("celery.bin.celeryd:WorkerCommand", app=self) \
  91. .execute_from_commandline(argv)
  92. def task(self, *args, **options):
  93. """Decorator to create a task class out of any callable.
  94. **Examples:**
  95. .. code-block:: python
  96. @task()
  97. def refresh_feed(url):
  98. return Feed.objects.get(url=url).refresh()
  99. with setting extra options and using retry.
  100. .. code-block:: python
  101. from celery.task import current
  102. @task(exchange="feeds")
  103. def refresh_feed(url):
  104. try:
  105. return Feed.objects.get(url=url).refresh()
  106. except socket.error, exc:
  107. current.retry(exc=exc)
  108. Calling the resulting task::
  109. >>> refresh_feed("http://example.com/rss") # Regular
  110. <Feed: http://example.com/rss>
  111. >>> refresh_feed.delay("http://example.com/rss") # Async
  112. <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>
  113. .. admonition:: App Binding
  114. For custom apps the task decorator returns proxy
  115. objects, so that the act of creating the task is not performed
  116. until the task is used or the task registry is accessed.
  117. If you are depending on binding to be deferred, then you must
  118. not access any attributes on the returned object until the
  119. application is fully set up (finalized).
  120. """
  121. def inner_create_task_cls(**options):
  122. def _create_task_cls(fun):
  123. if self.accept_magic_kwargs: # compat mode
  124. return self._task_from_fun(fun, **options)
  125. # return a proxy object that is only evaluated when first used
  126. promise = PromiseProxy(self._task_from_fun, (fun, ), options)
  127. self._pending.append(promise)
  128. return promise
  129. return _create_task_cls
  130. if len(args) == 1 and callable(args[0]):
  131. return inner_create_task_cls(**options)(*args)
  132. return inner_create_task_cls(**options)
  133. def _task_from_fun(self, fun, **options):
  134. base = options.pop("base", None) or self.Task
  135. T = type(fun.__name__, (base, ), dict({
  136. "app": self,
  137. "accept_magic_kwargs": False,
  138. "run": staticmethod(fun),
  139. "__doc__": fun.__doc__,
  140. "__module__": fun.__module__}, **options))()
  141. return self._tasks[T.name] # return global instance.
  142. def annotate_task(self, task):
  143. if self.annotations:
  144. match = annotations._first_match(self.annotations, task)
  145. for attr, value in (match or {}).iteritems():
  146. setattr(task, attr, value)
  147. match_any = annotations._first_match_any(self.annotations)
  148. for attr, value in (match_any or {}).iteritems():
  149. setattr(task, attr, value)
  150. @cached_property
  151. def Task(self):
  152. """Default Task base class for this application."""
  153. return self.create_task_cls()
  154. @cached_property
  155. def annotations(self):
  156. return annotations.prepare(self.conf.CELERY_ANNOTATIONS)
  157. def __repr__(self):
  158. return "<Celery: %s:0x%x>" % (self.main or "__main__", id(self), )
  159. def __reduce__(self):
  160. # Reduce only pickles the configuration changes,
  161. # so the default configuration doesn't have to be passed
  162. # between processes.
  163. return (_unpickle_app, (self.__class__, self.Pickler)
  164. + self.__reduce_args__())
  165. def __reduce_args__(self):
  166. return (self.main,
  167. self.conf.changes,
  168. self.loader_cls,
  169. self.backend_cls,
  170. self.amqp_cls,
  171. self.events_cls,
  172. self.log_cls,
  173. self.control_cls,
  174. self.accept_magic_kwargs)
  175. #: The "default" loader is the default loader used by old applications.
  176. default_loader = os.environ.get("CELERY_LOADER") or "default"
  177. #: Global fallback app instance.
  178. default_app = App("default", loader=default_loader,
  179. set_as_current=False,
  180. accept_magic_kwargs=True)
  181. def current_app():
  182. return getattr(_tls, "current_app", None) or default_app
  183. def _app_or_default(app=None):
  184. """Returns the app provided or the default app if none.
  185. The environment variable :envvar:`CELERY_TRACE_APP` is used to
  186. trace app leaks. When enabled an exception is raised if there
  187. is no active app.
  188. """
  189. if app is None:
  190. return getattr(_tls, "current_app", None) or default_app
  191. return app
  192. def _app_or_default_trace(app=None): # pragma: no cover
  193. from traceback import print_stack
  194. from multiprocessing import current_process
  195. if app is None:
  196. if getattr(_tls, "current_app", None):
  197. print("-- RETURNING TO CURRENT APP --") # noqa+
  198. print_stack()
  199. return _tls.current_app
  200. if current_process()._name == "MainProcess":
  201. raise Exception("DEFAULT APP")
  202. print("-- RETURNING TO DEFAULT APP --") # noqa+
  203. print_stack()
  204. return default_app
  205. return app
  206. def enable_trace():
  207. global app_or_default
  208. app_or_default = _app_or_default_trace
  209. def disable_trace():
  210. global app_or_default
  211. app_or_default = _app_or_default
  212. app_or_default = _app_or_default
  213. if os.environ.get("CELERY_TRACE_APP"): # pragma: no cover
  214. enable_trace()