123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434 |
- from __future__ import generators
- import time
- from copy import copy
- from itertools import imap
- from celery import states
- from celery.app import app_or_default
- from celery.datastructures import PositionQueue
- from celery.exceptions import TimeoutError
- from celery.utils import any, all
class BaseAsyncResult(object):
    """Base class for pending result, supports custom task result backend.

    :param task_id: see :attr:`task_id`.
    :param backend: see :attr:`backend`.
    :keyword app: passed through ``app_or_default`` and stored as
        :attr:`app`.

    .. attribute:: task_id

        The unique identifier for this task.

    .. attribute:: backend

        The task result backend used.

    .. attribute:: app

        The value returned by ``app_or_default(app)``.

    """

    # Exposed on the class so callers can catch ``result.TimeoutError``.
    TimeoutError = TimeoutError

    def __init__(self, task_id, backend, app=None):
        self.task_id = task_id
        self.backend = backend
        self.app = app_or_default(app)

    def forget(self):
        """Forget about (and possibly remove the result of) this task."""
        self.backend.forget(self.task_id)

    def revoke(self, connection=None, connect_timeout=None):
        """Send revoke signal to all workers.

        The workers will ignore the task if received.

        :keyword connection: forwarded to ``self.app.control.revoke``.
        :keyword connect_timeout: forwarded to ``self.app.control.revoke``.

        """
        self.app.control.revoke(self.task_id, connection=connection,
                                connect_timeout=connect_timeout)

    def wait(self, timeout=None):
        """Wait for task, and return the result when it arrives.

        :keyword timeout: How long to wait, in seconds, before the
            operation times out.

        :raises celery.exceptions.TimeoutError: if ``timeout`` is not
            :const:`None` and the result does not arrive within ``timeout``
            seconds.

        If the remote call raised an exception then that
        exception will be re-raised.

        """
        return self.backend.wait_for(self.task_id, timeout=timeout)

    def get(self, timeout=None):
        """Alias to :meth:`wait`."""
        return self.wait(timeout=timeout)

    def ready(self):
        """Returns :const:`True` if the task executed successfully, or raised
        an exception.

        If the task is still running, pending, or is waiting
        for retry then :const:`False` is returned.

        The check is membership of :attr:`status` in the backend's
        ``READY_STATES``.

        """
        return self.status in self.backend.READY_STATES

    def successful(self):
        """Returns :const:`True` if the task executed successfully."""
        return self.status == states.SUCCESS

    def failed(self):
        """Returns :const:`True` if the task failed by exception."""
        return self.status == states.FAILURE

    def __str__(self):
        """``str(self) -> self.task_id``"""
        return self.task_id

    def __hash__(self):
        """``hash(self) -> hash(self.task_id)``"""
        return hash(self.task_id)

    def __repr__(self):
        return "<AsyncResult: %s>" % self.task_id

    def __eq__(self, other):
        """Compare by task id.

        Another result of the same class is equal when the task ids match;
        any other object is compared directly against the task id, so a
        result also equals its own id string.

        """
        if isinstance(other, self.__class__):
            return self.task_id == other.task_id
        return other == self.task_id

    def __ne__(self, other):
        """Inverse of :meth:`__eq__`.

        Defined explicitly because Python 2 does not derive ``!=`` from
        ``__eq__``; without it ``a != b`` could be true while ``a == b``.

        """
        return not self.__eq__(other)

    def __copy__(self):
        """Return a new instance for the same task id, backend and app."""
        # Pass the app along; otherwise the copy would silently fall back
        # to whatever ``app_or_default(None)`` returns.
        return self.__class__(self.task_id, backend=self.backend,
                              app=self.app)

    @property
    def result(self):
        """When the task has been executed, this contains the return value.

        If the task raised an exception, this will be the exception instance.

        """
        return self.backend.get_result(self.task_id)

    @property
    def info(self):
        """Get state metadata.

        Alias to :meth:`result`.

        """
        return self.result

    @property
    def traceback(self):
        """Get the traceback of a failed task."""
        return self.backend.get_traceback(self.task_id)

    @property
    def status(self):
        """Deprecated alias of :attr:`state`."""
        return self.state

    @property
    def state(self):
        """The current status of the task.

        Can be one of the following:

            *PENDING*

                The task is waiting for execution.

            *STARTED*

                The task has been started.

            *RETRY*

                The task is to be retried, possibly because of failure.

            *FAILURE*

                The task raised an exception, or has been retried more times
                than its limit.  The :attr:`result` attribute contains the
                exception raised.

            *SUCCESS*

                The task executed successfully.  The :attr:`result` attribute
                contains the resulting value.

        """
        return self.backend.get_status(self.task_id)
class AsyncResult(BaseAsyncResult):
    """Pending task result that falls back to the app's backend.

    :param task_id: see :attr:`task_id`.
    :keyword backend: result backend to use; when omitted (or falsy) the
        ``backend`` attribute of the resolved app is used instead.
    :keyword app: passed through ``app_or_default`` before use.

    .. attribute:: task_id

        The unique identifier for this task.

    .. attribute:: backend

        The backend given by the caller, or the resolved app's backend.

    """

    def __init__(self, task_id, backend=None, app=None):
        resolved_app = app_or_default(app)
        if not backend:
            # No usable backend supplied: take the app's own.
            backend = resolved_app.backend
        super(AsyncResult, self).__init__(task_id, backend,
                                          app=resolved_app)
class TaskSetResult(object):
    """Working with :class:`~celery.task.TaskSet` results.

    An instance of this class is returned by
    ``TaskSet``'s :meth:`~celery.task.TaskSet.apply_async()`.  It enables
    inspection of the subtasks status and return values as a single entity.

    :option taskset_id: see :attr:`taskset_id`.
    :option subtasks: see :attr:`subtasks`.

    .. attribute:: taskset_id

        The UUID of the taskset itself.

    .. attribute:: subtasks

        A list of :class:`AsyncResult` instances for all of the subtasks.

    """

    def __init__(self, taskset_id, subtasks, app=None):
        self.taskset_id = taskset_id
        self.subtasks = subtasks
        self.app = app_or_default(app)

    def itersubtasks(self):
        """Taskset subtask iterator.

        :returns: an iterator for iterating over the tasksets
            :class:`AsyncResult` objects.

        """
        return (subtask for subtask in self.subtasks)

    def successful(self):
        """Was the taskset successful?

        :returns: :const:`True` if all of the tasks in the taskset finished
            successfully (i.e. did not raise an exception).

        """
        return all(subtask.successful()
                   for subtask in self.itersubtasks())

    def failed(self):
        """Did the taskset fail?

        :returns: :const:`True` if any of the tasks in the taskset failed.
            (i.e., raised an exception)

        """
        return any(subtask.failed()
                   for subtask in self.itersubtasks())

    def waiting(self):
        """Is the taskset waiting?

        :returns: :const:`True` if any of the tasks in the taskset is still
            waiting for execution.

        """
        return any(not subtask.ready()
                   for subtask in self.itersubtasks())

    def ready(self):
        """Is the task ready?

        :returns: :const:`True` if all of the tasks in the taskset has been
            executed.

        """
        return all(subtask.ready()
                   for subtask in self.itersubtasks())

    def completed_count(self):
        """Task completion count.

        :returns: the number of subtasks whose ``successful()`` is true.

        """
        return sum(int(subtask.successful())
                   for subtask in self.itersubtasks())

    def forget(self):
        """Forget about (and possibly remove the result of) all the tasks
        in this taskset."""
        for subtask in self.subtasks:
            subtask.forget()

    def revoke(self, connection=None, connect_timeout=None):
        """Revoke every subtask in the taskset.

        The per-subtask revokes are wrapped with
        ``self.app.with_default_connection``, which is called with
        ``connection`` and ``connect_timeout``.

        :keyword connection: connection passed to the wrapper.
        :keyword connect_timeout: connect timeout passed to the wrapper.

        """

        def _do_revoke(connection=None, connect_timeout=None):
            # Only the connection is handed to each subtask; the timeout
            # is consumed by the wrapper call below.
            for subtask in self.subtasks:
                subtask.revoke(connection=connection)

        return self.app.with_default_connection(_do_revoke)(
                connection=connection, connect_timeout=connect_timeout)

    def __iter__(self):
        """``iter(res)`` -> ``res.iterate()``."""
        return self.iterate()

    def __getitem__(self, index):
        return self.subtasks[index]

    def iterate(self):
        """Iterate over the return values of the tasks as they finish
        one by one.

        The subtasks are polled repeatedly, with no pause between passes,
        until every one has succeeded or one of them is in a propagating
        state.

        :raises: The exception if any of the tasks raised an exception.

        """
        # Work on copies so the caller's subtask objects are not the ones
        # being polled.
        pending = [copy(subtask) for subtask in self.subtasks]
        while pending:
            # Rebuild the pending list on each pass instead of removing
            # items from the list that is being iterated over.
            still_pending = []
            for result in pending:
                status = result.status
                if status == states.SUCCESS:
                    yield result.result
                elif status in states.PROPAGATE_STATES:
                    raise result.result
                else:
                    still_pending.append(result)
            pending = still_pending

    def join(self, timeout=None, propagate=True):
        """Gather the results of all tasks in the taskset,
        and returns a list ordered by the order of the set.

        :keyword timeout: The number of seconds to wait for results
            before the operation times out.

        :keyword propagate: If any of the subtasks raises an exception, the
            exception will be reraised.

        :raises celery.exceptions.TimeoutError: if ``timeout`` is not
            :const:`None` and the operation takes longer than ``timeout``
            seconds.

        :returns: list of return values for all subtasks in order.

        """
        time_start = time.time()
        results = PositionQueue(length=self.total)

        while True:
            for position, pending_result in enumerate(self.subtasks):
                state = pending_result.state
                if state in states.READY_STATES:
                    if propagate and state in states.PROPAGATE_STATES:
                        raise pending_result.result
                    results[position] = pending_result.result
            if results.full():
                # Make list copy, so the returned type is not a position
                # queue.
                return list(results)
            # The timeout is only checked after a complete pass over the
            # subtasks that did not fill the queue.
            if timeout is not None and time.time() >= time_start + timeout:
                raise TimeoutError("The operation timed out.")

    def save(self, backend=None):
        """Save taskset result for later retrieval using :meth:`restore`.

        :keyword backend: backend to save to; defaults to
            ``self.app.backend``.

        Example:

            >>> result.save()
            >>> result = TaskSetResult.restore(task_id)

        """
        if backend is None:
            backend = self.app.backend
        backend.save_taskset(self.taskset_id, self)

    @classmethod
    def restore(cls, taskset_id, backend=None):
        """Restore previously saved taskset result.

        :param taskset_id: id handed to the backend's ``restore_taskset``.
        :keyword backend: backend to restore from; defaults to the backend
            of the app returned by ``app_or_default(None)``.

        """
        if backend is None:
            # This is a classmethod, so there is no instance ``app``
            # attribute to read the backend from.
            backend = app_or_default(None).backend
        return backend.restore_taskset(taskset_id)

    @property
    def total(self):
        """The total number of tasks in the :class:`~celery.task.TaskSet`."""
        return len(self.subtasks)
class EagerResult(BaseAsyncResult):
    """Result that we know has already been executed.

    :param task_id: the task's identifier.
    :param ret_value: value exposed as :attr:`result`.
    :param status: value exposed as :attr:`status` and :attr:`state`.
    :keyword traceback: value exposed as :attr:`traceback`.

    """

    TimeoutError = TimeoutError

    def __init__(self, task_id, ret_value, status, traceback=None):
        # NOTE: the base initializer is not called, so ``backend`` and
        # ``app`` are not set on instances of this class.
        self.task_id = task_id
        self._result = ret_value
        self._status = status
        self._traceback = traceback

    def __copy__(self):
        """Return a new instance carrying the same stored values."""
        # The inherited ``__copy__`` calls the constructor with a
        # ``backend`` keyword, which this class does not accept.
        return self.__class__(self.task_id, self._result, self._status,
                              self._traceback)

    def successful(self):
        """Returns :const:`True` if the task executed without failure."""
        return self.status == states.SUCCESS

    def ready(self):
        """Returns :const:`True` if the task has been executed."""
        return True

    def wait(self, timeout=None):
        """Wait until the task has been executed and return its result.

        :keyword timeout: accepted but not used.

        :returns: :attr:`result` when the status is ``SUCCESS``;
            :const:`None` when the status is neither ``SUCCESS`` nor one
            of ``states.PROPAGATE_STATES``.

        :raises: :attr:`result` itself when the status is in
            ``states.PROPAGATE_STATES``.

        """
        if self.status == states.SUCCESS:
            return self.result
        elif self.status in states.PROPAGATE_STATES:
            raise self.result

    def revoke(self, connection=None, connect_timeout=None):
        """Mark this result as revoked.

        Only the stored status is changed.  The keyword arguments are
        accepted so the method can be called the same way as
        ``BaseAsyncResult.revoke`` and are otherwise ignored.

        """
        self._status = states.REVOKED

    @property
    def result(self):
        """The tasks return value"""
        return self._result

    @property
    def status(self):
        """The tasks status (alias to :attr:`state`)."""
        return self._status

    @property
    def state(self):
        """The tasks state."""
        # The status is stored in ``_status``; no ``_state`` attribute is
        # ever assigned.
        return self._status

    @property
    def traceback(self):
        """The traceback if the task failed."""
        return self._traceback

    def __repr__(self):
        return "<EagerResult: %s>" % self.task_id
|