__init__.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. import os
  2. from inspect import getargspec
  3. from celery import registry
  4. from celery.app import base
  5. from celery.utils.functional import wraps
  # Module-level reference to the "current" app.  It starts out as ``None``,
  # is overwritten by :meth:`App.on_init` whenever an app has a truthy
  # ``set_as_current``, and is consulted by :func:`app_or_default` when the
  # caller does not pass an app explicitly.
  _current_app = None
  class App(base.BaseApp):
      """Celery Application.

      Inherits from :class:`celery.app.base.BaseApp`.

      :keyword loader: The loader class, or the name of the loader class
          to use.  Default is :class:`celery.loaders.app.AppLoader`.
      :keyword backend: The result store backend class, or the name of the
          backend class to use.  Default is the value of the
          ``CELERY_RESULT_BACKEND`` setting.

      .. attribute:: amqp

          Sending/receiving messages.
          See :class:`celery.app.amqp.AMQP`.

      .. attribute:: backend

          Storing/retrieving task state.
          See :class:`celery.backend.base.BaseBackend`.

      .. attribute:: conf

          Current configuration.  Supports both the dict interface and
          attribute access.

      .. attribute:: control

          Controlling worker nodes.
          See :class:`celery.task.control.Control`.

      .. attribute:: log

          Logging.
          See :class:`celery.log.Logging`.

      """

      def on_init(self):
          """Register this app as the module-level current app.

          Only has an effect when ``self.set_as_current`` is truthy.
          NOTE(review): presumably invoked by ``BaseApp`` at construction
          time -- confirm against :mod:`celery.app.base`.
          """
          if self.set_as_current:
              global _current_app
              _current_app = self

      def create_task_cls(self):
          """Creates a base task class using default configuration
          taken from this app."""
          # Imported lazily, like the other factories on this class.
          from celery.task.base import create_task_cls
          return create_task_cls(app=self)

      def Worker(self, **kwargs):
          """Create new :class:`celery.apps.worker.Worker` instance."""
          from celery.apps.worker import Worker
          return Worker(app=self, **kwargs)

      def Beat(self, **kwargs):
          """Create new :class:`celery.apps.beat.Beat` instance."""
          from celery.apps.beat import Beat
          return Beat(app=self, **kwargs)

      def TaskSet(self, *args, **kwargs):
          """Create new :class:`celery.task.sets.TaskSet`."""
          from celery.task.sets import TaskSet
          # Always bind the task set to this app, overriding any ``app``
          # keyword supplied by the caller.
          kwargs["app"] = self
          return TaskSet(*args, **kwargs)

      def worker_main(self, argv=None):
          """Run the ``celeryd`` worker command bound to this app.

          :keyword argv: Command-line argument list handed to
              :meth:`WorkerCommand.execute_from_commandline`.

          Returns whatever ``execute_from_commandline`` returns.
          """
          from celery.bin.celeryd import WorkerCommand
          return WorkerCommand(app=self).execute_from_commandline(argv)

      def task(self, *args, **options):
          """Decorator to create a task class out of any callable.

          Can be used bare (``@task``) or called with options
          (``@task(...)``).  The ``base`` option selects the base class of
          the generated task (default: :meth:`create_task_cls`); every
          other option becomes an attribute of the generated task class.

          Returns the task instance found in the task registry under the
          generated task's name.

          Examples:

          .. code-block:: python

              @task()
              def refresh_feed(url):
                  return Feed.objects.get(url=url).refresh()

          With setting extra options and using retry.

          .. code-block:: python

              @task(exchange="feeds")
              def refresh_feed(url, **kwargs):
                  try:
                      return Feed.objects.get(url=url).refresh()
                  except socket.error, exc:
                      refresh_feed.retry(args=[url], kwargs=kwargs, exc=exc)

          Calling the resulting task:

              >>> refresh_feed("http://example.com/rss") # Regular
              <Feed: http://example.com/rss>
              >>> refresh_feed.delay("http://example.com/rss") # Async
              <AsyncResult: 8998d0f4-da0b-4669-ba03-d5ab5ac6ad5d>

          """

          def inner_create_task_cls(**options):

              def _create_task_cls(fun):
                  # Work on a private copy: popping "base" from the dict
                  # captured by the closure would make a decorator that is
                  # applied to several functions lose its base class after
                  # the first use.
                  task_options = dict(options)
                  base_cls = (task_options.pop("base", None)
                              or self.create_task_cls())

                  @wraps(fun, assigned=("__module__", "__name__"))
                  def run(self, *args, **kwargs):
                      return fun(*args, **kwargs)

                  # Save the argspec for this task so we can recognize
                  # which default task kwargs we're going to pass to it
                  # later.
                  # (this happens in celery.utils.fun_takes_kwargs)
                  run.argspec = getargspec(fun)

                  cls_dict = dict(task_options, run=run,
                                  __module__=fun.__module__,
                                  __doc__=fun.__doc__)
                  # Instantiate the new class, then hand back the instance
                  # stored in the registry under the same name.
                  T = type(fun.__name__, (base_cls, ), cls_dict)()
                  return registry.tasks[T.name]  # global instance.

              return _create_task_cls

          if len(args) == 1 and callable(args[0]):
              # Bare usage, ``task(fun)``: forward any keyword options too
              # instead of silently dropping them.
              return inner_create_task_cls(**options)(*args)
          return inner_create_task_cls(**options)
  # Fallback application for code written before explicit app instances:
  # its loader name comes from the CELERY_LOADER environment variable
  # (an unset or empty value means "default"), and it is deliberately not
  # registered as the current app.
  default_loader = os.getenv("CELERY_LOADER") or "default"
  default_app = App(set_as_current=False, loader=default_loader)
  if os.environ.get("CELERY_TRACE_APP"):

      def app_or_default(app=None):
          """Tracing variant of ``app_or_default``.

          Selected at import time when :envvar:`CELERY_TRACE_APP` is set.
          An explicitly passed app is returned untouched.  Otherwise a
          stack trace is printed and the current app (if any) or the
          default app is returned -- except that falling back to the
          default app inside the process named ``MainProcess`` raises
          :exc:`Exception` instead.
          """
          from multiprocessing import current_process
          from traceback import print_stack

          if app is not None:
              return app

          if _current_app:
              print("-- RETURNING TO CURRENT APP --")
              print_stack()
              return _current_app

          # No current app at all: treat this as an app leak when it
          # happens in the main process.
          if current_process()._name == "MainProcess":
              raise Exception("DEFAULT APP")

          print("-- RETURNING TO DEFAULT APP --")
          print_stack()
          return default_app

  else:

      def app_or_default(app=None):
          """Return ``app`` if given, else the current or default app.

          When no app is passed, the module-level current app is used if
          it is set (truthy), otherwise the default app.

          If the environment variable :envvar:`CELERY_TRACE_APP` is set
          when this module is imported, a tracing variant is defined
          instead; it prints stack traces and may raise an exception
          when no app is passed.  That is used to trace app leaks (when
          someone forgets to pass along the app instance).
          """
          if app is not None:
              return app
          return _current_app or default_app