__init__.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app
  4. ~~~~~~~~~~
  5. Celery Application.
  6. :copyright: (c) 2009 - 2011 by Ask Solem.
  7. :license: BSD, see LICENSE for more details.
  8. """
  9. from __future__ import absolute_import
  10. import os
  11. import threading
  12. from functools import wraps
  13. from inspect import getargspec
  14. from .. import registry
  15. from ..utils import cached_property, instantiate
  16. from . import base
# Thread-local holder for the "current" app.
# Apps created with the :attr:`~celery.app.base.BaseApp.set_as_current`
# attribute enabled store themselves here (see :meth:`App.set_current`),
# so it holds the app most recently made current in this thread, and it is
# the app returned by :func:`app_or_default` when no app is passed in.
_tls = threading.local()
# NOTE: attributes of a ``threading.local`` are per-thread, so this only
# initializes the attribute for the importing thread.  Readers in this
# module therefore use ``getattr(_tls, "current_app", None)``.
_tls.current_app = None
  22. class AppPickler(object):
  23. def __call__(self, cls, *args):
  24. kwargs = self.build_kwargs(*args)
  25. app = self.construct(cls, **kwargs)
  26. self.prepare(app, **kwargs)
  27. return app
  28. def prepare(self, app, **kwargs):
  29. app.conf.update(kwargs["changes"])
  30. def build_kwargs(self, *args):
  31. return self.build_standard_kwargs(*args)
  32. def build_standard_kwargs(self, main, changes, loader, backend, amqp,
  33. events, log, control, accept_magic_kwargs):
  34. return dict(main=main, loader=loader, backend=backend, amqp=amqp,
  35. changes=changes, events=events, log=log, control=control,
  36. set_as_current=False,
  37. accept_magic_kwargs=accept_magic_kwargs)
  38. def construct(self, cls, **kwargs):
  39. return cls(**kwargs)
  40. def _unpickle_app(cls, pickler, *args):
  41. return pickler()(cls, *args)
  42. class App(base.BaseApp):
  43. """Celery Application.
  44. :param main: Name of the main module if running as `__main__`.
  45. :keyword loader: The loader class, or the name of the loader class to use.
  46. Default is :class:`celery.loaders.app.AppLoader`.
  47. :keyword backend: The result store backend class, or the name of the
  48. backend class to use. Default is the value of the
  49. :setting:`CELERY_RESULT_BACKEND` setting.
  50. :keyword amqp: AMQP object or class name.
  51. :keyword events: Events object or class name.
  52. :keyword log: Log object or class name.
  53. :keyword control: Control object or class name.
  54. :keyword set_as_current: Make this the global current app.
  55. """
  56. Pickler = AppPickler
  57. def set_current(self):
  58. """Make this the current app for this thread."""
  59. _tls.current_app = self
  60. def on_init(self):
  61. if self.set_as_current:
  62. self.set_current()
  63. def create_task_cls(self):
  64. """Creates a base task class using default configuration
  65. taken from this app."""
  66. conf = self.conf
  67. from .task import BaseTask
  68. class Task(BaseTask):
  69. abstract = True
  70. app = self
  71. backend = self.backend
  72. exchange_type = conf.CELERY_DEFAULT_EXCHANGE_TYPE
  73. delivery_mode = conf.CELERY_DEFAULT_DELIVERY_MODE
  74. send_error_emails = conf.CELERY_SEND_TASK_ERROR_EMAILS
  75. error_whitelist = conf.CELERY_TASK_ERROR_WHITELIST
  76. serializer = conf.CELERY_TASK_SERIALIZER
  77. rate_limit = conf.CELERY_DEFAULT_RATE_LIMIT
  78. track_started = conf.CELERY_TRACK_STARTED
  79. acks_late = conf.CELERY_ACKS_LATE
  80. ignore_result = conf.CELERY_IGNORE_RESULT
  81. store_errors_even_if_ignored = \
  82. conf.CELERY_STORE_ERRORS_EVEN_IF_IGNORED
  83. accept_magic_kwargs = self.accept_magic_kwargs
  84. Task.__doc__ = BaseTask.__doc__
  85. return Task
  86. def Worker(self, **kwargs):
  87. """Create new :class:`~celery.apps.worker.Worker` instance."""
  88. return instantiate("celery.apps.worker.Worker", app=self, **kwargs)
  89. def Beat(self, **kwargs):
  90. """Create new :class:`~celery.apps.beat.Beat` instance."""
  91. return instantiate("celery.apps.beat.Beat", app=self, **kwargs)
  92. def TaskSet(self, *args, **kwargs):
  93. """Create new :class:`~celery.task.sets.TaskSet`."""
  94. from ..task.sets import TaskSet
  95. kwargs["app"] = self
  96. return TaskSet(*args, **kwargs)
  97. def worker_main(self, argv=None):
  98. """Run :program:`celeryd` using `argv`. Uses :data:`sys.argv`
  99. if `argv` is not specified."""
  100. from ..bin.celeryd import WorkerCommand
  101. return WorkerCommand(app=self).execute_from_commandline(argv)
  102. def task(self, *args, **options):
  103. """Decorator to create a task class out of any callable.
  104. .. admonition:: Examples
  105. .. code-block:: python
  106. @task()
  107. def refresh_feed(url):
  108. return Feed.objects.get(url=url).refresh()
  109. With setting extra options and using retry.
  110. .. code-block:: python
  111. @task(exchange="feeds")
  112. def refresh_feed(url, **kwargs):
  113. try:
  114. return Feed.objects.get(url=url).refresh()
  115. except socket.error, exc:
  116. refresh_feed.retry(args=[url], kwargs=kwargs, exc=exc)
  117. Calling the resulting task:
  118. >>> refresh_feed("http://example.com/rss") # Regular
  119. <Feed: http://example.com/rss>
  120. >>> refresh_feed.delay("http://example.com/rss") # Async
  121. <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>
  122. """
  123. def inner_create_task_cls(**options):
  124. def _create_task_cls(fun):
  125. options["app"] = self
  126. options.setdefault("accept_magic_kwargs", False)
  127. base = options.pop("base", None) or self.Task
  128. @wraps(fun, assigned=("__module__", "__name__"))
  129. def run(self, *args, **kwargs):
  130. return fun(*args, **kwargs)
  131. # Save the argspec for this task so we can recognize
  132. # which default task kwargs we're going to pass to it later.
  133. # (this happens in celery.utils.fun_takes_kwargs)
  134. run.argspec = getargspec(fun)
  135. cls_dict = dict(options, run=run,
  136. __module__=fun.__module__,
  137. __doc__=fun.__doc__)
  138. T = type(fun.__name__, (base, ), cls_dict)()
  139. return registry.tasks[T.name] # global instance.
  140. return _create_task_cls
  141. if len(args) == 1 and callable(args[0]):
  142. return inner_create_task_cls(**options)(*args)
  143. return inner_create_task_cls(**options)
  144. @cached_property
  145. def Task(self):
  146. """Default Task base class for this application."""
  147. return self.create_task_cls()
  148. def __repr__(self):
  149. return "<Celery: %s:0x%x>" % (self.main or "__main__", id(self), )
  150. def __reduce__(self):
  151. # Reduce only pickles the configuration changes,
  152. # so the default configuration doesn't have to be passed
  153. # between processes.
  154. return (_unpickle_app, (self.__class__, self.Pickler)
  155. + self.__reduce_args__())
  156. def __reduce_args__(self):
  157. return (self.main,
  158. self.conf.changes,
  159. self.loader_cls,
  160. self.backend_cls,
  161. self.amqp_cls,
  162. self.events_cls,
  163. self.log_cls,
  164. self.control_cls,
  165. self.accept_magic_kwargs)
#: The "default" loader is the default loader used by old applications.
#: Taken from the :envvar:`CELERY_LOADER` environment variable when it is
#: set to a non-empty value, otherwise ``"default"``.
default_loader = os.environ.get("CELERY_LOADER") or "default"

#: Global fallback app instance, created at import time.  It is not made
#: the current app (``set_as_current=False``); :func:`current_app` and
#: :func:`app_or_default` fall back to it when no current app is set.
default_app = App("default", loader=default_loader,
                  set_as_current=False, accept_magic_kwargs=True)
  171. def current_app():
  172. return getattr(_tls, "current_app", None) or default_app
  173. def _app_or_default(app=None):
  174. """Returns the app provided or the default app if none.
  175. The environment variable :envvar:`CELERY_TRACE_APP` is used to
  176. trace app leaks. When enabled an exception is raised if there
  177. is no active app.
  178. """
  179. if app is None:
  180. return getattr(_tls, "current_app", None) or default_app
  181. return app
  182. def _app_or_default_trace(app=None): # pragma: no cover
  183. from traceback import print_stack
  184. from multiprocessing import current_process
  185. if app is None:
  186. if getattr(_tls, "current_app", None):
  187. print("-- RETURNING TO CURRENT APP --") # noqa+
  188. print_stack()
  189. return _tls.current_app
  190. if current_process()._name == "MainProcess":
  191. raise Exception("DEFAULT APP")
  192. print("-- RETURNING TO DEFAULT APP --") # noqa+
  193. print_stack()
  194. return default_app
  195. return app
  196. def enable_trace():
  197. global app_or_default
  198. app_or_default = _app_or_default_trace
  199. def disable_trace():
  200. global app_or_default
  201. app_or_default = _app_or_default
# Public entry point: the non-tracing implementation by default.
app_or_default = _app_or_default

# Setting the CELERY_TRACE_APP environment variable to a non-empty value
# switches to the tracing implementation at import time.
if os.environ.get("CELERY_TRACE_APP"):  # pragma: no cover
    enable_trace()