|
@@ -28,10 +28,13 @@ def from_serializable(r):
|
|
|
# earlier backends may just pickle, so check if
|
|
|
# result is already prepared.
|
|
|
if not isinstance(r, ResultBase):
|
|
|
- id, nodes = r
|
|
|
+ id = parent = None
|
|
|
+ res, nodes = r
|
|
|
if nodes:
|
|
|
- return GroupResult(id, [AsyncResult(id) for id, _ in nodes])
|
|
|
- return AsyncResult(id)
|
|
|
+ return GroupResult(res, [AsyncResult(id) for id, _ in nodes])
|
|
|
+ if isinstance(res, (list, tuple)):
|
|
|
+ id, parent = res[0], res[1]
|
|
|
+ return AsyncResult(id, parent=parent)
|
|
|
return r
|
|
|
|
|
|
|
|
@@ -69,7 +72,7 @@ class AsyncResult(ResultBase):
|
|
|
self.parent = parent
|
|
|
|
|
|
def serializable(self):
|
|
|
- return self.id, None
|
|
|
+ return [self.id, self.parent and self.parent.id], None
|
|
|
|
|
|
def forget(self):
|
|
|
"""Forget about (and possibly remove the result of) this task."""
|
|
@@ -114,11 +117,21 @@ class AsyncResult(ResultBase):
|
|
|
be re-raised.
|
|
|
|
|
|
"""
|
|
|
+ if propagate and self.parent:
|
|
|
+ for node in reversed(list(self._parents())):
|
|
|
+ node.get(propagate=True, timeout=timeout, interval=interval)
|
|
|
+
|
|
|
return self.backend.wait_for(self.id, timeout=timeout,
|
|
|
propagate=propagate,
|
|
|
interval=interval)
|
|
|
wait = get # deprecated alias to :meth:`get`.
|
|
|
|
|
|
+ def _parents(self):
|
|
|
+ node = self.parent
|
|
|
+ while node:
|
|
|
+ yield node
|
|
|
+ node = node.parent
|
|
|
+
|
|
|
def collect(self, intermediate=False, **kwargs):
|
|
|
"""Iterator, like :meth:`get` will wait for the task to complete,
|
|
|
but will also follow :class:`AsyncResult` and :class:`ResultSet`
|