result.py 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. """
  2. Asynchronous result types.
  3. """
  4. from celery.backends import default_backend
  5. from celery.datastructures import PositionQueue
  6. from celery.exceptions import TimeoutError
  7. from itertools import imap
  8. import time
  9. class BaseAsyncResult(object):
  10. """Base class for pending result, supports custom
  11. task meta :attr:`backend`
  12. :param task_id: see :attr:`task_id`.
  13. :param backend: see :attr:`backend`.
  14. .. attribute:: task_id
  15. The unique identifier for this task.
  16. .. attribute:: backend
  17. The task result backend used.
  18. """
  19. TimeoutError = TimeoutError
  20. def __init__(self, task_id, backend):
  21. self.task_id = task_id
  22. self.backend = backend
  23. def is_done(self):
  24. """Returns ``True`` if the task executed successfully.
  25. :rtype: bool
  26. """
  27. return self.backend.is_done(self.task_id)
  28. def get(self):
  29. """Alias to :meth:`wait`."""
  30. return self.wait()
  31. def wait(self, timeout=None):
  32. """Wait for task, and return the result when it arrives.
  33. :keyword timeout: How long to wait in seconds, before the
  34. operation times out.
  35. :raises celery.exceptions.TimeoutError: if ``timeout`` is not ``None``
  36. and the result does not arrive within ``timeout`` seconds.
  37. If the remote call raised an exception then that
  38. exception will be re-raised.
  39. """
  40. return self.backend.wait_for(self.task_id, timeout=timeout)
  41. def ready(self):
  42. """Returns ``True`` if the task executed successfully, or raised
  43. an exception. If the task is still pending, or is waiting for retry
  44. then ``False`` is returned.
  45. :rtype: bool
  46. """
  47. status = self.backend.get_status(self.task_id)
  48. return status not in ["PENDING", "RETRY"]
  49. def successful(self):
  50. """Alias to :meth:`is_done`."""
  51. return self.is_done()
  52. def __str__(self):
  53. """``str(self)`` -> ``self.task_id``"""
  54. return self.task_id
  55. def __repr__(self):
  56. return "<AsyncResult: %s>" % self.task_id
  57. @property
  58. def result(self):
  59. """When the task has been executed, this contains the return value.
  60. If the task raised an exception, this will be the exception instance.
  61. """
  62. if self.status == "DONE" or self.status == "FAILURE":
  63. return self.backend.get_result(self.task_id)
  64. return None
  65. @property
  66. def traceback(self):
  67. """Get the traceback of a failed task."""
  68. return self.backend.get_traceback(self.task_id)
  69. @property
  70. def status(self):
  71. """The current status of the task.
  72. Can be one of the following:
  73. *PENDING*
  74. The task is waiting for execution.
  75. *RETRY*
  76. The task is to be retried, possibly because of failure.
  77. *FAILURE*
  78. The task raised an exception, or has been retried more times
  79. than its limit. The :attr:`result` attribute contains the
  80. exception raised.
  81. *DONE*
  82. The task executed successfully. The :attr:`result` attribute
  83. contains the resulting value.
  84. """
  85. return self.backend.get_status(self.task_id)
  86. class AsyncResult(BaseAsyncResult):
  87. """Pending task result using the default backend.
  88. :param task_id: see :attr:`task_id`.
  89. .. attribute:: task_id
  90. The unique identifier for this task.
  91. .. attribute:: backend
  92. Instance of :class:`celery.backends.DefaultBackend`.
  93. """
  94. def __init__(self, task_id):
  95. super(AsyncResult, self).__init__(task_id, backend=default_backend)
  96. class TaskSetResult(object):
  97. """Working with :class:`celery.task.TaskSet` results.
  98. An instance of this class is returned by
  99. :meth:`celery.task.TaskSet.run()`. It lets you inspect the status and
  100. return values of the taskset as a single entity.
  101. :option taskset_id: see :attr:`taskset_id`.
  102. :option subtasks: see :attr:`subtasks`.
  103. .. attribute:: taskset_id
  104. The UUID of the taskset itself.
  105. .. attribute:: subtasks
  106. A list of :class:`AsyncResult` instances for all of the subtasks.
  107. """
  108. def __init__(self, taskset_id, subtasks):
  109. self.taskset_id = taskset_id
  110. self.subtasks = subtasks
  111. def itersubtasks(self):
  112. """Taskset subtask iterator.
  113. :returns: an iterator for iterating over the tasksets
  114. :class:`AsyncResult` objects.
  115. """
  116. return (subtask for subtask in self.subtasks)
  117. def successful(self):
  118. """Was the taskset successful?
  119. :returns: ``True`` if all of the tasks in the taskset finished
  120. successfully (i.e. did not raise an exception).
  121. """
  122. return all((subtask.successful()
  123. for subtask in self.itersubtasks()))
  124. def failed(self):
  125. """Did the taskset fail?
  126. :returns: ``True`` if any of the tasks in the taskset failed.
  127. (i.e., raised an exception)
  128. """
  129. return any((not subtask.successful()
  130. for subtask in self.itersubtasks()))
  131. def waiting(self):
  132. """Is the taskset waiting?
  133. :returns: ``True`` if any of the tasks in the taskset is still
  134. waiting for execution.
  135. """
  136. return any((not subtask.ready()
  137. for subtask in self.itersubtasks()))
  138. def ready(self):
  139. """Is the task ready?
  140. :returns: ``True`` if all of the tasks in the taskset has been
  141. executed.
  142. """
  143. return all((subtask.ready()
  144. for subtask in self.itersubtasks()))
  145. def completed_count(self):
  146. """Task completion count.
  147. :returns: the number of tasks completed.
  148. """
  149. return sum(imap(int, (subtask.successful()
  150. for subtask in self.itersubtasks())))
  151. def __iter__(self):
  152. """``iter(res)`` -> ``res.iterate()``."""
  153. return self.iterate()
  154. def iterate(self):
  155. """Iterate over the return values of the tasks as they finish
  156. one by one.
  157. :raises: The exception if any of the tasks raised an exception.
  158. """
  159. results = dict((subtask.task_id, AsyncResult(subtask.task_id))
  160. for subtask in self.subtasks)
  161. while results:
  162. for task_id, pending_result in results.items():
  163. if pending_result.status == "DONE":
  164. del(results[task_id])
  165. yield pending_result.result
  166. elif pending_result.status == "FAILURE":
  167. raise pending_result.result
  168. def join(self, timeout=None):
  169. """Gather the results for all of the tasks in the taskset,
  170. and return a list with them ordered by the order of which they
  171. were called.
  172. :keyword timeout: The time in seconds, how long
  173. it will wait for results, before the operation times out.
  174. :raises celery.exceptions.TimeoutError: if ``timeout`` is not ``None``
  175. and the operation takes longer than ``timeout`` seconds.
  176. If any of the tasks raises an exception, the exception
  177. will be reraised by :meth:`join`.
  178. :returns: list of return values for all tasks in the taskset.
  179. """
  180. time_start = time.time()
  181. def on_timeout():
  182. raise TimeoutError("The operation timed out.")
  183. results = PositionQueue(length=self.total)
  184. while True:
  185. for position, pending_result in enumerate(self.subtasks):
  186. if pending_result.status == "DONE":
  187. results[position] = pending_result.result
  188. elif pending_result.status == "FAILURE":
  189. raise pending_result.result
  190. if results.full():
  191. # Make list copy, so the returned type is not a position
  192. # queue.
  193. return list(results)
  194. else:
  195. if timeout is not None and \
  196. time.time() >= time_start + timeout:
  197. on_timeout()
  198. @property
  199. def total(self):
  200. """The total number of tasks in the :class:`celery.task.TaskSet`."""
  201. return len(self.subtasks)
  202. class EagerResult(BaseAsyncResult):
  203. """Result that we know has already been executed. """
  204. TimeoutError = TimeoutError
  205. def __init__(self, task_id, ret_value, status, traceback=None):
  206. self.task_id = task_id
  207. self._result = ret_value
  208. self._status = status
  209. self._traceback = traceback
  210. def is_done(self):
  211. """Returns ``True`` if the task executed without failure."""
  212. return self.status == "DONE"
  213. def is_ready(self):
  214. """Returns ``True`` if the task has been executed."""
  215. return True
  216. def wait(self, timeout=None):
  217. """Wait until the task has been executed and return its result."""
  218. if self.status == "DONE":
  219. return self.result
  220. elif self.status == "FAILURE":
  221. raise self.result
  222. @property
  223. def result(self):
  224. """The tasks return value"""
  225. return self._result
  226. @property
  227. def status(self):
  228. """The tasks status"""
  229. return self._status
  230. @property
  231. def traceback(self):
  232. """The traceback if the task failed."""
  233. return self._traceback
  234. def __repr__(self):
  235. return "<EagerResult: %s>" % self.task_id