result.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434
  1. from __future__ import generators
  2. import time
  3. from copy import copy
  4. from itertools import imap
  5. from celery import states
  6. from celery.app import app_or_default
  7. from celery.datastructures import PositionQueue
  8. from celery.exceptions import TimeoutError
  9. from celery.utils import any, all
  10. class BaseAsyncResult(object):
  11. """Base class for pending result, supports custom task result backend.
  12. :param task_id: see :attr:`task_id`.
  13. :param backend: see :attr:`backend`.
  14. .. attribute:: task_id
  15. The unique identifier for this task.
  16. .. attribute:: backend
  17. The task result backend used.
  18. """
  19. TimeoutError = TimeoutError
  20. def __init__(self, task_id, backend, app=None):
  21. self.task_id = task_id
  22. self.backend = backend
  23. self.app = app_or_default(app)
  24. def forget(self):
  25. """Forget about (and possibly remove the result of) this task."""
  26. self.backend.forget(self.task_id)
  27. def revoke(self, connection=None, connect_timeout=None):
  28. """Send revoke signal to all workers.
  29. The workers will ignore the task if received.
  30. """
  31. self.app.control.revoke(self.task_id, connection=connection,
  32. connect_timeout=connect_timeout)
  33. def wait(self, timeout=None):
  34. """Wait for task, and return the result when it arrives.
  35. :keyword timeout: How long to wait, in seconds, before the
  36. operation times out.
  37. :raises celery.exceptions.TimeoutError: if ``timeout`` is not
  38. :const:`None` and the result does not arrive within ``timeout``
  39. seconds.
  40. If the remote call raised an exception then that
  41. exception will be re-raised.
  42. """
  43. return self.backend.wait_for(self.task_id, timeout=timeout)
  44. def get(self, timeout=None):
  45. """Alias to :meth:`wait`."""
  46. return self.wait(timeout=timeout)
  47. def ready(self):
  48. """Returns :const:`True` if the task executed successfully, or raised
  49. an exception.
  50. If the task is still running, pending, or is waiting
  51. for retry then :const:`False` is returned.
  52. """
  53. return self.status in self.backend.READY_STATES
  54. def successful(self):
  55. """Returns :const:`True` if the task executed successfully."""
  56. return self.status == states.SUCCESS
  57. def failed(self):
  58. """Returns :const:`True` if the task failed by exception."""
  59. return self.status == states.FAILURE
  60. def __str__(self):
  61. """``str(self) -> self.task_id``"""
  62. return self.task_id
  63. def __hash__(self):
  64. """``hash(self) -> hash(self.task_id)``"""
  65. return hash(self.task_id)
  66. def __repr__(self):
  67. return "<AsyncResult: %s>" % self.task_id
  68. def __eq__(self, other):
  69. if isinstance(other, self.__class__):
  70. return self.task_id == other.task_id
  71. return other == self.task_id
  72. def __copy__(self):
  73. return self.__class__(self.task_id, backend=self.backend)
  74. @property
  75. def result(self):
  76. """When the task has been executed, this contains the return value.
  77. If the task raised an exception, this will be the exception instance.
  78. """
  79. return self.backend.get_result(self.task_id)
  80. @property
  81. def info(self):
  82. """Get state metadata.
  83. Alias to :meth:`result`.
  84. """
  85. return self.result
  86. @property
  87. def traceback(self):
  88. """Get the traceback of a failed task."""
  89. return self.backend.get_traceback(self.task_id)
  90. @property
  91. def status(self):
  92. """Deprecated alias of :attr:`state`."""
  93. return self.state
  94. @property
  95. def state(self):
  96. """The current status of the task.
  97. Can be one of the following:
  98. *PENDING*
  99. The task is waiting for execution.
  100. *STARTED*
  101. The task has been started.
  102. *RETRY*
  103. The task is to be retried, possibly because of failure.
  104. *FAILURE*
  105. The task raised an exception, or has been retried more times
  106. than its limit. The :attr:`result` attribute contains the
  107. exception raised.
  108. *SUCCESS*
  109. The task executed successfully. The :attr:`result` attribute
  110. contains the resulting value.
  111. """
  112. return self.backend.get_status(self.task_id)
  113. class AsyncResult(BaseAsyncResult):
  114. """Pending task result using the default backend.
  115. :param task_id: see :attr:`task_id`.
  116. .. attribute:: task_id
  117. The unique identifier for this task.
  118. .. attribute:: backend
  119. Instance of :class:`celery.backends.DefaultBackend`.
  120. """
  121. def __init__(self, task_id, backend=None, app=None):
  122. app = app_or_default(app)
  123. backend = backend or app.backend
  124. super(AsyncResult, self).__init__(task_id, backend, app=app)
  125. class TaskSetResult(object):
  126. """Working with :class:`~celery.task.TaskSet` results.
  127. An instance of this class is returned by
  128. ``TaskSet``'s :meth:`~celery.task.TaskSet.apply_async()`. It enables
  129. inspection of the subtasks status and return values as a single entity.
  130. :option taskset_id: see :attr:`taskset_id`.
  131. :option subtasks: see :attr:`subtasks`.
  132. .. attribute:: taskset_id
  133. The UUID of the taskset itself.
  134. .. attribute:: subtasks
  135. A list of :class:`AsyncResult` instances for all of the subtasks.
  136. """
  137. def __init__(self, taskset_id, subtasks, app=None):
  138. self.taskset_id = taskset_id
  139. self.subtasks = subtasks
  140. self.app = app_or_default(app)
  141. def itersubtasks(self):
  142. """Taskset subtask iterator.
  143. :returns: an iterator for iterating over the tasksets
  144. :class:`AsyncResult` objects.
  145. """
  146. return (subtask for subtask in self.subtasks)
  147. def successful(self):
  148. """Was the taskset successful?
  149. :returns: :const:`True` if all of the tasks in the taskset finished
  150. successfully (i.e. did not raise an exception).
  151. """
  152. return all(subtask.successful()
  153. for subtask in self.itersubtasks())
  154. def failed(self):
  155. """Did the taskset fail?
  156. :returns: :const:`True` if any of the tasks in the taskset failed.
  157. (i.e., raised an exception)
  158. """
  159. return any(subtask.failed()
  160. for subtask in self.itersubtasks())
  161. def waiting(self):
  162. """Is the taskset waiting?
  163. :returns: :const:`True` if any of the tasks in the taskset is still
  164. waiting for execution.
  165. """
  166. return any(not subtask.ready()
  167. for subtask in self.itersubtasks())
  168. def ready(self):
  169. """Is the task ready?
  170. :returns: :const:`True` if all of the tasks in the taskset has been
  171. executed.
  172. """
  173. return all(subtask.ready()
  174. for subtask in self.itersubtasks())
  175. def completed_count(self):
  176. """Task completion count.
  177. :returns: the number of tasks completed.
  178. """
  179. return sum(imap(int, (subtask.successful()
  180. for subtask in self.itersubtasks())))
  181. def forget(self):
  182. """Forget about (and possible remove the result of) all the tasks
  183. in this taskset."""
  184. for subtask in self.subtasks:
  185. subtask.forget()
  186. def revoke(self, connection=None, connect_timeout=None):
  187. def _do_revoke(connection=None, connect_timeout=None):
  188. for subtask in self.subtasks:
  189. subtask.revoke(connection=connection)
  190. return self.app.with_default_connection(_do_revoke)(
  191. connection=connection, connect_timeout=connect_timeout)
  192. def __iter__(self):
  193. """``iter(res)`` -> ``res.iterate()``."""
  194. return self.iterate()
  195. def __getitem__(self, index):
  196. return self.subtasks[index]
  197. def iterate(self):
  198. """Iterate over the return values of the tasks as they finish
  199. one by one.
  200. :raises: The exception if any of the tasks raised an exception.
  201. """
  202. pending = list(self.subtasks)
  203. results = dict((subtask.task_id, copy(subtask))
  204. for subtask in self.subtasks)
  205. while pending:
  206. for task_id in pending:
  207. result = results[task_id]
  208. if result.status == states.SUCCESS:
  209. try:
  210. pending.remove(task_id)
  211. except ValueError:
  212. pass
  213. yield result.result
  214. elif result.status in states.PROPAGATE_STATES:
  215. raise result.result
  216. def join(self, timeout=None, propagate=True):
  217. """Gather the results of all tasks in the taskset,
  218. and returns a list ordered by the order of the set.
  219. :keyword timeout: The number of seconds to wait for results
  220. before the operation times out.
  221. :keyword propagate: If any of the subtasks raises an exception, the
  222. exception will be reraised.
  223. :raises celery.exceptions.TimeoutError: if ``timeout`` is not
  224. :const:`None` and the operation takes longer than ``timeout``
  225. seconds.
  226. :returns: list of return values for all subtasks in order.
  227. """
  228. time_start = time.time()
  229. def on_timeout():
  230. raise TimeoutError("The operation timed out.")
  231. results = PositionQueue(length=self.total)
  232. while True:
  233. for position, pending_result in enumerate(self.subtasks):
  234. state = pending_result.state
  235. if state in states.READY_STATES:
  236. if propagate and state in states.PROPAGATE_STATES:
  237. raise pending_result.result
  238. results[position] = pending_result.result
  239. if results.full():
  240. # Make list copy, so the returned type is not a position
  241. # queue.
  242. return list(results)
  243. else:
  244. if (timeout is not None and
  245. time.time() >= time_start + timeout):
  246. on_timeout()
  247. def save(self, backend=None):
  248. """Save taskset result for later retrieval using :meth:`restore`.
  249. Example:
  250. >>> result.save()
  251. >>> result = TaskSetResult.restore(task_id)
  252. """
  253. if backend is None:
  254. backend = self.app.backend
  255. backend.save_taskset(self.taskset_id, self)
  256. @classmethod
  257. def restore(self, taskset_id, backend=None):
  258. """Restore previously saved taskset result."""
  259. if backend is None:
  260. backend = self.app.backend
  261. return backend.restore_taskset(taskset_id)
  262. @property
  263. def total(self):
  264. """The total number of tasks in the :class:`~celery.task.TaskSet`."""
  265. return len(self.subtasks)
  266. class EagerResult(BaseAsyncResult):
  267. """Result that we know has already been executed. """
  268. TimeoutError = TimeoutError
  269. def __init__(self, task_id, ret_value, status, traceback=None):
  270. self.task_id = task_id
  271. self._result = ret_value
  272. self._status = status
  273. self._traceback = traceback
  274. def successful(self):
  275. """Returns :const:`True` if the task executed without failure."""
  276. return self.status == states.SUCCESS
  277. def ready(self):
  278. """Returns :const:`True` if the task has been executed."""
  279. return True
  280. def wait(self, timeout=None):
  281. """Wait until the task has been executed and return its result."""
  282. if self.status == states.SUCCESS:
  283. return self.result
  284. elif self.status in states.PROPAGATE_STATES:
  285. raise self.result
  286. def revoke(self):
  287. self._status = states.REVOKED
  288. @property
  289. def result(self):
  290. """The tasks return value"""
  291. return self._result
  292. @property
  293. def status(self):
  294. """The tasks status (alias to :attr:`state`)."""
  295. return self._status
  296. @property
  297. def state(self):
  298. """The tasks state."""
  299. return self._state
  300. @property
  301. def traceback(self):
  302. """The traceback if the task failed."""
  303. return self._traceback
  304. def __repr__(self):
  305. return "<EagerResult: %s>" % self.task_id