123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339 |
- """
- Asynchronous result types.
- """
- from celery.backends import default_backend
- from celery.datastructures import PositionQueue
- from celery.exceptions import TimeoutError
- from itertools import imap
- import time
- class BaseAsyncResult(object):
- """Base class for pending result, supports custom
- task meta :attr:`backend`
- :param task_id: see :attr:`task_id`.
- :param backend: see :attr:`backend`.
- .. attribute:: task_id
- The unique identifier for this task.
- .. attribute:: backend
- The task result backend used.
- """
- TimeoutError = TimeoutError
- def __init__(self, task_id, backend):
- self.task_id = task_id
- self.backend = backend
- def get(self):
- """Alias to :meth:`wait`."""
- return self.wait()
- def wait(self, timeout=None):
- """Wait for task, and return the result when it arrives.
- :keyword timeout: How long to wait in seconds, before the
- operation times out.
- :raises celery.exceptions.TimeoutError: if ``timeout`` is not ``None``
- and the result does not arrive within ``timeout`` seconds.
- If the remote call raised an exception then that
- exception will be re-raised.
- """
- return self.backend.wait_for(self.task_id, timeout=timeout)
- def ready(self):
- """Returns ``True`` if the task executed successfully, or raised
- an exception. If the task is still pending, or is waiting for retry
- then ``False`` is returned.
- :rtype: bool
- """
- status = self.backend.get_status(self.task_id)
- return status not in ["PENDING", "RETRY"]
- def successful(self):
- """Returns ``True`` if the task executed successfully.
- :rtype: bool
- """
- return self.backend.is_successful(self.task_id)
- def __str__(self):
- """``str(self)`` -> ``self.task_id``"""
- return self.task_id
- def __repr__(self):
- return "<AsyncResult: %s>" % self.task_id
- @property
- def result(self):
- """When the task has been executed, this contains the return value.
- If the task raised an exception, this will be the exception instance.
- """
- if self.status == "SUCCESS" or self.status == "FAILURE":
- return self.backend.get_result(self.task_id)
- return None
- @property
- def traceback(self):
- """Get the traceback of a failed task."""
- return self.backend.get_traceback(self.task_id)
- @property
- def status(self):
- """The current status of the task.
- Can be one of the following:
- *PENDING*
- The task is waiting for execution.
- *RETRY*
- The task is to be retried, possibly because of failure.
- *FAILURE*
- The task raised an exception, or has been retried more times
- than its limit. The :attr:`result` attribute contains the
- exception raised.
- *SUCCESS*
- The task executed successfully. The :attr:`result` attribute
- contains the resulting value.
- """
- return self.backend.get_status(self.task_id)
- class AsyncResult(BaseAsyncResult):
- """Pending task result using the default backend.
- :param task_id: see :attr:`task_id`.
- .. attribute:: task_id
- The unique identifier for this task.
- .. attribute:: backend
- Instance of :class:`celery.backends.DefaultBackend`.
- """
- def __init__(self, task_id):
- super(AsyncResult, self).__init__(task_id, backend=default_backend)
- class TaskSetResult(object):
- """Working with :class:`celery.task.TaskSet` results.
- An instance of this class is returned by
- :meth:`celery.task.TaskSet.run()`. It lets you inspect the status and
- return values of the taskset as a single entity.
- :option taskset_id: see :attr:`taskset_id`.
- :option subtasks: see :attr:`subtasks`.
- .. attribute:: taskset_id
- The UUID of the taskset itself.
- .. attribute:: subtasks
- A list of :class:`AsyncResult` instances for all of the subtasks.
- """
- def __init__(self, taskset_id, subtasks):
- self.taskset_id = taskset_id
- self.subtasks = subtasks
- def itersubtasks(self):
- """Taskset subtask iterator.
- :returns: an iterator for iterating over the tasksets
- :class:`AsyncResult` objects.
- """
- return (subtask for subtask in self.subtasks)
- def successful(self):
- """Was the taskset successful?
- :returns: ``True`` if all of the tasks in the taskset finished
- successfully (i.e. did not raise an exception).
- """
- return all((subtask.successful()
- for subtask in self.itersubtasks()))
- def failed(self):
- """Did the taskset fail?
- :returns: ``True`` if any of the tasks in the taskset failed.
- (i.e., raised an exception)
- """
- return any((not subtask.successful()
- for subtask in self.itersubtasks()))
- def waiting(self):
- """Is the taskset waiting?
- :returns: ``True`` if any of the tasks in the taskset is still
- waiting for execution.
- """
- return any((not subtask.ready()
- for subtask in self.itersubtasks()))
- def ready(self):
- """Is the task ready?
- :returns: ``True`` if all of the tasks in the taskset has been
- executed.
- """
- return all((subtask.ready()
- for subtask in self.itersubtasks()))
- def completed_count(self):
- """Task completion count.
- :returns: the number of tasks completed.
- """
- return sum(imap(int, (subtask.successful()
- for subtask in self.itersubtasks())))
- def __iter__(self):
- """``iter(res)`` -> ``res.iterate()``."""
- return self.iterate()
- def iterate(self):
- """Iterate over the return values of the tasks as they finish
- one by one.
- :raises: The exception if any of the tasks raised an exception.
- """
- results = dict((subtask.task_id, AsyncResult(subtask.task_id))
- for subtask in self.subtasks)
- while results:
- for task_id, pending_result in results.items():
- if pending_result.status == "SUCCESS":
- del(results[task_id])
- yield pending_result.result
- elif pending_result.status == "FAILURE":
- raise pending_result.result
- def join(self, timeout=None):
- """Gather the results for all of the tasks in the taskset,
- and return a list with them ordered by the order of which they
- were called.
- :keyword timeout: The time in seconds, how long
- it will wait for results, before the operation times out.
- :raises celery.exceptions.TimeoutError: if ``timeout`` is not ``None``
- and the operation takes longer than ``timeout`` seconds.
- If any of the tasks raises an exception, the exception
- will be reraised by :meth:`join`.
- :returns: list of return values for all tasks in the taskset.
- """
- time_start = time.time()
- def on_timeout():
- raise TimeoutError("The operation timed out.")
- results = PositionQueue(length=self.total)
- while True:
- for position, pending_result in enumerate(self.subtasks):
- if pending_result.status == "SUCCESS":
- results[position] = pending_result.result
- elif pending_result.status == "FAILURE":
- raise pending_result.result
- if results.full():
- # Make list copy, so the returned type is not a position
- # queue.
- return list(results)
- else:
- if timeout is not None and \
- time.time() >= time_start + timeout:
- on_timeout()
- @property
- def total(self):
- """The total number of tasks in the :class:`celery.task.TaskSet`."""
- return len(self.subtasks)
- class EagerResult(BaseAsyncResult):
- """Result that we know has already been executed. """
- TimeoutError = TimeoutError
- def __init__(self, task_id, ret_value, status, traceback=None):
- self.task_id = task_id
- self._result = ret_value
- self._status = status
- self._traceback = traceback
- def successful(self):
- """Returns ``True`` if the task executed without failure."""
- return self.status == "SUCCESS"
- def ready(self):
- """Returns ``True`` if the task has been executed."""
- return True
- def wait(self, timeout=None):
- """Wait until the task has been executed and return its result."""
- if self.status == "SUCCESS":
- return self.result
- elif self.status == "FAILURE":
- raise self.result.exception
- @property
- def result(self):
- """The tasks return value"""
- return self._result
- @property
- def status(self):
- """The tasks status"""
- return self._status
- @property
- def traceback(self):
- """The traceback if the task failed."""
- return self._traceback
- def __repr__(self):
- return "<EagerResult: %s>" % self.task_id
|