__init__.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. """
  2. celery.app
  3. ==========
  4. Celery Application.
  5. :copyright: (c) 2009 - 2011 by Ask Solem.
  6. :license: BSD, see LICENSE for more details.
  7. """
  8. import os
  9. import threading
  10. from functools import wraps
  11. from inspect import getargspec
  12. from kombu.utils import cached_property
  13. from celery import registry
  14. from celery.app import base
  15. from celery.utils import instantiate
  16. # Apps with the :attr:`~celery.app.base.BaseApp.set_as_current` attribute
  17. # sets this, so it will always contain the last instantiated app,
  18. # and is the default app returned by :func:`app_or_default`.
  19. _tls = threading.local()
  20. _tls.current_app = None
  21. class App(base.BaseApp):
  22. """Celery Application.
  23. :param main: Name of the main module if running as `__main__`.
  24. :keyword loader: The loader class, or the name of the loader class to use.
  25. Default is :class:`celery.loaders.app.AppLoader`.
  26. :keyword backend: The result store backend class, or the name of the
  27. backend class to use. Default is the value of the
  28. :setting:`CELERY_RESULT_BACKEND` setting.
  29. :keyword amqp: AMQP object or class name.
  30. :keyword events: Events object or class name.
  31. :keyword log: Log object or class name.
  32. :keyword control: Control object or class name.
  33. :keyword set_as_current: Make this the global current app.
  34. """
  35. def set_current(self):
  36. _tls.current_app = self
  37. def on_init(self):
  38. if self.set_as_current:
  39. self.set_current()
  40. def create_task_cls(self):
  41. """Creates a base task class using default configuration
  42. taken from this app."""
  43. conf = self.conf
  44. from celery.app.task import BaseTask
  45. class Task(BaseTask):
  46. abstract = True
  47. app = self
  48. backend = self.backend
  49. exchange_type = conf.CELERY_DEFAULT_EXCHANGE_TYPE
  50. delivery_mode = conf.CELERY_DEFAULT_DELIVERY_MODE
  51. send_error_emails = conf.CELERY_SEND_TASK_ERROR_EMAILS
  52. error_whitelist = conf.CELERY_TASK_ERROR_WHITELIST
  53. serializer = conf.CELERY_TASK_SERIALIZER
  54. rate_limit = conf.CELERY_DEFAULT_RATE_LIMIT
  55. track_started = conf.CELERY_TRACK_STARTED
  56. acks_late = conf.CELERY_ACKS_LATE
  57. ignore_result = conf.CELERY_IGNORE_RESULT
  58. store_errors_even_if_ignored = \
  59. conf.CELERY_STORE_ERRORS_EVEN_IF_IGNORED
  60. accept_magic_kwargs = self.accept_magic_kwargs
  61. Task.__doc__ = BaseTask.__doc__
  62. return Task
  63. def Worker(self, **kwargs):
  64. """Create new :class:`~celery.apps.worker.Worker` instance."""
  65. return instantiate("celery.apps.worker.Worker", app=self, **kwargs)
  66. def Beat(self, **kwargs):
  67. """Create new :class:`~celery.apps.beat.Beat` instance."""
  68. return instantiate("celery.apps.beat.Beat", app=self, **kwargs)
  69. def TaskSet(self, *args, **kwargs):
  70. """Create new :class:`~celery.task.sets.TaskSet`."""
  71. from celery.task.sets import TaskSet
  72. kwargs["app"] = self
  73. return TaskSet(*args, **kwargs)
  74. def worker_main(self, argv=None):
  75. """Run :program:`celeryd` using `argv`. Uses :data:`sys.argv`
  76. if `argv` is not specified."""
  77. from celery.bin.celeryd import WorkerCommand
  78. return WorkerCommand(app=self).execute_from_commandline(argv)
  79. def task(self, *args, **options):
  80. """Decorator to create a task class out of any callable.
  81. .. admonition:: Examples
  82. .. code-block:: python
  83. @task()
  84. def refresh_feed(url):
  85. return Feed.objects.get(url=url).refresh()
  86. With setting extra options and using retry.
  87. .. code-block:: python
  88. @task(exchange="feeds")
  89. def refresh_feed(url, **kwargs):
  90. try:
  91. return Feed.objects.get(url=url).refresh()
  92. except socket.error, exc:
  93. refresh_feed.retry(args=[url], kwargs=kwargs, exc=exc)
  94. Calling the resulting task:
  95. >>> refresh_feed("http://example.com/rss") # Regular
  96. <Feed: http://example.com/rss>
  97. >>> refresh_feed.delay("http://example.com/rss") # Async
  98. <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>
  99. """
  100. def inner_create_task_cls(**options):
  101. def _create_task_cls(fun):
  102. options["app"] = self
  103. options.setdefault("accept_magic_kwargs", False)
  104. base = options.pop("base", None) or self.Task
  105. @wraps(fun, assigned=("__module__", "__name__"))
  106. def run(self, *args, **kwargs):
  107. return fun(*args, **kwargs)
  108. # Save the argspec for this task so we can recognize
  109. # which default task kwargs we're going to pass to it later.
  110. # (this happens in celery.utils.fun_takes_kwargs)
  111. run.argspec = getargspec(fun)
  112. cls_dict = dict(options, run=run,
  113. __module__=fun.__module__,
  114. __doc__=fun.__doc__)
  115. T = type(fun.__name__, (base, ), cls_dict)()
  116. return registry.tasks[T.name] # global instance.
  117. return _create_task_cls
  118. if len(args) == 1 and callable(args[0]):
  119. return inner_create_task_cls(**options)(*args)
  120. return inner_create_task_cls(**options)
  121. @cached_property
  122. def Task(self):
  123. """Default Task base class for this application."""
  124. return self.create_task_cls()
  125. def __repr__(self):
  126. return "<Celery: %s:0x%x>" % (self.main or "__main__", id(self), )
  127. def __reduce__(self):
  128. # Reduce only pickles the configuration changes,
  129. # so the default configuration doesn't have to be passed
  130. # between processes.
  131. return (_unpickle_app, (self.__class__,
  132. self.main,
  133. self.conf.changes,
  134. self.loader_cls,
  135. self.backend_cls,
  136. self.amqp_cls,
  137. self.events_cls,
  138. self.log_cls,
  139. self.control_cls,
  140. self.accept_magic_kwargs))
  141. def _unpickle_app(cls, main, changes, loader, backend, amqp,
  142. events, log, control, accept_magic_kwargs):
  143. app = cls(main, loader=loader, backend=backend, amqp=amqp,
  144. events=events, log=log, control=control,
  145. set_as_current=False,
  146. accept_magic_kwargs=accept_magic_kwargs)
  147. app.conf.update(changes)
  148. return app
  149. #: The "default" loader is the default loader used by old applications.
  150. default_loader = os.environ.get("CELERY_LOADER") or "default"
  151. #: Global fallback app instance.
  152. default_app = App("default", loader=default_loader,
  153. set_as_current=False, accept_magic_kwargs=True)
  154. def current_app():
  155. return getattr(_tls, "current_app", None) or default_app
  156. def _app_or_default(app=None):
  157. """Returns the app provided or the default app if none.
  158. The environment variable :envvar:`CELERY_TRACE_APP` is used to
  159. trace app leaks. When enabled an exception is raised if there
  160. is no active app.
  161. """
  162. if app is None:
  163. return getattr(_tls, "current_app", None) or default_app
  164. return app
  165. def _app_or_default_trace(app=None): # pragma: no cover
  166. from traceback import print_stack
  167. from multiprocessing import current_process
  168. if app is None:
  169. if getattr(_tls, "current_app", None):
  170. print("-- RETURNING TO CURRENT APP --")
  171. print_stack()
  172. return _tls.current_app
  173. if current_process()._name == "MainProcess":
  174. raise Exception("DEFAULT APP")
  175. print("-- RETURNING TO DEFAULT APP --")
  176. print_stack()
  177. return default_app
  178. return app
  179. def enable_trace():
  180. global app_or_default
  181. app_or_default = _app_or_default_trace
  182. def disable_trace():
  183. global app_or_default
  184. app_or_default = _app_or_default
  185. app_or_default = _app_or_default
  186. if os.environ.get("CELERY_TRACE_APP"): # pragma: no cover
  187. enable_trace()