|
@@ -28,7 +28,7 @@ from ..loaders import get_loader_cls
|
|
|
from ..local import PromiseProxy, maybe_evaluate
|
|
|
from ..utils import cached_property, register_after_fork
|
|
|
from ..utils.functional import first
|
|
|
-from ..utils.imports import instantiate
|
|
|
+from ..utils.imports import instantiate, symbol_by_name
|
|
|
|
|
|
from . import annotations
|
|
|
from .builtins import load_builtin_tasks
|
|
@@ -107,31 +107,44 @@ class App(object):
|
|
|
def create_task_cls(self):
|
|
|
"""Creates a base task class using default configuration
|
|
|
taken from this app."""
|
|
|
- from .task import BaseTask
|
|
|
+ return self.subclass_with_self("celery.app.task:BaseTask", name="Task",
|
|
|
+ attribute="_app", abstract=True)
|
|
|
|
|
|
- class Task(BaseTask):
|
|
|
- _app = self
|
|
|
- abstract = True
|
|
|
+ def subclass_with_self(self, Class, name=None, attribute="app", **kw):
|
|
|
+ """Subclass an app-compatible class by setting its app attribute
|
|
|
+ to be this app instance.
|
|
|
|
|
|
- Task.__doc__ = BaseTask.__doc__
|
|
|
+ App-compatible means that the class has a class attribute that
|
|
|
+ provides the default app it should use, e.g.
|
|
|
+ ``class Foo: app = None``.
|
|
|
|
|
|
- return Task
|
|
|
+ :param Class: The app-compatible class to subclass.
|
|
|
+ :keyword name: Custom name for the target class.
|
|
|
+ :keyword attribute: Name of the attribute holding the app,
|
|
|
+ default is "app".
|
|
|
|
|
|
- def Worker(self, **kwargs):
|
|
|
+ """
|
|
|
+ Class = symbol_by_name(Class)
|
|
|
+ return type(name or Class.__name__, (Class, ), dict({attribute: self,
|
|
|
+ "__module__": Class.__module__, "__doc__": Class.__doc__}, **kw))
|
|
|
+
|
|
|
+ @cached_property
|
|
|
+ def Worker(self):
|
|
|
"""Create new :class:`~celery.apps.worker.Worker` instance."""
|
|
|
- return instantiate("celery.apps.worker:Worker", app=self, **kwargs)
|
|
|
+ return self.subclass_with_self("celery.apps.worker:Worker")
|
|
|
|
|
|
+ @cached_property
|
|
|
def WorkController(self, **kwargs):
|
|
|
- return instantiate("celery.worker:WorkController", app=self, **kwargs)
|
|
|
+ return self.subclass_with_self("celery.worker:WorkController")
|
|
|
|
|
|
+ @cached_property
|
|
|
def Beat(self, **kwargs):
|
|
|
"""Create new :class:`~celery.apps.beat.Beat` instance."""
|
|
|
- return instantiate("celery.apps.beat:Beat", app=self, **kwargs)
|
|
|
+ return self.subclass_with_self("celery.apps.beat:Beat")
|
|
|
|
|
|
- def TaskSet(self, *args, **kwargs):
|
|
|
- """Create new :class:`~celery.task.sets.TaskSet`."""
|
|
|
- return instantiate("celery.task.sets:TaskSet",
|
|
|
- app=self, *args, **kwargs)
|
|
|
+ @cached_property
|
|
|
+ def TaskSet(self):
|
|
|
+ return self.subclass_with_self("celery.task.sets:group")
|
|
|
|
|
|
def start(self, argv=None):
|
|
|
"""Run :program:`celery` using `argv`. Uses :data:`sys.argv`
|
|
@@ -324,16 +337,13 @@ class App(object):
|
|
|
publisher or publish.close()
|
|
|
return result_cls(new_id)
|
|
|
|
|
|
- def AsyncResult(self, task_id, backend=None, task_name=None):
|
|
|
- """Create :class:`celery.result.BaseAsyncResult` instance."""
|
|
|
- from ..result import AsyncResult
|
|
|
- return AsyncResult(task_id, app=self, task_name=task_name,
|
|
|
- backend=backend or self.backend)
|
|
|
+ @cached_property
|
|
|
+ def AsyncResult(self):
|
|
|
+ return self.subclass_with_self("celery.result:AsyncResult")
|
|
|
|
|
|
- def TaskSetResult(self, taskset_id, results, **kwargs):
|
|
|
- """Create :class:`celery.result.TaskSetResult` instance."""
|
|
|
- from ..result import TaskSetResult
|
|
|
- return TaskSetResult(taskset_id, results, app=self)
|
|
|
+ @cached_property
|
|
|
+ def TaskSetResult(self):
|
|
|
+ return self.subclass_with_self("celery.result:TaskSetResult")
|
|
|
|
|
|
def broker_connection(self, hostname=None, userid=None,
|
|
|
password=None, virtual_host=None, port=None, ssl=None,
|