|
@@ -5,12 +5,16 @@ import time
|
|
|
from copy import copy
|
|
|
from itertools import imap
|
|
|
|
|
|
+from celery import current_app
|
|
|
from celery import states
|
|
|
-from celery.backends import default_backend
|
|
|
-from celery.datastructures import PositionQueue
|
|
|
+from celery.app import app_or_default
|
|
|
from celery.exceptions import TimeoutError
|
|
|
-from celery.messaging import with_connection
|
|
|
-from celery.utils import any, all
|
|
|
+from celery.registry import _unpickle_task
|
|
|
+from celery.utils.compat import any, all
|
|
|
+
|
|
|
+
|
|
|
+def _unpickle_result(task_id, task_name):
|
|
|
+ return _unpickle_task(task_name).AsyncResult(task_id)
|
|
|
|
|
|
|
|
|
class BaseAsyncResult(object):
|
|
@@ -19,21 +23,22 @@ class BaseAsyncResult(object):
|
|
|
:param task_id: see :attr:`task_id`.
|
|
|
:param backend: see :attr:`backend`.
|
|
|
|
|
|
- .. attribute:: task_id
|
|
|
-
|
|
|
- The unique identifier for this task.
|
|
|
-
|
|
|
- .. attribute:: backend
|
|
|
-
|
|
|
- The task result backend used.
|
|
|
-
|
|
|
"""
|
|
|
|
|
|
+ #: Error raised for timeouts.
|
|
|
TimeoutError = TimeoutError
|
|
|
|
|
|
- def __init__(self, task_id, backend):
|
|
|
+ #: The task uuid.
|
|
|
+ task_id = None
|
|
|
+
|
|
|
+ #: The task result backend to use.
|
|
|
+ backend = None
|
|
|
+
|
|
|
+ def __init__(self, task_id, backend, task_name=None, app=None):
|
|
|
self.task_id = task_id
|
|
|
self.backend = backend
|
|
|
+ self.task_name = task_name
|
|
|
+ self.app = app_or_default(app)
|
|
|
|
|
|
def forget(self):
|
|
|
"""Forget about (and possibly remove the result of) this task."""
|
|
@@ -42,36 +47,47 @@ class BaseAsyncResult(object):
|
|
|
def revoke(self, connection=None, connect_timeout=None):
|
|
|
"""Send revoke signal to all workers.
|
|
|
|
|
|
- The workers will ignore the task if received.
|
|
|
+ Any worker receiving the task, or having reserved the
|
|
|
+ task, *must* ignore it.
|
|
|
|
|
|
"""
|
|
|
- from celery.task import control
|
|
|
- control.revoke(self.task_id, connection=connection,
|
|
|
- connect_timeout=connect_timeout)
|
|
|
+ self.app.control.revoke(self.task_id, connection=connection,
|
|
|
+ connect_timeout=connect_timeout)
|
|
|
|
|
|
- def wait(self, timeout=None):
|
|
|
- """Wait for task, and return the result when it arrives.
|
|
|
+ def get(self, timeout=None, propagate=True, interval=0.5):
|
|
|
+ """Wait until task is ready, and return its result.
|
|
|
|
|
|
- :keyword timeout: How long to wait, in seconds, before the
|
|
|
- operation times out.
|
|
|
+ .. warning::
|
|
|
+
|
|
|
+ Waiting for tasks within a task may lead to deadlocks.
|
|
|
+ Please read :ref:`task-synchronous-subtasks`.
|
|
|
|
|
|
- :raises celery.exceptions.TimeoutError: if ``timeout`` is not
|
|
|
- :const:`None` and the result does not arrive within ``timeout``
|
|
|
+ :keyword timeout: How long to wait, in seconds, before the
|
|
|
+ operation times out.
|
|
|
+ :keyword propagate: Re-raise exception if the task failed.
|
|
|
+ :keyword interval: Time to wait (in seconds) before retrying to
|
|
|
+ retrieve the result. Note that this does not have any effect
|
|
|
+ when using the AMQP result store backend, as it does not
|
|
|
+ use polling.
|
|
|
+
|
|
|
+ :raises celery.exceptions.TimeoutError: if `timeout` is not
|
|
|
+ :const:`None` and the result does not arrive within `timeout`
|
|
|
seconds.
|
|
|
|
|
|
- If the remote call raised an exception then that
|
|
|
- exception will be re-raised.
|
|
|
+ If the remote call raised an exception then that exception will
|
|
|
+ be re-raised.
|
|
|
|
|
|
"""
|
|
|
- return self.backend.wait_for(self.task_id, timeout=timeout)
|
|
|
+ return self.backend.wait_for(self.task_id, timeout=timeout,
|
|
|
+ propagate=propagate,
|
|
|
+ interval=interval)
|
|
|
|
|
|
- def get(self, timeout=None):
|
|
|
- """Alias to :meth:`wait`."""
|
|
|
- return self.wait(timeout=timeout)
|
|
|
+ def wait(self, *args, **kwargs):
|
|
|
+ """Deprecated alias to :meth:`get`."""
|
|
|
+ return self.get(*args, **kwargs)
|
|
|
|
|
|
def ready(self):
|
|
|
- """Returns :const:`True` if the task executed successfully, or raised
|
|
|
- an exception.
|
|
|
+ """Returns :const:`True` if the task has been executed.
|
|
|
|
|
|
If the task is still running, pending, or is waiting
|
|
|
for retry then :const:`False` is returned.
|
|
@@ -84,15 +100,15 @@ class BaseAsyncResult(object):
|
|
|
return self.status == states.SUCCESS
|
|
|
|
|
|
def failed(self):
|
|
|
- """Returns :const:`True` if the task failed by exception."""
|
|
|
+ """Returns :const:`True` if the task failed."""
|
|
|
return self.status == states.FAILURE
|
|
|
|
|
|
def __str__(self):
|
|
|
- """``str(self) -> self.task_id``"""
|
|
|
+ """`str(self) -> self.task_id`"""
|
|
|
return self.task_id
|
|
|
|
|
|
def __hash__(self):
|
|
|
- """``hash(self) -> hash(self.task_id)``"""
|
|
|
+ """`hash(self) -> hash(self.task_id)`"""
|
|
|
return hash(self.task_id)
|
|
|
|
|
|
def __repr__(self):
|
|
@@ -106,22 +122,23 @@ class BaseAsyncResult(object):
|
|
|
def __copy__(self):
|
|
|
return self.__class__(self.task_id, backend=self.backend)
|
|
|
|
|
|
+ def __reduce__(self):
|
|
|
+ if self.task_name:
|
|
|
+ return (_unpickle_result, (self.task_id, self.task_name))
|
|
|
+ else:
|
|
|
+ return (self.__class__, (self.task_id, self.backend,
|
|
|
+ None, self.app))
|
|
|
+
|
|
|
@property
|
|
|
def result(self):
|
|
|
"""When the task has been executed, this contains the return value.
|
|
|
-
|
|
|
- If the task raised an exception, this will be the exception instance.
|
|
|
-
|
|
|
- """
|
|
|
+ If the task raised an exception, this will be the exception
|
|
|
+ instance."""
|
|
|
return self.backend.get_result(self.task_id)
|
|
|
|
|
|
@property
|
|
|
def info(self):
|
|
|
- """Get state metadata.
|
|
|
-
|
|
|
- Alias to :meth:`result`.
|
|
|
-
|
|
|
- """
|
|
|
+ """Get state metadata. Alias to :meth:`result`."""
|
|
|
return self.result
|
|
|
|
|
|
@property
|
|
@@ -129,16 +146,11 @@ class BaseAsyncResult(object):
|
|
|
"""Get the traceback of a failed task."""
|
|
|
return self.backend.get_traceback(self.task_id)
|
|
|
|
|
|
- @property
|
|
|
- def status(self):
|
|
|
- """Deprecated alias of :attr:`state`."""
|
|
|
- return self.state
|
|
|
-
|
|
|
@property
|
|
|
def state(self):
|
|
|
- """The current status of the task.
|
|
|
+ """The tasks current state.
|
|
|
|
|
|
- Can be one of the following:
|
|
|
+ Possible values includes:
|
|
|
|
|
|
*PENDING*
|
|
|
|
|
@@ -154,111 +166,132 @@ class BaseAsyncResult(object):
|
|
|
|
|
|
*FAILURE*
|
|
|
|
|
|
- The task raised an exception, or has been retried more times
|
|
|
- than its limit. The :attr:`result` attribute contains the
|
|
|
- exception raised.
|
|
|
+ The task raised an exception, or has exceeded the retry limit.
|
|
|
+ The :attr:`result` attribute then contains the
|
|
|
+ exception raised by the task.
|
|
|
|
|
|
*SUCCESS*
|
|
|
|
|
|
The task executed successfully. The :attr:`result` attribute
|
|
|
- contains the resulting value.
|
|
|
+ then contains the tasks return value.
|
|
|
|
|
|
"""
|
|
|
return self.backend.get_status(self.task_id)
|
|
|
|
|
|
+ @property
|
|
|
+ def status(self):
|
|
|
+ """Deprecated alias of :attr:`state`."""
|
|
|
+ return self.state
|
|
|
+
|
|
|
|
|
|
class AsyncResult(BaseAsyncResult):
|
|
|
"""Pending task result using the default backend.
|
|
|
|
|
|
- :param task_id: see :attr:`task_id`.
|
|
|
+ :param task_id: The task uuid.
|
|
|
|
|
|
+ """
|
|
|
|
|
|
- .. attribute:: task_id
|
|
|
+ #: Task result store backend to use.
|
|
|
+ backend = None
|
|
|
|
|
|
- The unique identifier for this task.
|
|
|
+ def __init__(self, task_id, backend=None, task_name=None, app=None):
|
|
|
+ app = app_or_default(app)
|
|
|
+ backend = backend or app.backend
|
|
|
+ super(AsyncResult, self).__init__(task_id, backend,
|
|
|
+ task_name=task_name, app=app)
|
|
|
|
|
|
- .. attribute:: backend
|
|
|
|
|
|
- Instance of :class:`celery.backends.DefaultBackend`.
|
|
|
+class ResultSet(object):
|
|
|
+ """Working with more than one result.
|
|
|
|
|
|
- """
|
|
|
+ :param results: List of result instances.
|
|
|
|
|
|
- def __init__(self, task_id, backend=None):
|
|
|
- super(AsyncResult, self).__init__(task_id, backend or default_backend)
|
|
|
+ """
|
|
|
|
|
|
+ #: List of results in in the set.
|
|
|
+ results = None
|
|
|
|
|
|
-class TaskSetResult(object):
|
|
|
- """Working with :class:`~celery.task.TaskSet` results.
|
|
|
+ def __init__(self, results, app=None, **kwargs):
|
|
|
+ self.app = app_or_default(app)
|
|
|
+ self.results = results
|
|
|
|
|
|
- An instance of this class is returned by
|
|
|
- ``TaskSet``'s :meth:`~celery.task.TaskSet.apply_async()`. It enables
|
|
|
- inspection of the subtasks status and return values as a single entity.
|
|
|
+ def add(self, result):
|
|
|
+ """Add :class:`AsyncResult` as a new member of the set.
|
|
|
|
|
|
- :option taskset_id: see :attr:`taskset_id`.
|
|
|
- :option subtasks: see :attr:`subtasks`.
|
|
|
+ Does nothing if the result is already a member.
|
|
|
|
|
|
- .. attribute:: taskset_id
|
|
|
+ """
|
|
|
+ if result not in self.results:
|
|
|
+ self.results.append(result)
|
|
|
|
|
|
- The UUID of the taskset itself.
|
|
|
+ def remove(self, result):
|
|
|
+ """Removes result from the set; it must be a member.
|
|
|
|
|
|
- .. attribute:: subtasks
|
|
|
+ :raises KeyError: if the result is not a member.
|
|
|
|
|
|
- A list of :class:`AsyncResult` instances for all of the subtasks.
|
|
|
+ """
|
|
|
+ if isinstance(result, basestring):
|
|
|
+ result = AsyncResult(result)
|
|
|
+ try:
|
|
|
+ self.results.remove(result)
|
|
|
+ except ValueError:
|
|
|
+ raise KeyError(result)
|
|
|
|
|
|
- """
|
|
|
+ def discard(self, result):
|
|
|
+ """Remove result from the set if it is a member.
|
|
|
|
|
|
- def __init__(self, taskset_id, subtasks):
|
|
|
- self.taskset_id = taskset_id
|
|
|
- self.subtasks = subtasks
|
|
|
+ If it is not a member, do nothing.
|
|
|
|
|
|
- def itersubtasks(self):
|
|
|
- """Taskset subtask iterator.
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ self.remove(result)
|
|
|
+ except KeyError:
|
|
|
+ pass
|
|
|
|
|
|
- :returns: an iterator for iterating over the tasksets
|
|
|
- :class:`AsyncResult` objects.
|
|
|
+ def update(self, results):
|
|
|
+ """Update set with the union of itself and an iterable with
|
|
|
+ results."""
|
|
|
+ self.results.extend(r for r in results if r not in self.results)
|
|
|
|
|
|
- """
|
|
|
- return (subtask for subtask in self.subtasks)
|
|
|
+ def clear(self):
|
|
|
+ """Remove all results from this set."""
|
|
|
+ self.results[:] = [] # don't create new list.
|
|
|
|
|
|
def successful(self):
|
|
|
- """Was the taskset successful?
|
|
|
+ """Was all of the tasks successful?
|
|
|
|
|
|
- :returns: :const:`True` if all of the tasks in the taskset finished
|
|
|
+ :returns: :const:`True` if all of the tasks finished
|
|
|
successfully (i.e. did not raise an exception).
|
|
|
|
|
|
"""
|
|
|
- return all(subtask.successful()
|
|
|
- for subtask in self.itersubtasks())
|
|
|
+ return all(result.successful() for result in self.results)
|
|
|
|
|
|
def failed(self):
|
|
|
- """Did the taskset fail?
|
|
|
+ """Did any of the tasks fail?
|
|
|
|
|
|
- :returns: :const:`True` if any of the tasks in the taskset failed.
|
|
|
+ :returns: :const:`True` if any of the tasks failed.
|
|
|
(i.e., raised an exception)
|
|
|
|
|
|
"""
|
|
|
- return any(subtask.failed()
|
|
|
- for subtask in self.itersubtasks())
|
|
|
+ return any(result.failed() for result in self.results)
|
|
|
|
|
|
def waiting(self):
|
|
|
- """Is the taskset waiting?
|
|
|
+ """Are any of the tasks incomplete?
|
|
|
|
|
|
- :returns: :const:`True` if any of the tasks in the taskset is still
|
|
|
+ :returns: :const:`True` if any of the tasks is still
|
|
|
waiting for execution.
|
|
|
|
|
|
"""
|
|
|
- return any(not subtask.ready()
|
|
|
- for subtask in self.itersubtasks())
|
|
|
+ return any(not result.ready() for result in self.results)
|
|
|
|
|
|
def ready(self):
|
|
|
- """Is the task ready?
|
|
|
+ """Did all of the tasks complete? (either by success of failure).
|
|
|
|
|
|
- :returns: :const:`True` if all of the tasks in the taskset has been
|
|
|
+ :returns: :const:`True` if all of the tasks been
|
|
|
executed.
|
|
|
|
|
|
"""
|
|
|
- return all(subtask.ready()
|
|
|
- for subtask in self.itersubtasks())
|
|
|
+ return all(result.ready() for result in self.results)
|
|
|
|
|
|
def completed_count(self):
|
|
|
"""Task completion count.
|
|
@@ -266,26 +299,29 @@ class TaskSetResult(object):
|
|
|
:returns: the number of tasks completed.
|
|
|
|
|
|
"""
|
|
|
- return sum(imap(int, (subtask.successful()
|
|
|
- for subtask in self.itersubtasks())))
|
|
|
+ return sum(imap(int, (result.successful() for result in self.results)))
|
|
|
|
|
|
def forget(self):
|
|
|
- """Forget about (and possible remove the result of) all the tasks
|
|
|
- in this taskset."""
|
|
|
- for subtask in self.subtasks:
|
|
|
- subtask.forget()
|
|
|
+ """Forget about (and possible remove the result of) all the tasks."""
|
|
|
+ for result in self.results:
|
|
|
+ result.forget()
|
|
|
|
|
|
- @with_connection
|
|
|
def revoke(self, connection=None, connect_timeout=None):
|
|
|
- for subtask in self.subtasks:
|
|
|
- subtask.revoke(connection=connection)
|
|
|
+ """Revoke all tasks in the set."""
|
|
|
+
|
|
|
+ def _do_revoke(connection=None, connect_timeout=None):
|
|
|
+ for result in self.results:
|
|
|
+ result.revoke(connection=connection)
|
|
|
+
|
|
|
+ return self.app.with_default_connection(_do_revoke)(
|
|
|
+ connection=connection, connect_timeout=connect_timeout)
|
|
|
|
|
|
def __iter__(self):
|
|
|
- """``iter(res)`` -> ``res.iterate()``."""
|
|
|
return self.iterate()
|
|
|
|
|
|
def __getitem__(self, index):
|
|
|
- return self.subtasks[index]
|
|
|
+ """`res[i] -> res.results[i]`"""
|
|
|
+ return self.results[index]
|
|
|
|
|
|
def iterate(self):
|
|
|
"""Iterate over the return values of the tasks as they finish
|
|
@@ -294,9 +330,9 @@ class TaskSetResult(object):
|
|
|
:raises: The exception if any of the tasks raised an exception.
|
|
|
|
|
|
"""
|
|
|
- pending = list(self.subtasks)
|
|
|
- results = dict((subtask.task_id, copy(subtask))
|
|
|
- for subtask in self.subtasks)
|
|
|
+ pending = list(self.results)
|
|
|
+ results = dict((result.task_id, copy(result))
|
|
|
+ for result in self.results)
|
|
|
while pending:
|
|
|
for task_id in pending:
|
|
|
result = results[task_id]
|
|
@@ -309,107 +345,195 @@ class TaskSetResult(object):
|
|
|
elif result.status in states.PROPAGATE_STATES:
|
|
|
raise result.result
|
|
|
|
|
|
- def join(self, timeout=None, propagate=True):
|
|
|
- """Gather the results of all tasks in the taskset,
|
|
|
- and returns a list ordered by the order of the set.
|
|
|
+ def join(self, timeout=None, propagate=True, interval=0.5):
|
|
|
+ """Gathers the results of all tasks as a list in order.
|
|
|
+
|
|
|
+ .. note::
|
|
|
+
|
|
|
+ This can be an expensive operation for result store
|
|
|
+ backends that must resort to polling (e.g. database).
|
|
|
|
|
|
- :keyword timeout: The number of seconds to wait for results
|
|
|
- before the operation times out.
|
|
|
+ You should consider using :meth:`join_native` if your backend
|
|
|
+ supports it.
|
|
|
|
|
|
- :keyword propagate: If any of the subtasks raises an exception, the
|
|
|
- exception will be reraised.
|
|
|
+ .. warning::
|
|
|
|
|
|
- :raises celery.exceptions.TimeoutError: if ``timeout`` is not
|
|
|
- :const:`None` and the operation takes longer than ``timeout``
|
|
|
+ Waiting for tasks within a task may lead to deadlocks.
|
|
|
+ Please see :ref:`task-synchronous-subtasks`.
|
|
|
+
|
|
|
+ :keyword timeout: The number of seconds to wait for results before
|
|
|
+ the operation times out.
|
|
|
+
|
|
|
+ :keyword propagate: If any of the tasks raises an exception, the
|
|
|
+ exception will be re-raised.
|
|
|
+
|
|
|
+ :keyword interval: Time to wait (in seconds) before retrying to
|
|
|
+ retrieve a result from the set. Note that this
|
|
|
+ does not have any effect when using the AMQP
|
|
|
+ result store backend, as it does not use polling.
|
|
|
+
|
|
|
+ :raises celery.exceptions.TimeoutError: if `timeout` is not
|
|
|
+ :const:`None` and the operation takes longer than `timeout`
|
|
|
seconds.
|
|
|
|
|
|
- :returns: list of return values for all subtasks in order.
|
|
|
+ """
|
|
|
+ time_start = time.time()
|
|
|
+ remaining = None
|
|
|
+
|
|
|
+ results = []
|
|
|
+ for result in self.results:
|
|
|
+ remaining = None
|
|
|
+ if timeout:
|
|
|
+ remaining = timeout - (time.time() - time_start)
|
|
|
+ if remaining <= 0.0:
|
|
|
+ raise TimeoutError("join operation timed out")
|
|
|
+ results.append(result.wait(timeout=remaining,
|
|
|
+ propagate=propagate,
|
|
|
+ interval=interval))
|
|
|
+ return results
|
|
|
+
|
|
|
+ def iter_native(self, timeout=None):
|
|
|
+ backend = self.results[0].backend
|
|
|
+ ids = [result.task_id for result in self.results]
|
|
|
+ return backend.get_many(ids, timeout=timeout)
|
|
|
+
|
|
|
+ def join_native(self, timeout=None, propagate=True):
|
|
|
+ """Backend optimized version of :meth:`join`.
|
|
|
+
|
|
|
+ .. versionadded:: 2.2
|
|
|
+
|
|
|
+ Note that this does not support collecting the results
|
|
|
+ for different task types using different backends.
|
|
|
+
|
|
|
+ This is currently only supported by the AMQP result backend.
|
|
|
|
|
|
"""
|
|
|
+ backend = self.results[0].backend
|
|
|
+ results = [None for _ in xrange(len(self.results))]
|
|
|
|
|
|
- time_start = time.time()
|
|
|
+ ids = [result.task_id for result in self.results]
|
|
|
+ states = dict(backend.get_many(ids, timeout=timeout))
|
|
|
+
|
|
|
+ for task_id, meta in states.items():
|
|
|
+ index = self.results.index(task_id)
|
|
|
+ results[index] = meta["result"]
|
|
|
+
|
|
|
+ return list(results)
|
|
|
+
|
|
|
+ @property
|
|
|
+ def total(self):
|
|
|
+ """Total number of tasks in the set."""
|
|
|
+ return len(self.results)
|
|
|
+
|
|
|
+ @property
|
|
|
+ def subtasks(self):
|
|
|
+ """Deprecated alias to :attr:`results`."""
|
|
|
+ return self.results
|
|
|
+
|
|
|
+
|
|
|
+class TaskSetResult(ResultSet):
|
|
|
+ """An instance of this class is returned by
|
|
|
+ `TaskSet`'s :meth:`~celery.task.TaskSet.apply_async` method.
|
|
|
+
|
|
|
+ It enables inspection of the tasks state and return values as
|
|
|
+ a single entity.
|
|
|
+
|
|
|
+ :param taskset_id: The id of the taskset.
|
|
|
+ :param results: List of result instances.
|
|
|
+
|
|
|
+ """
|
|
|
+
|
|
|
+ #: The UUID of the taskset.
|
|
|
+ taskset_id = None
|
|
|
+
|
|
|
+ #: List/iterator of results in the taskset
|
|
|
+ results = None
|
|
|
+
|
|
|
+ def __init__(self, taskset_id, results=None, **kwargs):
|
|
|
+ self.taskset_id = taskset_id
|
|
|
|
|
|
- def on_timeout():
|
|
|
- raise TimeoutError("The operation timed out.")
|
|
|
-
|
|
|
- results = PositionQueue(length=self.total)
|
|
|
-
|
|
|
- while True:
|
|
|
- for position, pending_result in enumerate(self.subtasks):
|
|
|
- state = pending_result.state
|
|
|
- if state in states.READY_STATES:
|
|
|
- if propagate and state in states.PROPAGATE_STATES:
|
|
|
- raise pending_result.result
|
|
|
- results[position] = pending_result.result
|
|
|
- if results.full():
|
|
|
- # Make list copy, so the returned type is not a position
|
|
|
- # queue.
|
|
|
- return list(results)
|
|
|
- else:
|
|
|
- if (timeout is not None and
|
|
|
- time.time() >= time_start + timeout):
|
|
|
- on_timeout()
|
|
|
-
|
|
|
- def save(self, backend=default_backend):
|
|
|
+ # XXX previously the "results" arg was named "subtasks".
|
|
|
+ if "subtasks" in kwargs:
|
|
|
+ results = kwargs["subtasks"]
|
|
|
+ super(TaskSetResult, self).__init__(results, **kwargs)
|
|
|
+
|
|
|
+ def save(self, backend=None):
|
|
|
"""Save taskset result for later retrieval using :meth:`restore`.
|
|
|
|
|
|
- Example:
|
|
|
+ Example::
|
|
|
|
|
|
>>> result.save()
|
|
|
- >>> result = TaskSetResult.restore(task_id)
|
|
|
+ >>> result = TaskSetResult.restore(taskset_id)
|
|
|
|
|
|
"""
|
|
|
- backend.save_taskset(self.taskset_id, self)
|
|
|
+ return (backend or self.app.backend).save_taskset(self.taskset_id,
|
|
|
+ self)
|
|
|
+
|
|
|
+ def delete(self, backend=None):
|
|
|
+ """Remove this result if it was previously saved."""
|
|
|
+ (backend or self.app.backend).delete_taskset(self.taskset_id)
|
|
|
|
|
|
@classmethod
|
|
|
- def restore(self, taskset_id, backend=default_backend):
|
|
|
+ def restore(self, taskset_id, backend=None):
|
|
|
"""Restore previously saved taskset result."""
|
|
|
+ if backend is None:
|
|
|
+ backend = current_app.backend
|
|
|
return backend.restore_taskset(taskset_id)
|
|
|
|
|
|
- @property
|
|
|
- def total(self):
|
|
|
- """The total number of tasks in the :class:`~celery.task.TaskSet`."""
|
|
|
- return len(self.subtasks)
|
|
|
+ def itersubtasks(self):
|
|
|
+ """Depreacted. Use ``iter(self.results)`` instead."""
|
|
|
+ return iter(self.results)
|
|
|
+
|
|
|
+ def __reduce__(self):
|
|
|
+ return (self.__class__, (self.taskset_id, self.results))
|
|
|
|
|
|
|
|
|
class EagerResult(BaseAsyncResult):
|
|
|
- """Result that we know has already been executed. """
|
|
|
+ """Result that we know has already been executed."""
|
|
|
TimeoutError = TimeoutError
|
|
|
|
|
|
- def __init__(self, task_id, ret_value, status, traceback=None):
|
|
|
+ def __init__(self, task_id, ret_value, state, traceback=None):
|
|
|
self.task_id = task_id
|
|
|
self._result = ret_value
|
|
|
- self._status = status
|
|
|
+ self._state = state
|
|
|
self._traceback = traceback
|
|
|
|
|
|
+ def __reduce__(self):
|
|
|
+ return (self.__class__, (self.task_id, self._result,
|
|
|
+ self._state, self._traceback))
|
|
|
+
|
|
|
+ def __copy__(self):
|
|
|
+ cls, args = self.__reduce__()
|
|
|
+ return cls(*args)
|
|
|
+
|
|
|
def successful(self):
|
|
|
"""Returns :const:`True` if the task executed without failure."""
|
|
|
- return self.status == states.SUCCESS
|
|
|
+ return self.state == states.SUCCESS
|
|
|
|
|
|
def ready(self):
|
|
|
"""Returns :const:`True` if the task has been executed."""
|
|
|
return True
|
|
|
|
|
|
- def wait(self, timeout=None):
|
|
|
+ def get(self, timeout=None, propagate=True, **kwargs):
|
|
|
"""Wait until the task has been executed and return its result."""
|
|
|
- if self.status == states.SUCCESS:
|
|
|
+ if self.state == states.SUCCESS:
|
|
|
+ return self.result
|
|
|
+ elif self.state in states.PROPAGATE_STATES:
|
|
|
+ if propagate:
|
|
|
+ raise self.result
|
|
|
return self.result
|
|
|
- elif self.status in states.PROPAGATE_STATES:
|
|
|
- raise self.result
|
|
|
|
|
|
def revoke(self):
|
|
|
- self._status = states.REVOKED
|
|
|
+ self._state = states.REVOKED
|
|
|
+
|
|
|
+ def __repr__(self):
|
|
|
+ return "<EagerResult: %s>" % self.task_id
|
|
|
|
|
|
@property
|
|
|
def result(self):
|
|
|
"""The tasks return value"""
|
|
|
return self._result
|
|
|
|
|
|
- @property
|
|
|
- def status(self):
|
|
|
- """The tasks status (alias to :attr:`state`)."""
|
|
|
- return self._status
|
|
|
-
|
|
|
@property
|
|
|
def state(self):
|
|
|
"""The tasks state."""
|
|
@@ -420,5 +544,7 @@ class EagerResult(BaseAsyncResult):
|
|
|
"""The traceback if the task failed."""
|
|
|
return self._traceback
|
|
|
|
|
|
- def __repr__(self):
|
|
|
- return "<EagerResult: %s>" % self.task_id
|
|
|
+ @property
|
|
|
+ def status(self):
|
|
|
+ """The tasks status (alias to :attr:`state`)."""
|
|
|
+ return self._state
|