|
@@ -320,19 +320,6 @@ class TaskSetResult(object):
|
|
|
elif result.status in states.PROPAGATE_STATES:
|
|
|
raise result.result
|
|
|
|
|
|
- def join_native(self, timeout=None, propagate=True):
|
|
|
- backend = self.subtasks[0].backend
|
|
|
- results = PositionQueue(length=self.total)
|
|
|
- ids = [subtask.task_id for subtask in self.subtasks]
|
|
|
-
|
|
|
- states = backend.get_many(ids, timeout=timeout)
|
|
|
-
|
|
|
- for task_id, meta in states.items():
|
|
|
- index = self.subtasks.index(task_id)
|
|
|
- results[index] = meta["result"]
|
|
|
-
|
|
|
- return list(results)
|
|
|
-
|
|
|
def join(self, timeout=None, propagate=True):
|
|
|
"""Gather the results of all tasks in the taskset,
|
|
|
and returns a list ordered by the order of the set.
|
|
@@ -368,6 +355,30 @@ class TaskSetResult(object):
|
|
|
time.time() >= time_start + timeout):
|
|
|
raise TimeoutError("join operation timed out.")
|
|
|
|
|
|
+ def join_native(self, timeout=None, propagate=True):
|
|
|
+ """Backend optimized version of :meth:`join`.
|
|
|
+
|
|
|
+ .. versionadded:: 2.2
|
|
|
+
|
|
|
+ Note that this does not support collecting the results
|
|
|
+ for different task types using different backends.
|
|
|
+
|
|
|
+ This is currently only supported by the AMQP result backend.
|
|
|
+
|
|
|
+ """
|
|
|
+ backend = self.subtasks[0].backend
|
|
|
+ results = PositionQueue(length=self.total)
|
|
|
+ ids = [subtask.task_id for subtask in self.subtasks]
|
|
|
+
|
|
|
+ states = backend.get_many(ids, timeout=timeout)
|
|
|
+
|
|
|
+ for task_id, meta in states.items():
|
|
|
+ index = self.subtasks.index(task_id)
|
|
|
+ results[index] = meta["result"]
|
|
|
+
|
|
|
+ return list(results)
|
|
|
+
|
|
|
+
|
|
|
def save(self, backend=None):
|
|
|
"""Save taskset result for later retrieval using :meth:`restore`.
|
|
|
|