__init__.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app
  4. ~~~~~~~~~~
  5. Celery Application.
  6. :copyright: (c) 2009 - 2012 by Ask Solem.
  7. :license: BSD, see LICENSE for more details.
  8. """
  9. from __future__ import absolute_import
  10. import os
  11. import threading
  12. from ..utils import cached_property, instantiate
  13. from . import annotations
  14. from . import base
  15. class _TLS(threading.local):
  16. #: Apps with the :attr:`~celery.app.base.BaseApp.set_as_current` attribute
  17. #: sets this, so it will always contain the last instantiated app,
  18. #: and is the default app returned by :func:`app_or_default`.
  19. current_app = None
  20. #: The currently executing task.
  21. current_task = None
  22. _tls = _TLS()
  23. class AppPickler(object):
  24. """Default application pickler/unpickler."""
  25. def __call__(self, cls, *args):
  26. kwargs = self.build_kwargs(*args)
  27. app = self.construct(cls, **kwargs)
  28. self.prepare(app, **kwargs)
  29. return app
  30. def prepare(self, app, **kwargs):
  31. app.conf.update(kwargs["changes"])
  32. def build_kwargs(self, *args):
  33. return self.build_standard_kwargs(*args)
  34. def build_standard_kwargs(self, main, changes, loader, backend, amqp,
  35. events, log, control, accept_magic_kwargs):
  36. return dict(main=main, loader=loader, backend=backend, amqp=amqp,
  37. changes=changes, events=events, log=log, control=control,
  38. set_as_current=False,
  39. accept_magic_kwargs=accept_magic_kwargs)
  40. def construct(self, cls, **kwargs):
  41. return cls(**kwargs)
  42. def _unpickle_app(cls, pickler, *args):
  43. return pickler()(cls, *args)
  44. class App(base.BaseApp):
  45. """Celery Application.
  46. :param main: Name of the main module if running as `__main__`.
  47. :keyword loader: The loader class, or the name of the loader class to use.
  48. Default is :class:`celery.loaders.app.AppLoader`.
  49. :keyword backend: The result store backend class, or the name of the
  50. backend class to use. Default is the value of the
  51. :setting:`CELERY_RESULT_BACKEND` setting.
  52. :keyword amqp: AMQP object or class name.
  53. :keyword events: Events object or class name.
  54. :keyword log: Log object or class name.
  55. :keyword control: Control object or class name.
  56. :keyword set_as_current: Make this the global current app.
  57. """
  58. Pickler = AppPickler
  59. def set_current(self):
  60. """Make this the current app for this thread."""
  61. _tls.current_app = self
  62. def on_init(self):
  63. if self.set_as_current:
  64. self.set_current()
  65. def create_task_cls(self):
  66. """Creates a base task class using default configuration
  67. taken from this app."""
  68. from .task import BaseTask
  69. class Task(BaseTask):
  70. app = self
  71. abstract = True
  72. Task.__doc__ = BaseTask.__doc__
  73. Task.bind(self)
  74. return Task
  75. def Worker(self, **kwargs):
  76. """Create new :class:`~celery.apps.worker.Worker` instance."""
  77. return instantiate("celery.apps.worker:Worker", app=self, **kwargs)
  78. def WorkController(self, **kwargs):
  79. return instantiate("celery.worker:WorkController", app=self, **kwargs)
  80. def Beat(self, **kwargs):
  81. """Create new :class:`~celery.apps.beat.Beat` instance."""
  82. return instantiate("celery.apps.beat:Beat", app=self, **kwargs)
  83. def TaskSet(self, *args, **kwargs):
  84. """Create new :class:`~celery.task.sets.TaskSet`."""
  85. return instantiate("celery.task.sets:TaskSet",
  86. app=self, *args, **kwargs)
  87. def worker_main(self, argv=None):
  88. """Run :program:`celeryd` using `argv`. Uses :data:`sys.argv`
  89. if `argv` is not specified."""
  90. return instantiate("celery.bin.celeryd:WorkerCommand", app=self) \
  91. .execute_from_commandline(argv)
  92. def task(self, *args, **options):
  93. """Decorator to create a task class out of any callable.
  94. **Examples:**
  95. .. code-block:: python
  96. @task()
  97. def refresh_feed(url):
  98. return Feed.objects.get(url=url).refresh()
  99. with setting extra options and using retry.
  100. .. code-block:: python
  101. from celery.task import current
  102. @task(exchange="feeds")
  103. def refresh_feed(url):
  104. try:
  105. return Feed.objects.get(url=url).refresh()
  106. except socket.error, exc:
  107. current.retry(exc=exc)
  108. Calling the resulting task::
  109. >>> refresh_feed("http://example.com/rss") # Regular
  110. <Feed: http://example.com/rss>
  111. >>> refresh_feed.delay("http://example.com/rss") # Async
  112. <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>
  113. """
  114. def inner_create_task_cls(**options):
  115. def _create_task_cls(fun):
  116. base = options.pop("base", None) or self.Task
  117. T = type(fun.__name__, (base, ), dict({
  118. "app": self,
  119. "accept_magic_kwargs": False,
  120. "run": staticmethod(fun),
  121. "__doc__": fun.__doc__,
  122. "__module__": fun.__module__}, **options))()
  123. return self._tasks[T.name] # global instance.
  124. return _create_task_cls
  125. if len(args) == 1 and callable(args[0]):
  126. return inner_create_task_cls(**options)(*args)
  127. return inner_create_task_cls(**options)
  128. def annotate_task(self, task):
  129. if self.annotations:
  130. match = annotations._first_match(self.annotations, task)
  131. for attr, value in (match or {}).iteritems():
  132. setattr(task, attr, value)
  133. match_any = annotations._first_match_any(self.annotations)
  134. for attr, value in (match_any or {}).iteritems():
  135. setattr(task, attr, value)
  136. @cached_property
  137. def Task(self):
  138. """Default Task base class for this application."""
  139. return self.create_task_cls()
  140. @cached_property
  141. def annotations(self):
  142. return annotations.prepare(self.conf.CELERY_ANNOTATIONS)
  143. def __repr__(self):
  144. return "<Celery: %s:0x%x>" % (self.main or "__main__", id(self), )
  145. def __reduce__(self):
  146. # Reduce only pickles the configuration changes,
  147. # so the default configuration doesn't have to be passed
  148. # between processes.
  149. return (_unpickle_app, (self.__class__, self.Pickler)
  150. + self.__reduce_args__())
  151. def __reduce_args__(self):
  152. return (self.main,
  153. self.conf.changes,
  154. self.loader_cls,
  155. self.backend_cls,
  156. self.amqp_cls,
  157. self.events_cls,
  158. self.log_cls,
  159. self.control_cls,
  160. self.accept_magic_kwargs)
  161. #: The "default" loader is the default loader used by old applications.
  162. default_loader = os.environ.get("CELERY_LOADER") or "default"
  163. #: Global fallback app instance.
  164. default_app = App("default", loader=default_loader,
  165. set_as_current=False,
  166. accept_magic_kwargs=True)
  167. def current_app():
  168. return getattr(_tls, "current_app", None) or default_app
  169. def current_task():
  170. return getattr(_tls, "current_task", None)
  171. def _app_or_default(app=None):
  172. """Returns the app provided or the default app if none.
  173. The environment variable :envvar:`CELERY_TRACE_APP` is used to
  174. trace app leaks. When enabled an exception is raised if there
  175. is no active app.
  176. """
  177. if app is None:
  178. return getattr(_tls, "current_app", None) or default_app
  179. return app
  180. def _app_or_default_trace(app=None): # pragma: no cover
  181. from traceback import print_stack
  182. from multiprocessing import current_process
  183. if app is None:
  184. if getattr(_tls, "current_app", None):
  185. print("-- RETURNING TO CURRENT APP --") # noqa+
  186. print_stack()
  187. return _tls.current_app
  188. if current_process()._name == "MainProcess":
  189. raise Exception("DEFAULT APP")
  190. print("-- RETURNING TO DEFAULT APP --") # noqa+
  191. print_stack()
  192. return default_app
  193. return app
  194. def enable_trace():
  195. global app_or_default
  196. app_or_default = _app_or_default_trace
  197. def disable_trace():
  198. global app_or_default
  199. app_or_default = _app_or_default
  200. app_or_default = _app_or_default
  201. if os.environ.get("CELERY_TRACE_APP"): # pragma: no cover
  202. enable_trace()