|
@@ -25,6 +25,7 @@ from .app import app_or_default
|
|
|
from .datastructures import DependencyGraph, GraphFormatter
|
|
|
from .exceptions import IncompleteStream, TimeoutError
|
|
|
from .five import items, range, string_t, monotonic
|
|
|
+from .utils import deprecated
|
|
|
|
|
|
__all__ = ['ResultBase', 'AsyncResult', 'ResultSet', 'GroupResult',
|
|
|
'EagerResult', 'result_from_tuple']
|
|
@@ -160,7 +161,8 @@ class AsyncResult(ResultBase):
|
|
|
def collect(self, intermediate=False, **kwargs):
|
|
|
"""Iterator, like :meth:`get` will wait for the task to complete,
|
|
|
but will also follow :class:`AsyncResult` and :class:`ResultSet`
|
|
|
- returned by the task, yielding for each result in the tree.
|
|
|
+ returned by the task, yielding ``(result, value)`` tuples for each
|
|
|
+ result in the tree.
|
|
|
|
|
|
An example would be having the following tasks:
|
|
|
|
|
@@ -169,26 +171,32 @@ class AsyncResult(ResultBase):
|
|
|
from celery import group
|
|
|
from proj.celery import app
|
|
|
|
|
|
- @app.task
|
|
|
+ @app.task(trail=True)
|
|
|
def A(how_many):
|
|
|
return group(B.s(i) for i in range(how_many))()
|
|
|
|
|
|
- @app.task
|
|
|
+ @app.task(trail=True)
|
|
|
def B(i):
|
|
|
return pow2.delay(i)
|
|
|
|
|
|
- @app.task
|
|
|
+ @app.task(trail=True)
|
|
|
def pow2(i):
|
|
|
return i ** 2
|
|
|
|
|
|
+ Note that the ``trail`` option must be enabled
|
|
|
+ so that the list of children is stored in ``result.children``.
|
|
|
+ This is the default but enabled explicitly for illustration.
|
|
|
+
|
|
|
Calling :meth:`collect` would return:
|
|
|
|
|
|
.. code-block:: python
|
|
|
|
|
|
+ >>> from celery.result import ResultBase
|
|
|
>>> from proj.tasks import A
|
|
|
|
|
|
>>> result = A.delay(10)
|
|
|
- >>> list(result.collect())
|
|
|
+ >>> [v for v in result.collect()
|
|
|
+ ... if not isinstance(v, (ResultBase, tuple))]
|
|
|
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
|
|
|
|
|
|
"""
|
|
@@ -474,13 +482,9 @@ class ResultSet(ResultBase):
|
|
|
"""`res[i] -> res.results[i]`"""
|
|
|
return self.results[index]
|
|
|
|
|
|
+ @deprecated('3.2', '3.3')
|
|
|
def iterate(self, timeout=None, propagate=True, interval=0.5):
|
|
|
- """Iterate over the return values of the tasks as they finish
|
|
|
- one by one.
|
|
|
-
|
|
|
- :raises: The exception if any of the tasks raised an exception.
|
|
|
-
|
|
|
- """
|
|
|
+ """Deprecated method, use :meth:`get` with a callback argument."""
|
|
|
elapsed = 0.0
|
|
|
results = OrderedDict((result.id, copy(result))
|
|
|
for result in self.results)
|
|
@@ -546,6 +550,13 @@ class ResultSet(ResultBase):
|
|
|
No results will be returned by this function if
|
|
|
a callback is specified. The order of results
|
|
|
is also arbitrary when a callback is used.
|
|
|
+ To get access to the result object for a particular
|
|
|
+ id you will have to generate an index first:
|
|
|
+ ``results = {r.id: r
|
|
|
+ for r in gresult.results.values()}``
|
|
|
+ Or you can create new result objects on the fly:
|
|
|
+ ``result = app.AsyncResult(task_id)`` (both will
|
|
|
+ take advantage of the backend cache anyway).
|
|
|
|
|
|
:raises celery.exceptions.TimeoutError: if `timeout` is not
|
|
|
:const:`None` and the operation takes longer than `timeout`
|