result.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403
  1. from __future__ import generators
  2. import time
  3. from copy import copy
  4. from itertools import imap
  5. from celery import states
  6. from celery.backends import default_backend
  7. from celery.datastructures import PositionQueue
  8. from celery.exceptions import TimeoutError
  9. from celery.messaging import with_connection
  10. from celery.utils import any, all
  11. class BaseAsyncResult(object):
  12. """Base class for pending result, supports custom task result backend.
  13. :param task_id: see :attr:`task_id`.
  14. :param backend: see :attr:`backend`.
  15. .. attribute:: task_id
  16. The unique identifier for this task.
  17. .. attribute:: backend
  18. The task result backend used.
  19. """
  20. TimeoutError = TimeoutError
  21. def __init__(self, task_id, backend):
  22. self.task_id = task_id
  23. self.backend = backend
  24. def forget(self):
  25. """Forget about (and possibly remove the result of) this task."""
  26. self.backend.forget(self.task_id)
  27. def revoke(self, connection=None, connect_timeout=None):
  28. """Send revoke signal to all workers.
  29. The workers will ignore the task if received.
  30. """
  31. from celery.task import control
  32. control.revoke(self.task_id, connection=connection,
  33. connect_timeout=connect_timeout)
  34. def wait(self, timeout=None):
  35. """Wait for task, and return the result when it arrives.
  36. :keyword timeout: How long to wait, in seconds, before the
  37. operation times out.
  38. :raises celery.exceptions.TimeoutError: if ``timeout`` is not ``None``
  39. and the result does not arrive within ``timeout`` seconds.
  40. If the remote call raised an exception then that
  41. exception will be re-raised.
  42. """
  43. return self.backend.wait_for(self.task_id, timeout=timeout)
  44. def get(self, timeout=None):
  45. """Alias to :meth:`wait`."""
  46. return self.wait(timeout=timeout)
  47. def ready(self):
  48. """Returns ``True`` if the task executed successfully, or raised
  49. an exception.
  50. If the task is still running, pending, or is waiting
  51. for retry then ``False`` is returned.
  52. """
  53. return self.status not in self.backend.UNREADY_STATES
  54. def successful(self):
  55. """Returns ``True`` if the task executed successfully."""
  56. return self.status == states.SUCCESS
  57. def failed(self):
  58. """Returns ``True`` if the task failed by exception."""
  59. return self.status == states.FAILURE
  60. def __str__(self):
  61. """``str(self) -> self.task_id``"""
  62. return self.task_id
  63. def __hash__(self):
  64. """``hash(self) -> hash(self.task_id)``"""
  65. return hash(self.task_id)
  66. def __repr__(self):
  67. return "<AsyncResult: %s>" % self.task_id
  68. def __eq__(self, other):
  69. if isinstance(other, self.__class__):
  70. return self.task_id == other.task_id
  71. return other == self.task_id
  72. def __copy__(self):
  73. return self.__class__(self.task_id, backend=self.backend)
  74. @property
  75. def result(self):
  76. """When the task has been executed, this contains the return value.
  77. If the task raised an exception, this will be the exception instance.
  78. """
  79. return self.backend.get_result(self.task_id)
  80. @property
  81. def traceback(self):
  82. """Get the traceback of a failed task."""
  83. return self.backend.get_traceback(self.task_id)
  84. @property
  85. def status(self):
  86. """The current status of the task.
  87. Can be one of the following:
  88. *PENDING*
  89. The task is waiting for execution.
  90. *STARTED*
  91. The task has been started.
  92. *RETRY*
  93. The task is to be retried, possibly because of failure.
  94. *FAILURE*
  95. The task raised an exception, or has been retried more times
  96. than its limit. The :attr:`result` attribute contains the
  97. exception raised.
  98. *SUCCESS*
  99. The task executed successfully. The :attr:`result` attribute
  100. contains the resulting value.
  101. """
  102. return self.backend.get_status(self.task_id)
  103. class AsyncResult(BaseAsyncResult):
  104. """Pending task result using the default backend.
  105. :param task_id: see :attr:`task_id`.
  106. .. attribute:: task_id
  107. The unique identifier for this task.
  108. .. attribute:: backend
  109. Instance of :class:`celery.backends.DefaultBackend`.
  110. """
  111. def __init__(self, task_id, backend=None):
  112. super(AsyncResult, self).__init__(task_id, backend or default_backend)
  113. class TaskSetResult(object):
  114. """Working with :class:`~celery.task.TaskSet` results.
  115. An instance of this class is returned by
  116. ``TaskSet``'s :meth:`~celery.task.TaskSet.apply_async()`. It enables
  117. inspection of the subtasks status and return values as a single entity.
  118. :option taskset_id: see :attr:`taskset_id`.
  119. :option subtasks: see :attr:`subtasks`.
  120. .. attribute:: taskset_id
  121. The UUID of the taskset itself.
  122. .. attribute:: subtasks
  123. A list of :class:`AsyncResult` instances for all of the subtasks.
  124. """
  125. def __init__(self, taskset_id, subtasks):
  126. self.taskset_id = taskset_id
  127. self.subtasks = subtasks
  128. def itersubtasks(self):
  129. """Taskset subtask iterator.
  130. :returns: an iterator for iterating over the tasksets
  131. :class:`AsyncResult` objects.
  132. """
  133. return (subtask for subtask in self.subtasks)
  134. def successful(self):
  135. """Was the taskset successful?
  136. :returns: ``True`` if all of the tasks in the taskset finished
  137. successfully (i.e. did not raise an exception).
  138. """
  139. return all(subtask.successful()
  140. for subtask in self.itersubtasks())
  141. def failed(self):
  142. """Did the taskset fail?
  143. :returns: ``True`` if any of the tasks in the taskset failed.
  144. (i.e., raised an exception)
  145. """
  146. return any(subtask.failed()
  147. for subtask in self.itersubtasks())
  148. def waiting(self):
  149. """Is the taskset waiting?
  150. :returns: ``True`` if any of the tasks in the taskset is still
  151. waiting for execution.
  152. """
  153. return any(not subtask.ready()
  154. for subtask in self.itersubtasks())
  155. def ready(self):
  156. """Is the task ready?
  157. :returns: ``True`` if all of the tasks in the taskset has been
  158. executed.
  159. """
  160. return all(subtask.ready()
  161. for subtask in self.itersubtasks())
  162. def completed_count(self):
  163. """Task completion count.
  164. :returns: the number of tasks completed.
  165. """
  166. return sum(imap(int, (subtask.successful()
  167. for subtask in self.itersubtasks())))
  168. def forget(self):
  169. """Forget about (and possible remove the result of) all the tasks
  170. in this taskset."""
  171. for subtask in self.subtasks:
  172. subtask.forget()
  173. @with_connection
  174. def revoke(self, connection=None, connect_timeout=None):
  175. for subtask in self.subtasks:
  176. subtask.revoke(connection=connection)
  177. def __iter__(self):
  178. """``iter(res)`` -> ``res.iterate()``."""
  179. return self.iterate()
  180. def __getitem__(self, index):
  181. return self.subtasks[index]
  182. def iterate(self):
  183. """Iterate over the return values of the tasks as they finish
  184. one by one.
  185. :raises: The exception if any of the tasks raised an exception.
  186. """
  187. pending = list(self.subtasks)
  188. results = dict((subtask.task_id, copy(subtask))
  189. for subtask in self.subtasks)
  190. while pending:
  191. for task_id in pending:
  192. result = results[task_id]
  193. if result.status == states.SUCCESS:
  194. try:
  195. pending.remove(task_id)
  196. except ValueError:
  197. pass
  198. yield result.result
  199. elif result.status in states.PROPAGATE_STATES:
  200. raise result.result
  201. def join(self, timeout=None):
  202. """Gather the results for all of the tasks in the taskset,
  203. and return a list with them ordered by the order of which they
  204. were called.
  205. :keyword timeout: The time in seconds, how long
  206. it will wait for results, before the operation times out.
  207. :raises celery.exceptions.TimeoutError: if ``timeout`` is not ``None``
  208. and the operation takes longer than ``timeout`` seconds.
  209. If any of the tasks raises an exception, the exception
  210. will be reraised by :meth:`join`.
  211. :returns: list of return values for all tasks in the taskset.
  212. """
  213. time_start = time.time()
  214. def on_timeout():
  215. raise TimeoutError("The operation timed out.")
  216. results = PositionQueue(length=self.total)
  217. while True:
  218. for position, pending_result in enumerate(self.subtasks):
  219. if pending_result.status == states.SUCCESS:
  220. results[position] = pending_result.result
  221. elif pending_result.status in states.PROPAGATE_STATES:
  222. raise pending_result.result
  223. if results.full():
  224. # Make list copy, so the returned type is not a position
  225. # queue.
  226. return list(results)
  227. else:
  228. if timeout is not None and \
  229. time.time() >= time_start + timeout:
  230. on_timeout()
  231. def save(self, backend=default_backend):
  232. """Save taskset result for later retrieval using :meth:`restore`.
  233. Example:
  234. >>> result.save()
  235. >>> result = TaskSetResult.restore(task_id)
  236. """
  237. backend.save_taskset(self.taskset_id, self)
  238. @classmethod
  239. def restore(self, taskset_id, backend=default_backend):
  240. """Restore previously saved taskset result."""
  241. return backend.restore_taskset(taskset_id)
  242. @property
  243. def total(self):
  244. """The total number of tasks in the :class:`~celery.task.TaskSet`."""
  245. return len(self.subtasks)
  246. class EagerResult(BaseAsyncResult):
  247. """Result that we know has already been executed. """
  248. TimeoutError = TimeoutError
  249. def __init__(self, task_id, ret_value, status, traceback=None):
  250. self.task_id = task_id
  251. self._result = ret_value
  252. self._status = status
  253. self._traceback = traceback
  254. def successful(self):
  255. """Returns ``True`` if the task executed without failure."""
  256. return self.status == states.SUCCESS
  257. def ready(self):
  258. """Returns ``True`` if the task has been executed."""
  259. return True
  260. def wait(self, timeout=None):
  261. """Wait until the task has been executed and return its result."""
  262. if self.status == states.SUCCESS:
  263. return self.result
  264. elif self.status in states.PROPAGATE_STATES:
  265. raise self.result
  266. def revoke(self):
  267. self._status = states.REVOKED
  268. @property
  269. def result(self):
  270. """The tasks return value"""
  271. return self._result
  272. @property
  273. def status(self):
  274. """The tasks status"""
  275. return self._status
  276. @property
  277. def traceback(self):
  278. """The traceback if the task failed."""
  279. return self._traceback
  280. def __repr__(self):
  281. return "<EagerResult: %s>" % self.task_id