result.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381
  1. from __future__ import generators
  2. """
  3. Asynchronous result types.
  4. """
  5. import time
  6. from itertools import imap
  7. from celery import states
  8. from celery.utils import any, all
  9. from celery.backends import default_backend
  10. from celery.messaging import with_connection
  11. from celery.exceptions import TimeoutError
  12. from celery.datastructures import PositionQueue
  13. class BaseAsyncResult(object):
  14. """Base class for pending result, supports custom task result backend.
  15. :param task_id: see :attr:`task_id`.
  16. :param backend: see :attr:`backend`.
  17. .. attribute:: task_id
  18. The unique identifier for this task.
  19. .. attribute:: backend
  20. The task result backend used.
  21. """
  22. TimeoutError = TimeoutError
  23. def __init__(self, task_id, backend):
  24. self.task_id = task_id
  25. self.backend = backend
  26. @with_connection
  27. def revoke(self, connection=None, connect_timeout=None):
  28. """Send revoke signal to all workers.
  29. The workers will ignore the task if received.
  30. """
  31. from celery.task import control
  32. control.revoke(self.task_id)
  33. def get(self, timeout=None):
  34. """Alias to :meth:`wait`."""
  35. return self.wait(timeout=timeout)
  36. def wait(self, timeout=None):
  37. """Wait for task, and return the result when it arrives.
  38. :keyword timeout: How long to wait, in seconds, before the
  39. operation times out.
  40. :raises celery.exceptions.TimeoutError: if ``timeout`` is not ``None``
  41. and the result does not arrive within ``timeout`` seconds.
  42. If the remote call raised an exception then that
  43. exception will be re-raised.
  44. """
  45. return self.backend.wait_for(self.task_id, timeout=timeout)
  46. def ready(self):
  47. """Returns ``True`` if the task executed successfully, or raised
  48. an exception. If the task is still pending, or is waiting for retry
  49. then ``False`` is returned.
  50. :rtype: bool
  51. """
  52. status = self.backend.get_status(self.task_id)
  53. return status not in self.backend.UNREADY_STATES
  54. def successful(self):
  55. """Returns ``True`` if the task executed successfully.
  56. :rtype: bool
  57. """
  58. return self.backend.is_successful(self.task_id)
  59. def __str__(self):
  60. """``str(self)`` -> ``self.task_id``"""
  61. return self.task_id
  62. def __hash__(self):
  63. return hash(self.task_id)
  64. def __repr__(self):
  65. return "<AsyncResult: %s>" % self.task_id
  66. def __eq__(self, other):
  67. if isinstance(other, self.__class__):
  68. return self.task_id == other.task_id
  69. return self == other
  70. @property
  71. def result(self):
  72. """When the task has been executed, this contains the return value.
  73. If the task raised an exception, this will be the exception instance.
  74. """
  75. return self.backend.get_result(self.task_id)
  76. @property
  77. def traceback(self):
  78. """Get the traceback of a failed task."""
  79. return self.backend.get_traceback(self.task_id)
  80. @property
  81. def status(self):
  82. """The current status of the task.
  83. Can be one of the following:
  84. *PENDING*
  85. The task is waiting for execution.
  86. *RETRY*
  87. The task is to be retried, possibly because of failure.
  88. *FAILURE*
  89. The task raised an exception, or has been retried more times
  90. than its limit. The :attr:`result` attribute contains the
  91. exception raised.
  92. *SUCCESS*
  93. The task executed successfully. The :attr:`result` attribute
  94. contains the resulting value.
  95. """
  96. return self.backend.get_status(self.task_id)
  97. class AsyncResult(BaseAsyncResult):
  98. """Pending task result using the default backend.
  99. :param task_id: see :attr:`task_id`.
  100. .. attribute:: task_id
  101. The unique identifier for this task.
  102. .. attribute:: backend
  103. Instance of :class:`celery.backends.DefaultBackend`.
  104. """
  105. def __init__(self, task_id):
  106. super(AsyncResult, self).__init__(task_id, backend=default_backend)
  107. class TaskSetResult(object):
  108. """Working with :class:`celery.task.TaskSet` results.
  109. An instance of this class is returned by
  110. :meth:`celery.task.TaskSet.run()`. It lets you inspect the status and
  111. return values of the taskset as a single entity.
  112. :option taskset_id: see :attr:`taskset_id`.
  113. :option subtasks: see :attr:`subtasks`.
  114. .. attribute:: taskset_id
  115. The UUID of the taskset itself.
  116. .. attribute:: subtasks
  117. A list of :class:`AsyncResult` instances for all of the subtasks.
  118. """
  119. def __init__(self, taskset_id, subtasks):
  120. self.taskset_id = taskset_id
  121. self.subtasks = subtasks
  122. def itersubtasks(self):
  123. """Taskset subtask iterator.
  124. :returns: an iterator for iterating over the tasksets
  125. :class:`AsyncResult` objects.
  126. """
  127. return (subtask for subtask in self.subtasks)
  128. def successful(self):
  129. """Was the taskset successful?
  130. :returns: ``True`` if all of the tasks in the taskset finished
  131. successfully (i.e. did not raise an exception).
  132. """
  133. return all((subtask.successful()
  134. for subtask in self.itersubtasks()))
  135. def failed(self):
  136. """Did the taskset fail?
  137. :returns: ``True`` if any of the tasks in the taskset failed.
  138. (i.e., raised an exception)
  139. """
  140. return any((not subtask.successful()
  141. for subtask in self.itersubtasks()))
  142. def waiting(self):
  143. """Is the taskset waiting?
  144. :returns: ``True`` if any of the tasks in the taskset is still
  145. waiting for execution.
  146. """
  147. return any((not subtask.ready()
  148. for subtask in self.itersubtasks()))
  149. def ready(self):
  150. """Is the task ready?
  151. :returns: ``True`` if all of the tasks in the taskset has been
  152. executed.
  153. """
  154. return all((subtask.ready()
  155. for subtask in self.itersubtasks()))
  156. def completed_count(self):
  157. """Task completion count.
  158. :returns: the number of tasks completed.
  159. """
  160. return sum(imap(int, (subtask.successful()
  161. for subtask in self.itersubtasks())))
  162. @with_connection
  163. def revoke(self, connection=None, connect_timeout=None):
  164. for subtask in self.subtasks:
  165. subtask.revoke(connection=connection)
  166. def __iter__(self):
  167. """``iter(res)`` -> ``res.iterate()``."""
  168. return self.iterate()
  169. def iterate(self):
  170. """Iterate over the return values of the tasks as they finish
  171. one by one.
  172. :raises: The exception if any of the tasks raised an exception.
  173. """
  174. results = dict((subtask.task_id, subtask.__class__(subtask.task_id))
  175. for subtask in self.subtasks)
  176. while results:
  177. for task_id, pending_result in results.items():
  178. if pending_result.status == states.SUCCESS:
  179. results.pop(task_id, None)
  180. yield pending_result.result
  181. elif pending_result.status == states.FAILURE:
  182. raise pending_result.result
  183. def join(self, timeout=None):
  184. """Gather the results for all of the tasks in the taskset,
  185. and return a list with them ordered by the order of which they
  186. were called.
  187. :keyword timeout: The time in seconds, how long
  188. it will wait for results, before the operation times out.
  189. :raises celery.exceptions.TimeoutError: if ``timeout`` is not ``None``
  190. and the operation takes longer than ``timeout`` seconds.
  191. If any of the tasks raises an exception, the exception
  192. will be reraised by :meth:`join`.
  193. :returns: list of return values for all tasks in the taskset.
  194. """
  195. time_start = time.time()
  196. def on_timeout():
  197. raise TimeoutError("The operation timed out.")
  198. results = PositionQueue(length=self.total)
  199. while True:
  200. for position, pending_result in enumerate(self.subtasks):
  201. if pending_result.status == states.SUCCESS:
  202. results[position] = pending_result.result
  203. elif pending_result.status == states.FAILURE:
  204. raise pending_result.result
  205. if results.full():
  206. # Make list copy, so the returned type is not a position
  207. # queue.
  208. return list(results)
  209. else:
  210. if timeout is not None and \
  211. time.time() >= time_start + timeout:
  212. on_timeout()
  213. def save(self, backend=default_backend):
  214. """Save taskset result for later retrieval using :meth:`restore`.
  215. Example:
  216. >>> result.save()
  217. >>> result = TaskSetResult.restore(task_id)
  218. """
  219. backend.save_taskset(self.taskset_id, self)
  220. @classmethod
  221. def restore(self, taskset_id, backend=default_backend):
  222. """Restore previously saved taskset result."""
  223. return backend.restore_taskset(taskset_id)
  224. @property
  225. def total(self):
  226. """The total number of tasks in the :class:`celery.task.TaskSet`."""
  227. return len(self.subtasks)
  228. class EagerResult(BaseAsyncResult):
  229. """Result that we know has already been executed. """
  230. TimeoutError = TimeoutError
  231. def __init__(self, task_id, ret_value, status, traceback=None):
  232. self.task_id = task_id
  233. self._result = ret_value
  234. self._status = status
  235. self._traceback = traceback
  236. def successful(self):
  237. """Returns ``True`` if the task executed without failure."""
  238. return self.status == states.SUCCESS
  239. def ready(self):
  240. """Returns ``True`` if the task has been executed."""
  241. return True
  242. def wait(self, timeout=None):
  243. """Wait until the task has been executed and return its result."""
  244. if self.status == states.SUCCESS:
  245. return self.result
  246. elif self.status == states.FAILURE:
  247. raise self.result.exception
  248. def revoke(self):
  249. pass
  250. @property
  251. def result(self):
  252. """The tasks return value"""
  253. return self._result
  254. @property
  255. def status(self):
  256. """The tasks status"""
  257. return self._status
  258. @property
  259. def traceback(self):
  260. """The traceback if the task failed."""
  261. return self._traceback
  262. def __repr__(self):
  263. return "<EagerResult: %s>" % self.task_id