result.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425
  1. from __future__ import generators
  2. import time
  3. from copy import copy
  4. from itertools import imap
  5. from celery import states
  6. from celery.app import app_or_default
  7. from celery.datastructures import PositionQueue
  8. from celery.exceptions import TimeoutError
  9. from celery.utils import any, all
  10. class BaseAsyncResult(object):
  11. """Base class for pending result, supports custom task result backend.
  12. :param task_id: see :attr:`task_id`.
  13. :param backend: see :attr:`backend`.
  14. .. attribute:: task_id
  15. The unique identifier for this task.
  16. .. attribute:: backend
  17. The task result backend used.
  18. """
  19. TimeoutError = TimeoutError
  20. def __init__(self, task_id, backend, app=None):
  21. self.task_id = task_id
  22. self.backend = backend
  23. self.app = app_or_default(app)
  24. def forget(self):
  25. """Forget about (and possibly remove the result of) this task."""
  26. self.backend.forget(self.task_id)
  27. def revoke(self, connection=None, connect_timeout=None):
  28. """Send revoke signal to all workers.
  29. The workers will ignore the task if received.
  30. """
  31. self.app.control.revoke(self.task_id, connection=connection,
  32. connect_timeout=connect_timeout)
  33. def wait(self, timeout=None):
  34. """Wait for task, and return the result when it arrives.
  35. :keyword timeout: How long to wait, in seconds, before the
  36. operation times out.
  37. :raises celery.exceptions.TimeoutError: if ``timeout`` is not
  38. :const:`None` and the result does not arrive within ``timeout``
  39. seconds.
  40. If the remote call raised an exception then that
  41. exception will be re-raised.
  42. """
  43. return self.backend.wait_for(self.task_id, timeout=timeout)
  44. def get(self, timeout=None):
  45. """Alias to :meth:`wait`."""
  46. return self.wait(timeout=timeout)
  47. def ready(self):
  48. """Returns :const:`True` if the task executed successfully, or raised
  49. an exception.
  50. If the task is still running, pending, or is waiting
  51. for retry then :const:`False` is returned.
  52. """
  53. return self.status not in self.backend.UNREADY_STATES
  54. def successful(self):
  55. """Returns :const:`True` if the task executed successfully."""
  56. return self.status == states.SUCCESS
  57. def failed(self):
  58. """Returns :const:`True` if the task failed by exception."""
  59. return self.status == states.FAILURE
  60. def __str__(self):
  61. """``str(self) -> self.task_id``"""
  62. return self.task_id
  63. def __hash__(self):
  64. """``hash(self) -> hash(self.task_id)``"""
  65. return hash(self.task_id)
  66. def __repr__(self):
  67. return "<AsyncResult: %s>" % self.task_id
  68. def __eq__(self, other):
  69. if isinstance(other, self.__class__):
  70. return self.task_id == other.task_id
  71. return other == self.task_id
  72. def __copy__(self):
  73. return self.__class__(self.task_id, backend=self.backend)
  74. @property
  75. def result(self):
  76. """When the task has been executed, this contains the return value.
  77. If the task raised an exception, this will be the exception instance.
  78. """
  79. return self.backend.get_result(self.task_id)
  80. @property
  81. def traceback(self):
  82. """Get the traceback of a failed task."""
  83. return self.backend.get_traceback(self.task_id)
  84. @property
  85. def status(self):
  86. """Deprecated alias of :attr:`state`."""
  87. return self.state
  88. @property
  89. def state(self):
  90. """The current status of the task.
  91. Can be one of the following:
  92. *PENDING*
  93. The task is waiting for execution.
  94. *STARTED*
  95. The task has been started.
  96. *RETRY*
  97. The task is to be retried, possibly because of failure.
  98. *FAILURE*
  99. The task raised an exception, or has been retried more times
  100. than its limit. The :attr:`result` attribute contains the
  101. exception raised.
  102. *SUCCESS*
  103. The task executed successfully. The :attr:`result` attribute
  104. contains the resulting value.
  105. """
  106. return self.backend.get_status(self.task_id)
  107. class AsyncResult(BaseAsyncResult):
  108. """Pending task result using the default backend.
  109. :param task_id: see :attr:`task_id`.
  110. .. attribute:: task_id
  111. The unique identifier for this task.
  112. .. attribute:: backend
  113. Instance of :class:`celery.backends.DefaultBackend`.
  114. """
  115. def __init__(self, task_id, backend=None, app=None):
  116. app = app_or_default(app)
  117. backend = backend or app.backend
  118. super(AsyncResult, self).__init__(task_id, backend, app=app)
  119. class TaskSetResult(object):
  120. """Working with :class:`~celery.task.TaskSet` results.
  121. An instance of this class is returned by
  122. ``TaskSet``'s :meth:`~celery.task.TaskSet.apply_async()`. It enables
  123. inspection of the subtasks status and return values as a single entity.
  124. :option taskset_id: see :attr:`taskset_id`.
  125. :option subtasks: see :attr:`subtasks`.
  126. .. attribute:: taskset_id
  127. The UUID of the taskset itself.
  128. .. attribute:: subtasks
  129. A list of :class:`AsyncResult` instances for all of the subtasks.
  130. """
  131. def __init__(self, taskset_id, subtasks, app=None):
  132. self.taskset_id = taskset_id
  133. self.subtasks = subtasks
  134. self.app = app_or_default(app)
  135. def itersubtasks(self):
  136. """Taskset subtask iterator.
  137. :returns: an iterator for iterating over the tasksets
  138. :class:`AsyncResult` objects.
  139. """
  140. return (subtask for subtask in self.subtasks)
  141. def successful(self):
  142. """Was the taskset successful?
  143. :returns: :const:`True` if all of the tasks in the taskset finished
  144. successfully (i.e. did not raise an exception).
  145. """
  146. return all(subtask.successful()
  147. for subtask in self.itersubtasks())
  148. def failed(self):
  149. """Did the taskset fail?
  150. :returns: :const:`True` if any of the tasks in the taskset failed.
  151. (i.e., raised an exception)
  152. """
  153. return any(subtask.failed()
  154. for subtask in self.itersubtasks())
  155. def waiting(self):
  156. """Is the taskset waiting?
  157. :returns: :const:`True` if any of the tasks in the taskset is still
  158. waiting for execution.
  159. """
  160. return any(not subtask.ready()
  161. for subtask in self.itersubtasks())
  162. def ready(self):
  163. """Is the task ready?
  164. :returns: :const:`True` if all of the tasks in the taskset has been
  165. executed.
  166. """
  167. return all(subtask.ready()
  168. for subtask in self.itersubtasks())
  169. def completed_count(self):
  170. """Task completion count.
  171. :returns: the number of tasks completed.
  172. """
  173. return sum(imap(int, (subtask.successful()
  174. for subtask in self.itersubtasks())))
  175. def forget(self):
  176. """Forget about (and possible remove the result of) all the tasks
  177. in this taskset."""
  178. for subtask in self.subtasks:
  179. subtask.forget()
  180. def revoke(self, connection=None, connect_timeout=None):
  181. def _do_revoke(connection=None, connect_timeout=None):
  182. for subtask in self.subtasks:
  183. subtask.revoke(connection=connection)
  184. return self.app.with_default_connection(_do_revoke)(
  185. connection=connection, connect_timeout=connect_timeout)
  186. def __iter__(self):
  187. """``iter(res)`` -> ``res.iterate()``."""
  188. return self.iterate()
  189. def __getitem__(self, index):
  190. return self.subtasks[index]
  191. def iterate(self):
  192. """Iterate over the return values of the tasks as they finish
  193. one by one.
  194. :raises: The exception if any of the tasks raised an exception.
  195. """
  196. pending = list(self.subtasks)
  197. results = dict((subtask.task_id, copy(subtask))
  198. for subtask in self.subtasks)
  199. while pending:
  200. for task_id in pending:
  201. result = results[task_id]
  202. if result.status == states.SUCCESS:
  203. try:
  204. pending.remove(task_id)
  205. except ValueError:
  206. pass
  207. yield result.result
  208. elif result.status in states.PROPAGATE_STATES:
  209. raise result.result
  210. def join(self, timeout=None, propagate=True):
  211. """Gather the results of all tasks in the taskset,
  212. and returns a list ordered by the order of the set.
  213. :keyword timeout: The number of seconds to wait for results
  214. before the operation times out.
  215. :keyword propagate: If any of the subtasks raises an exception, the
  216. exception will be reraised.
  217. :raises celery.exceptions.TimeoutError: if ``timeout`` is not
  218. :const:`None` and the operation takes longer than ``timeout``
  219. seconds.
  220. :returns: list of return values for all subtasks in order.
  221. """
  222. time_start = time.time()
  223. def on_timeout():
  224. raise TimeoutError("The operation timed out.")
  225. results = PositionQueue(length=self.total)
  226. while True:
  227. for position, pending_result in enumerate(self.subtasks):
  228. state = pending_result.state
  229. if state in states.READY_STATES:
  230. if propagate and state in states.PROPAGATE_STATES:
  231. raise pending_result.result
  232. results[position] = pending_result.result
  233. if results.full():
  234. # Make list copy, so the returned type is not a position
  235. # queue.
  236. return list(results)
  237. else:
  238. if (timeout is not None and
  239. time.time() >= time_start + timeout):
  240. on_timeout()
  241. def save(self, backend=None):
  242. """Save taskset result for later retrieval using :meth:`restore`.
  243. Example:
  244. >>> result.save()
  245. >>> result = TaskSetResult.restore(task_id)
  246. """
  247. if backend is None:
  248. backend = self.app.backend
  249. backend.save_taskset(self.taskset_id, self)
  250. @classmethod
  251. def restore(self, taskset_id, backend=None):
  252. """Restore previously saved taskset result."""
  253. if backend is None:
  254. backend = self.app.backend
  255. return backend.restore_taskset(taskset_id)
  256. @property
  257. def total(self):
  258. """The total number of tasks in the :class:`~celery.task.TaskSet`."""
  259. return len(self.subtasks)
  260. class EagerResult(BaseAsyncResult):
  261. """Result that we know has already been executed. """
  262. TimeoutError = TimeoutError
  263. def __init__(self, task_id, ret_value, status, traceback=None):
  264. self.task_id = task_id
  265. self._result = ret_value
  266. self._status = status
  267. self._traceback = traceback
  268. def successful(self):
  269. """Returns :const:`True` if the task executed without failure."""
  270. return self.status == states.SUCCESS
  271. def ready(self):
  272. """Returns :const:`True` if the task has been executed."""
  273. return True
  274. def wait(self, timeout=None):
  275. """Wait until the task has been executed and return its result."""
  276. if self.status == states.SUCCESS:
  277. return self.result
  278. elif self.status in states.PROPAGATE_STATES:
  279. raise self.result
  280. def revoke(self):
  281. self._status = states.REVOKED
  282. @property
  283. def result(self):
  284. """The tasks return value"""
  285. return self._result
  286. @property
  287. def status(self):
  288. """The tasks status (alias to :attr:`state`)."""
  289. return self._status
  290. @property
  291. def state(self):
  292. """The tasks state."""
  293. return self._state
  294. @property
  295. def traceback(self):
  296. """The traceback if the task failed."""
  297. return self._traceback
  298. def __repr__(self):
  299. return "<EagerResult: %s>" % self.task_id