result.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. """
  2. Asynchronous result types.
  3. """
  4. from celery.backends import default_backend
  5. from celery.datastructures import PositionQueue
  6. from celery.timer import TimeoutTimer
  7. from itertools import imap
  8. class BaseAsyncResult(object):
  9. """Base class for pending result, supports custom
  10. task meta :attr:`backend`
  11. :param task_id: see :attr:`task_id`.
  12. :param backend: see :attr:`backend`.
  13. .. attribute:: task_id
  14. The unique identifier for this task.
  15. .. attribute:: backend
  16. The task result backend used.
  17. """
  18. def __init__(self, task_id, backend):
  19. self.task_id = task_id
  20. self.backend = backend
  21. def is_done(self):
  22. """Returns ``True`` if the task executed successfully.
  23. :rtype: bool
  24. """
  25. return self.backend.is_done(self.task_id)
  26. def get(self):
  27. """Alias to :meth:`wait`."""
  28. return self.wait()
  29. def wait(self, timeout=None):
  30. """Wait for task, and return the result when it arrives.
  31. :keyword timeout: How long to wait in seconds, before the
  32. operation times out.
  33. :raises celery.timer.TimeoutError: if ``timeout`` is not ``None`` and
  34. the result does not arrive within ``timeout`` seconds.
  35. If the remote call raised an exception then that
  36. exception will be re-raised.
  37. """
  38. return self.backend.wait_for(self.task_id, timeout=timeout)
  39. def ready(self):
  40. """Returns ``True`` if the task executed successfully, or raised
  41. an exception. If the task is still pending, or is waiting for retry
  42. then ``False`` is returned.
  43. :rtype: bool
  44. """
  45. status = self.backend.get_status(self.task_id)
  46. return status != "PENDING" or status != "RETRY"
  47. def successful(self):
  48. """Alias to :meth:`is_done`."""
  49. return self.is_done()
  50. def __str__(self):
  51. """``str(self)`` -> ``self.task_id``"""
  52. return self.task_id
  53. def __repr__(self):
  54. return "<AsyncResult: %s>" % self.task_id
  55. @property
  56. def result(self):
  57. """When the task has been executed, this contains the return value.
  58. If the task raised an exception, this will be the exception instance.
  59. """
  60. if self.status == "DONE" or self.status == "FAILURE":
  61. return self.backend.get_result(self.task_id)
  62. return None
  63. @property
  64. def status(self):
  65. """The current status of the task.
  66. Can be one of the following:
  67. *PENDING*
  68. The task is waiting for execution.
  69. *RETRY*
  70. The task is to be retried, possibly because of failure.
  71. *FAILURE*
  72. The task raised an exception, or has been retried more times
  73. than its limit. The :attr:`result` attribute contains the
  74. exception raised.
  75. *DONE*
  76. The task executed successfully. The :attr:`result` attribute
  77. contains the resulting value.
  78. """
  79. return self.backend.get_status(self.task_id)
  80. class AsyncResult(BaseAsyncResult):
  81. """Pending task result using the default backend.
  82. :param task_id: see :attr:`task_id`.
  83. .. attribute:: task_id
  84. The unique identifier for this task.
  85. .. attribute:: backend
  86. Instance of :class:`celery.backends.DefaultBackend`.
  87. """
  88. def __init__(self, task_id):
  89. super(AsyncResult, self).__init__(task_id, backend=default_backend)
  90. class TaskSetResult(object):
  91. """Working with :class:`celery.task.TaskSet` results.
  92. An instance of this class is returned by :meth:`celery.task.TaskSet.run()`.
  93. It lets you inspect the status and return values of a taskset as a
  94. single entity.
  95. :option taskset_id: see :attr:`taskset_id`.
  96. :option subtask_ids: see :attr:`subtask_ids`.
  97. .. attribute:: taskset_id
  98. The UUID of the taskset itself.
  99. .. attribute:: subtask_ids
  100. The list of task UUID's for all of the subtasks.
  101. .. attribute:: subtasks
  102. A list of :class:`AsyncResult`` instances for all of the subtasks.
  103. """
  104. def __init__(self, taskset_id, subtask_ids):
  105. self.taskset_id = taskset_id
  106. self.subtask_ids = subtask_ids
  107. self.subtasks = map(AsyncResult, self.subtask_ids)
  108. def itersubtasks(self):
  109. """Taskset subtask iterator.
  110. :returns: an iterator for iterating over the tasksets
  111. :class:`AsyncResult` objects.
  112. """
  113. return (subtask for subtask in self.subtasks)
  114. def successful(self):
  115. """Was the taskset successful?
  116. :returns: ``True`` if all of the tasks in the taskset finished
  117. successfully (i.e. did not raise an exception).
  118. """
  119. return all((subtask.successful()
  120. for subtask in self.itersubtasks()))
  121. def failed(self):
  122. """Did the taskset fail?
  123. :returns: ``True`` if any of the tasks in the taskset failed.
  124. (i.e., raised an exception)
  125. """
  126. return any((not subtask.successful()
  127. for subtask in self.itersubtasks()))
  128. def waiting(self):
  129. """Is the taskset waiting?
  130. :returns: ``True`` if any of the tasks in the taskset is still
  131. waiting for execution.
  132. """
  133. return any((not subtask.ready()
  134. for subtask in self.itersubtasks()))
  135. def ready(self):
  136. """Is the task readyu?
  137. :returns: ``True`` if all of the tasks in the taskset has been
  138. executed.
  139. """
  140. return all((subtask.ready()
  141. for subtask in self.itersubtasks()))
  142. def completed_count(self):
  143. """Task completion count.
  144. :returns: the number of tasks completed.
  145. """
  146. return sum(imap(int, (subtask.successful()
  147. for subtask in self.itersubtasks())))
  148. def __iter__(self):
  149. """``iter(res)`` -> ``res.iterate()``."""
  150. return self.iterate()
  151. def iterate(self):
  152. """Iterate over the return values of the tasks as they finish
  153. one by one.
  154. :raises: The exception if any of the tasks raised an exception.
  155. """
  156. results = dict([(task_id, AsyncResult(task_id))
  157. for task_id in self.subtask_ids])
  158. while results:
  159. for task_id, pending_result in results.items():
  160. if pending_result.status == "DONE":
  161. del(results[task_id])
  162. yield pending_result.result
  163. elif pending_result.status == "FAILURE":
  164. raise pending_result.result
  165. def join(self, timeout=None):
  166. """Gather the results for all of the tasks in the taskset,
  167. and return a list with them ordered by the order of which they
  168. were called.
  169. :keyword timeout: The time in seconds, how long
  170. it will wait for results, before the operation times out.
  171. :raises celery.timer.TimeoutError: if ``timeout`` is not ``None``
  172. and the operation takes longer than ``timeout`` seconds.
  173. If any of the tasks raises an exception, the exception
  174. will be reraised by :meth:`join`.
  175. :returns: list of return values for all tasks in the taskset.
  176. """
  177. timeout_timer = TimeoutTimer(timeout) # Timeout timer starts here.
  178. results = PositionQueue(length=self.total)
  179. while True:
  180. for position, pending_result in enumerate(self.subtasks):
  181. if pending_result.status == "DONE":
  182. results[position] = pending_result.result
  183. elif pending_result.status == "FAILURE":
  184. raise pending_result.result
  185. if results.full():
  186. # Make list copy, so the returned type is not a position
  187. # queue.
  188. return list(results)
  189. # This raises TimeoutError when timed out.
  190. timeout_timer.tick()
  191. @property
  192. def total(self):
  193. """The total number of tasks in the :class:`celery.task.TaskSet`."""
  194. return len(self.subtasks)