|
@@ -6,22 +6,70 @@ from celery import registry
|
|
|
from celery.app import base
|
|
|
from celery.utils.functional import wraps
|
|
|
|
|
|
+_current_app = None
|
|
|
+
|
|
|
|
|
|
class App(base.BaseApp):
|
|
|
+ """Celery Application.
|
|
|
+
|
|
|
+ Inherits from :class:`celery.app.base.BaseApp`.
|
|
|
+
|
|
|
+ :keyword loader: The loader class, or the name of the loader class to use.
|
|
|
+ Default is :class:`celery.loaders.app.AppLoader`.
|
|
|
+ :keyword backend: The result store backend class, or the name of the
|
|
|
+ backend class to use. Default is the value of the
|
|
|
+ ``CELERY_RESULT_BACKEND`` setting.
|
|
|
+
|
|
|
+ .. attribute:: amqp
|
|
|
+
|
|
|
+ Sending/receiving messages.
|
|
|
+ See :class:`celery.app.amqp.AMQP`.
|
|
|
+
|
|
|
+ .. attribute:: backend
|
|
|
+
|
|
|
+ Storing/retreiving task state.
|
|
|
+ See :class:`celery.backend.base.BaseBackend`.
|
|
|
+
|
|
|
+ .. attribute:: conf
|
|
|
+
|
|
|
+ Current configuration. Supports both the dict interface and
|
|
|
+ attribute access.
|
|
|
+
|
|
|
+ .. attribute:: control
|
|
|
+
|
|
|
+ Controlling worker nodes.
|
|
|
+ See :class:`celery.task.control.Control`.
|
|
|
+
|
|
|
+ .. attribute:: log
|
|
|
+
|
|
|
+ Logging.
|
|
|
+ See :class:`celery.log.Logging`.
|
|
|
+
|
|
|
+ """
|
|
|
+
|
|
|
+ def on_init(self):
|
|
|
+ if self.set_as_current:
|
|
|
+ global _current_app
|
|
|
+ _current_app = self
|
|
|
|
|
|
def create_task_cls(self):
|
|
|
+ """Creates a base task class using default configuration
|
|
|
+ taken from this app."""
|
|
|
from celery.task.base import create_task_cls
|
|
|
return create_task_cls(app=self)
|
|
|
|
|
|
def Worker(self, **kwargs):
|
|
|
+ """Create new :class:`celery.apps.worker.Worker` instance."""
|
|
|
from celery.apps.worker import Worker
|
|
|
return Worker(app=self, **kwargs)
|
|
|
|
|
|
def Beat(self, **kwargs):
|
|
|
+ """Create new :class:`celery.apps.beat.Beat` instance."""
|
|
|
from celery.apps.beat import Beat
|
|
|
return Beat(app=self, **kwargs)
|
|
|
|
|
|
def TaskSet(self, *args, **kwargs):
|
|
|
+ """Create new :class:`celery.task.sets.TaskSet`."""
|
|
|
from celery.task.sets import TaskSet
|
|
|
kwargs["app"] = self
|
|
|
return TaskSet(*args, **kwargs)
|
|
@@ -85,21 +133,36 @@ class App(base.BaseApp):
|
|
|
|
|
|
# The "default" loader is the default loader used by old applications.
|
|
|
default_loader = os.environ.get("CELERY_LOADER") or "default"
|
|
|
-default_app = App(loader=default_loader)
|
|
|
+default_app = App(loader=default_loader, set_as_current=False)
|
|
|
|
|
|
if os.environ.get("CELERY_TRACE_APP"):
|
|
|
+
|
|
|
def app_or_default(app=None):
|
|
|
+ from traceback import print_stack
|
|
|
from multiprocessing import current_process
|
|
|
+ global _current_app
|
|
|
if app is None:
|
|
|
+ if _current_app:
|
|
|
+ print("-- RETURNING TO CURRENT APP --")
|
|
|
+ print_stack()
|
|
|
+ return _current_app
|
|
|
if current_process()._name == "MainProcess":
|
|
|
raise Exception("DEFAULT APP")
|
|
|
- print("RETURNING TO DEFAULT APP")
|
|
|
- import traceback
|
|
|
- traceback.print_stack()
|
|
|
+ print("-- RETURNING TO DEFAULT APP --")
|
|
|
+ print_stack()
|
|
|
return default_app
|
|
|
return app
|
|
|
else:
|
|
|
def app_or_default(app=None):
|
|
|
+ """Returns the app provided or the default app if none.
|
|
|
+
|
|
|
+ If the environment variable :envvar:`CELERY_TRACE_APP` is set,
|
|
|
+ any time there is no active app and exception is raised. This
|
|
|
+ is used to trace app leaks (when someone forgets to pass
|
|
|
+ along the app instance).
|
|
|
+
|
|
|
+ """
|
|
|
+ global _current_app
|
|
|
if app is None:
|
|
|
- return default_app
|
|
|
+ return _current_app or default_app
|
|
|
return app
|