|
@@ -25,8 +25,9 @@ from celery.utils.functional import mattrgetter, maybe_list
|
|
|
from celery.utils.imports import instantiate
|
|
|
from celery.utils.mail import ErrorMail
|
|
|
|
|
|
-from celery.app.state import get_current_task
|
|
|
-from celery.app.registry import _unpickle_task
|
|
|
+from .annotations import resolve_all as resolve_all_annotations
|
|
|
+from .state import get_current_task
|
|
|
+from .registry import _unpickle_task
|
|
|
|
|
|
#: extracts options related to publishing a message from a dict.
|
|
|
extract_exec_options = mattrgetter("queue", "routing_key",
|
|
@@ -313,7 +314,7 @@ class BaseTask(object):
|
|
|
self.backend = app.backend
|
|
|
|
|
|
# decorate with annotations from config.
|
|
|
- app.annotate_task(self)
|
|
|
+ self.annotate()
|
|
|
|
|
|
# PeriodicTask uses this to add itself to the PeriodicTask schedule.
|
|
|
self.on_bound(app)
|
|
@@ -333,14 +334,14 @@ class BaseTask(object):
|
|
|
return self._app
|
|
|
app = property(_get_app, bind)
|
|
|
|
|
|
+ def __call__(self, *args, **kwargs):
|
|
|
+ return self.run(*args, **kwargs)
|
|
|
+
|
|
|
# - tasks are pickled into the name of the task only, and the reciever
|
|
|
# - simply grabs it from the local registry.
|
|
|
def __reduce__(self):
|
|
|
return (_unpickle_task, (self.name, ), None)
|
|
|
|
|
|
- def __call__(self, *args, **kwargs):
|
|
|
- return self.run(*args, **kwargs)
|
|
|
-
|
|
|
def run(self, *args, **kwargs):
|
|
|
"""The body of the task executed by workers."""
|
|
|
raise NotImplementedError("Tasks must define the run method.")
|
|
@@ -700,6 +701,13 @@ class BaseTask(object):
|
|
|
return self._get_app().AsyncResult(task_id, backend=self.backend,
|
|
|
task_name=self.name)
|
|
|
|
|
|
+ def subtask(self, *args, **kwargs):
|
|
|
+ """Returns :class:`~celery.task.sets.subtask` object for
|
|
|
+ this task, wrapping arguments and execution options
|
|
|
+ for a single task invocation."""
|
|
|
+ from celery.task.sets import subtask
|
|
|
+ return subtask(self, *args, **kwargs)
|
|
|
+
|
|
|
def update_state(self, task_id=None, state=None, meta=None):
|
|
|
"""Update task state.
|
|
|
|
|
@@ -800,17 +808,14 @@ class BaseTask(object):
|
|
|
"""
|
|
|
request.execute_using_pool(pool, loglevel, logfile)
|
|
|
|
|
|
+ def annotate(self):
|
|
|
+ for d in resolve_all_annotations(self.app.annotations, self):
|
|
|
+ self.__dict__.update(d)
|
|
|
+
|
|
|
def __repr__(self):
|
|
|
"""`repr(task)`"""
|
|
|
return "<@task: %s>" % (self.name, )
|
|
|
|
|
|
- def subtask(self, *args, **kwargs):
|
|
|
- """Returns :class:`~celery.task.sets.subtask` object for
|
|
|
- this task, wrapping arguments and execution options
|
|
|
- for a single task invocation."""
|
|
|
- from celery.task.sets import subtask
|
|
|
- return subtask(self, *args, **kwargs)
|
|
|
-
|
|
|
@property
|
|
|
def __name__(self):
|
|
|
return self.__class__.__name__
|