result.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  1. """
  2. Asynchronous result types.
  3. """
  4. import time
  5. from itertools import imap
  6. from celery.utils import any, all
  7. from celery.backends import default_backend
  8. from celery.messaging import with_connection
  9. from celery.exceptions import TimeoutError
  10. from celery.datastructures import PositionQueue
  11. class BaseAsyncResult(object):
  12. """Base class for pending result, supports custom task result backend.
  13. :param task_id: see :attr:`task_id`.
  14. :param backend: see :attr:`backend`.
  15. .. attribute:: task_id
  16. The unique identifier for this task.
  17. .. attribute:: backend
  18. The task result backend used.
  19. """
  20. TimeoutError = TimeoutError
  21. def __init__(self, task_id, backend):
  22. self.task_id = task_id
  23. self.backend = backend
  24. @with_connection
  25. def revoke(self, connection=None, connect_timeout=None):
  26. """Send revoke signal to all workers.
  27. The workers will ignore the task if received.
  28. """
  29. from celery.task import control
  30. control.revoke(self.task_id)
  31. def get(self, timeout=None):
  32. """Alias to :meth:`wait`."""
  33. return self.wait(timeout=timeout)
  34. def wait(self, timeout=None):
  35. """Wait for task, and return the result when it arrives.
  36. :keyword timeout: How long to wait, in seconds, before the
  37. operation times out.
  38. :raises celery.exceptions.TimeoutError: if ``timeout`` is not ``None``
  39. and the result does not arrive within ``timeout`` seconds.
  40. If the remote call raised an exception then that
  41. exception will be re-raised.
  42. """
  43. return self.backend.wait_for(self.task_id, timeout=timeout)
  44. def ready(self):
  45. """Returns ``True`` if the task executed successfully, or raised
  46. an exception. If the task is still pending, or is waiting for retry
  47. then ``False`` is returned.
  48. :rtype: bool
  49. """
  50. status = self.backend.get_status(self.task_id)
  51. return status not in self.backend.UNREADY_STATES
  52. def successful(self):
  53. """Returns ``True`` if the task executed successfully.
  54. :rtype: bool
  55. """
  56. return self.backend.is_successful(self.task_id)
  57. def __str__(self):
  58. """``str(self)`` -> ``self.task_id``"""
  59. return self.task_id
  60. def __repr__(self):
  61. return "<AsyncResult: %s>" % self.task_id
  62. @property
  63. def result(self):
  64. """When the task has been executed, this contains the return value.
  65. If the task raised an exception, this will be the exception instance.
  66. """
  67. return self.backend.get_result(self.task_id)
  68. @property
  69. def traceback(self):
  70. """Get the traceback of a failed task."""
  71. return self.backend.get_traceback(self.task_id)
  72. @property
  73. def status(self):
  74. """The current status of the task.
  75. Can be one of the following:
  76. *PENDING*
  77. The task is waiting for execution.
  78. *RETRY*
  79. The task is to be retried, possibly because of failure.
  80. *FAILURE*
  81. The task raised an exception, or has been retried more times
  82. than its limit. The :attr:`result` attribute contains the
  83. exception raised.
  84. *SUCCESS*
  85. The task executed successfully. The :attr:`result` attribute
  86. contains the resulting value.
  87. """
  88. return self.backend.get_status(self.task_id)
  89. class AsyncResult(BaseAsyncResult):
  90. """Pending task result using the default backend.
  91. :param task_id: see :attr:`task_id`.
  92. .. attribute:: task_id
  93. The unique identifier for this task.
  94. .. attribute:: backend
  95. Instance of :class:`celery.backends.DefaultBackend`.
  96. """
  97. def __init__(self, task_id):
  98. super(AsyncResult, self).__init__(task_id, backend=default_backend)
  99. class TaskSetResult(object):
  100. """Working with :class:`celery.task.TaskSet` results.
  101. An instance of this class is returned by
  102. :meth:`celery.task.TaskSet.run()`. It lets you inspect the status and
  103. return values of the taskset as a single entity.
  104. :option taskset_id: see :attr:`taskset_id`.
  105. :option subtasks: see :attr:`subtasks`.
  106. .. attribute:: taskset_id
  107. The UUID of the taskset itself.
  108. .. attribute:: subtasks
  109. A list of :class:`AsyncResult` instances for all of the subtasks.
  110. """
  111. def __init__(self, taskset_id, subtasks):
  112. self.taskset_id = taskset_id
  113. self.subtasks = subtasks
  114. def itersubtasks(self):
  115. """Taskset subtask iterator.
  116. :returns: an iterator for iterating over the tasksets
  117. :class:`AsyncResult` objects.
  118. """
  119. return (subtask for subtask in self.subtasks)
  120. def successful(self):
  121. """Was the taskset successful?
  122. :returns: ``True`` if all of the tasks in the taskset finished
  123. successfully (i.e. did not raise an exception).
  124. """
  125. return all((subtask.successful()
  126. for subtask in self.itersubtasks()))
  127. def failed(self):
  128. """Did the taskset fail?
  129. :returns: ``True`` if any of the tasks in the taskset failed.
  130. (i.e., raised an exception)
  131. """
  132. return any((not subtask.successful()
  133. for subtask in self.itersubtasks()))
  134. def waiting(self):
  135. """Is the taskset waiting?
  136. :returns: ``True`` if any of the tasks in the taskset is still
  137. waiting for execution.
  138. """
  139. return any((not subtask.ready()
  140. for subtask in self.itersubtasks()))
  141. def ready(self):
  142. """Is the task ready?
  143. :returns: ``True`` if all of the tasks in the taskset has been
  144. executed.
  145. """
  146. return all((subtask.ready()
  147. for subtask in self.itersubtasks()))
  148. def completed_count(self):
  149. """Task completion count.
  150. :returns: the number of tasks completed.
  151. """
  152. return sum(imap(int, (subtask.successful()
  153. for subtask in self.itersubtasks())))
  154. @with_connection
  155. def revoke(self, connection=None, connect_timeout=None):
  156. for subtask in self.subtasks:
  157. subtask.revoke(connection=connection)
  158. def __iter__(self):
  159. """``iter(res)`` -> ``res.iterate()``."""
  160. return self.iterate()
  161. def iterate(self):
  162. """Iterate over the return values of the tasks as they finish
  163. one by one.
  164. :raises: The exception if any of the tasks raised an exception.
  165. """
  166. results = dict((subtask.task_id, subtask.__class__(subtask.task_id))
  167. for subtask in self.subtasks)
  168. while results:
  169. for task_id, pending_result in results.items():
  170. if pending_result.status == "SUCCESS":
  171. results.pop(task_id, None)
  172. yield pending_result.result
  173. elif pending_result.status == "FAILURE":
  174. raise pending_result.result
  175. def join(self, timeout=None):
  176. """Gather the results for all of the tasks in the taskset,
  177. and return a list with them ordered by the order of which they
  178. were called.
  179. :keyword timeout: The time in seconds, how long
  180. it will wait for results, before the operation times out.
  181. :raises celery.exceptions.TimeoutError: if ``timeout`` is not ``None``
  182. and the operation takes longer than ``timeout`` seconds.
  183. If any of the tasks raises an exception, the exception
  184. will be reraised by :meth:`join`.
  185. :returns: list of return values for all tasks in the taskset.
  186. """
  187. time_start = time.time()
  188. def on_timeout():
  189. raise TimeoutError("The operation timed out.")
  190. results = PositionQueue(length=self.total)
  191. while True:
  192. for position, pending_result in enumerate(self.subtasks):
  193. if pending_result.status == "SUCCESS":
  194. results[position] = pending_result.result
  195. elif pending_result.status == "FAILURE":
  196. raise pending_result.result
  197. if results.full():
  198. # Make list copy, so the returned type is not a position
  199. # queue.
  200. return list(results)
  201. else:
  202. if timeout is not None and \
  203. time.time() >= time_start + timeout:
  204. on_timeout()
  205. @property
  206. def total(self):
  207. """The total number of tasks in the :class:`celery.task.TaskSet`."""
  208. return len(self.subtasks)
  209. class EagerResult(BaseAsyncResult):
  210. """Result that we know has already been executed. """
  211. TimeoutError = TimeoutError
  212. def __init__(self, task_id, ret_value, status, traceback=None):
  213. self.task_id = task_id
  214. self._result = ret_value
  215. self._status = status
  216. self._traceback = traceback
  217. def successful(self):
  218. """Returns ``True`` if the task executed without failure."""
  219. return self.status == "SUCCESS"
  220. def ready(self):
  221. """Returns ``True`` if the task has been executed."""
  222. return True
  223. def wait(self, timeout=None):
  224. """Wait until the task has been executed and return its result."""
  225. if self.status == "SUCCESS":
  226. return self.result
  227. elif self.status == "FAILURE":
  228. raise self.result.exception
  229. def revoke(self):
  230. pass
  231. @property
  232. def result(self):
  233. """The tasks return value"""
  234. return self._result
  235. @property
  236. def status(self):
  237. """The tasks status"""
  238. return self._status
  239. @property
  240. def traceback(self):
  241. """The traceback if the task failed."""
  242. return self._traceback
  243. def __repr__(self):
  244. return "<EagerResult: %s>" % self.task_id