result.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424
  1. from __future__ import generators
  2. import time
  3. from copy import copy
  4. from itertools import imap
  5. from celery import states
  6. from celery.backends import default_backend
  7. from celery.datastructures import PositionQueue
  8. from celery.exceptions import TimeoutError
  9. from celery.messaging import with_connection
  10. from celery.utils import any, all
  11. class BaseAsyncResult(object):
  12. """Base class for pending result, supports custom task result backend.
  13. :param task_id: see :attr:`task_id`.
  14. :param backend: see :attr:`backend`.
  15. .. attribute:: task_id
  16. The unique identifier for this task.
  17. .. attribute:: backend
  18. The task result backend used.
  19. """
  20. TimeoutError = TimeoutError
  21. def __init__(self, task_id, backend):
  22. self.task_id = task_id
  23. self.backend = backend
  24. def forget(self):
  25. """Forget about (and possibly remove the result of) this task."""
  26. self.backend.forget(self.task_id)
  27. def revoke(self, connection=None, connect_timeout=None):
  28. """Send revoke signal to all workers.
  29. The workers will ignore the task if received.
  30. """
  31. from celery.task import control
  32. control.revoke(self.task_id, connection=connection,
  33. connect_timeout=connect_timeout)
  34. def wait(self, timeout=None):
  35. """Wait for task, and return the result when it arrives.
  36. :keyword timeout: How long to wait, in seconds, before the
  37. operation times out.
  38. :raises celery.exceptions.TimeoutError: if ``timeout`` is not
  39. :const:`None` and the result does not arrive within ``timeout``
  40. seconds.
  41. If the remote call raised an exception then that
  42. exception will be re-raised.
  43. """
  44. return self.backend.wait_for(self.task_id, timeout=timeout)
  45. def get(self, timeout=None):
  46. """Alias to :meth:`wait`."""
  47. return self.wait(timeout=timeout)
  48. def ready(self):
  49. """Returns :const:`True` if the task executed successfully, or raised
  50. an exception.
  51. If the task is still running, pending, or is waiting
  52. for retry then :const:`False` is returned.
  53. """
  54. return self.status in self.backend.READY_STATES
  55. def successful(self):
  56. """Returns :const:`True` if the task executed successfully."""
  57. return self.status == states.SUCCESS
  58. def failed(self):
  59. """Returns :const:`True` if the task failed by exception."""
  60. return self.status == states.FAILURE
  61. def __str__(self):
  62. """``str(self) -> self.task_id``"""
  63. return self.task_id
  64. def __hash__(self):
  65. """``hash(self) -> hash(self.task_id)``"""
  66. return hash(self.task_id)
  67. def __repr__(self):
  68. return "<AsyncResult: %s>" % self.task_id
  69. def __eq__(self, other):
  70. if isinstance(other, self.__class__):
  71. return self.task_id == other.task_id
  72. return other == self.task_id
  73. def __copy__(self):
  74. return self.__class__(self.task_id, backend=self.backend)
  75. @property
  76. def result(self):
  77. """When the task has been executed, this contains the return value.
  78. If the task raised an exception, this will be the exception instance.
  79. """
  80. return self.backend.get_result(self.task_id)
  81. @property
  82. def info(self):
  83. """Get state metadata.
  84. Alias to :meth:`result`.
  85. """
  86. return self.result
  87. @property
  88. def traceback(self):
  89. """Get the traceback of a failed task."""
  90. return self.backend.get_traceback(self.task_id)
  91. @property
  92. def status(self):
  93. """Deprecated alias of :attr:`state`."""
  94. return self.state
  95. @property
  96. def state(self):
  97. """The current status of the task.
  98. Can be one of the following:
  99. *PENDING*
  100. The task is waiting for execution.
  101. *STARTED*
  102. The task has been started.
  103. *RETRY*
  104. The task is to be retried, possibly because of failure.
  105. *FAILURE*
  106. The task raised an exception, or has been retried more times
  107. than its limit. The :attr:`result` attribute contains the
  108. exception raised.
  109. *SUCCESS*
  110. The task executed successfully. The :attr:`result` attribute
  111. contains the resulting value.
  112. """
  113. return self.backend.get_status(self.task_id)
  114. class AsyncResult(BaseAsyncResult):
  115. """Pending task result using the default backend.
  116. :param task_id: see :attr:`task_id`.
  117. .. attribute:: task_id
  118. The unique identifier for this task.
  119. .. attribute:: backend
  120. Instance of :class:`celery.backends.DefaultBackend`.
  121. """
  122. def __init__(self, task_id, backend=None):
  123. super(AsyncResult, self).__init__(task_id, backend or default_backend)
  124. class TaskSetResult(object):
  125. """Working with :class:`~celery.task.TaskSet` results.
  126. An instance of this class is returned by
  127. ``TaskSet``'s :meth:`~celery.task.TaskSet.apply_async()`. It enables
  128. inspection of the subtasks status and return values as a single entity.
  129. :option taskset_id: see :attr:`taskset_id`.
  130. :option subtasks: see :attr:`subtasks`.
  131. .. attribute:: taskset_id
  132. The UUID of the taskset itself.
  133. .. attribute:: subtasks
  134. A list of :class:`AsyncResult` instances for all of the subtasks.
  135. """
  136. def __init__(self, taskset_id, subtasks):
  137. self.taskset_id = taskset_id
  138. self.subtasks = subtasks
  139. def itersubtasks(self):
  140. """Taskset subtask iterator.
  141. :returns: an iterator for iterating over the tasksets
  142. :class:`AsyncResult` objects.
  143. """
  144. return (subtask for subtask in self.subtasks)
  145. def successful(self):
  146. """Was the taskset successful?
  147. :returns: :const:`True` if all of the tasks in the taskset finished
  148. successfully (i.e. did not raise an exception).
  149. """
  150. return all(subtask.successful()
  151. for subtask in self.itersubtasks())
  152. def failed(self):
  153. """Did the taskset fail?
  154. :returns: :const:`True` if any of the tasks in the taskset failed.
  155. (i.e., raised an exception)
  156. """
  157. return any(subtask.failed()
  158. for subtask in self.itersubtasks())
  159. def waiting(self):
  160. """Is the taskset waiting?
  161. :returns: :const:`True` if any of the tasks in the taskset is still
  162. waiting for execution.
  163. """
  164. return any(not subtask.ready()
  165. for subtask in self.itersubtasks())
  166. def ready(self):
  167. """Is the task ready?
  168. :returns: :const:`True` if all of the tasks in the taskset has been
  169. executed.
  170. """
  171. return all(subtask.ready()
  172. for subtask in self.itersubtasks())
  173. def completed_count(self):
  174. """Task completion count.
  175. :returns: the number of tasks completed.
  176. """
  177. return sum(imap(int, (subtask.successful()
  178. for subtask in self.itersubtasks())))
  179. def forget(self):
  180. """Forget about (and possible remove the result of) all the tasks
  181. in this taskset."""
  182. for subtask in self.subtasks:
  183. subtask.forget()
  184. @with_connection
  185. def revoke(self, connection=None, connect_timeout=None):
  186. for subtask in self.subtasks:
  187. subtask.revoke(connection=connection)
  188. def __iter__(self):
  189. """``iter(res)`` -> ``res.iterate()``."""
  190. return self.iterate()
  191. def __getitem__(self, index):
  192. return self.subtasks[index]
  193. def iterate(self):
  194. """Iterate over the return values of the tasks as they finish
  195. one by one.
  196. :raises: The exception if any of the tasks raised an exception.
  197. """
  198. pending = list(self.subtasks)
  199. results = dict((subtask.task_id, copy(subtask))
  200. for subtask in self.subtasks)
  201. while pending:
  202. for task_id in pending:
  203. result = results[task_id]
  204. if result.status == states.SUCCESS:
  205. try:
  206. pending.remove(task_id)
  207. except ValueError:
  208. pass
  209. yield result.result
  210. elif result.status in states.PROPAGATE_STATES:
  211. raise result.result
  212. def join(self, timeout=None, propagate=True):
  213. """Gather the results of all tasks in the taskset,
  214. and returns a list ordered by the order of the set.
  215. :keyword timeout: The number of seconds to wait for results
  216. before the operation times out.
  217. :keyword propagate: If any of the subtasks raises an exception, the
  218. exception will be reraised.
  219. :raises celery.exceptions.TimeoutError: if ``timeout`` is not
  220. :const:`None` and the operation takes longer than ``timeout``
  221. seconds.
  222. :returns: list of return values for all subtasks in order.
  223. """
  224. time_start = time.time()
  225. def on_timeout():
  226. raise TimeoutError("The operation timed out.")
  227. results = PositionQueue(length=self.total)
  228. while True:
  229. for position, pending_result in enumerate(self.subtasks):
  230. state = pending_result.state
  231. if state in states.READY_STATES:
  232. if propagate and state in states.PROPAGATE_STATES:
  233. raise pending_result.result
  234. results[position] = pending_result.result
  235. if results.full():
  236. # Make list copy, so the returned type is not a position
  237. # queue.
  238. return list(results)
  239. else:
  240. if (timeout is not None and
  241. time.time() >= time_start + timeout):
  242. on_timeout()
  243. def save(self, backend=default_backend):
  244. """Save taskset result for later retrieval using :meth:`restore`.
  245. Example:
  246. >>> result.save()
  247. >>> result = TaskSetResult.restore(task_id)
  248. """
  249. backend.save_taskset(self.taskset_id, self)
  250. @classmethod
  251. def restore(self, taskset_id, backend=default_backend):
  252. """Restore previously saved taskset result."""
  253. return backend.restore_taskset(taskset_id)
  254. @property
  255. def total(self):
  256. """The total number of tasks in the :class:`~celery.task.TaskSet`."""
  257. return len(self.subtasks)
  258. class EagerResult(BaseAsyncResult):
  259. """Result that we know has already been executed. """
  260. TimeoutError = TimeoutError
  261. def __init__(self, task_id, ret_value, status, traceback=None):
  262. self.task_id = task_id
  263. self._result = ret_value
  264. self._status = status
  265. self._traceback = traceback
  266. def successful(self):
  267. """Returns :const:`True` if the task executed without failure."""
  268. return self.status == states.SUCCESS
  269. def ready(self):
  270. """Returns :const:`True` if the task has been executed."""
  271. return True
  272. def wait(self, timeout=None):
  273. """Wait until the task has been executed and return its result."""
  274. if self.status == states.SUCCESS:
  275. return self.result
  276. elif self.status in states.PROPAGATE_STATES:
  277. raise self.result
  278. def revoke(self):
  279. self._status = states.REVOKED
  280. @property
  281. def result(self):
  282. """The tasks return value"""
  283. return self._result
  284. @property
  285. def status(self):
  286. """The tasks status (alias to :attr:`state`)."""
  287. return self._status
  288. @property
  289. def state(self):
  290. """The tasks state."""
  291. return self._state
  292. @property
  293. def traceback(self):
  294. """The traceback if the task failed."""
  295. return self._traceback
  296. def __repr__(self):
  297. return "<EagerResult: %s>" % self.task_id