|
@@ -381,6 +381,11 @@ class Task(object):
|
|
|
|
|
|
@classmethod
|
|
|
def AsyncResult(self, task_id):
|
|
|
+ """Get AsyncResult instance for this kind of task.
|
|
|
+
|
|
|
+ :param task_id: Task id to get result for.
|
|
|
+
|
|
|
+ """
|
|
|
return BaseAsyncResult(task_id, backend=self.backend)
|
|
|
|
|
|
def on_retry(self, exc, task_id, args, kwargs):
|
|
@@ -440,6 +445,7 @@ class Task(object):
|
|
|
wrapper.execute_using_pool(pool, loglevel, logfile)
|
|
|
|
|
|
def __repr__(self):
|
|
|
+ """repr(task)"""
|
|
|
try:
|
|
|
kind = self.__class__.mro()[1].__name__
|
|
|
except (AttributeError, IndexError):
|
|
@@ -585,14 +591,18 @@ class TaskSet(object):
|
|
|
return TaskSetResult(taskset_id, subtasks)
|
|
|
|
|
|
def apply_part(self, arglist, taskset_id, publisher):
|
|
|
+ """Apply a single part of the taskset."""
|
|
|
args, kwargs, opts = padlist(arglist, 3, default={})
|
|
|
return apply_async(self.task, args, kwargs,
|
|
|
taskset_id=taskset_id, publisher=publisher, **opts)
|
|
|
|
|
|
def apply(self):
|
|
|
+ """Applies the taskset locally."""
|
|
|
taskset_id = gen_unique_id()
|
|
|
subtasks = [apply(self.task, args, kwargs)
|
|
|
for args, kwargs in self.arguments]
|
|
|
+
|
|
|
+ # This will be filled with EagerResults.
|
|
|
return TaskSetResult(taskset_id, subtasks)
|
|
|
|
|
|
@classmethod
|
|
@@ -600,7 +610,7 @@ class TaskSet(object):
|
|
|
"""Apply ``args`` to function by distributing the args to the
|
|
|
celery server(s)."""
|
|
|
pickled = pickle.dumps(func)
|
|
|
- arguments = [[[pickled, arg, {}], {}] for arg in args]
|
|
|
+ arguments = [((pickled, arg, {}), {}) for arg in args]
|
|
|
return cls(ExecuteRemoteTask, arguments)
|
|
|
|
|
|
@classmethod
|