result.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437
  1. from __future__ import generators
  2. import time
  3. from copy import copy
  4. from itertools import imap
  5. from celery import states
  6. from celery.backends import default_backend
  7. from celery.datastructures import PositionQueue
  8. from celery.exceptions import TimeoutError
  9. from celery.messaging import with_connection
  10. from celery.registry import _unpickle_task
  11. from celery.utils import any, all
  12. def _unpickle_result(task_id, task_name):
  13. return _unpickle_task(task_name).AsyncResult(task_id)
  14. class BaseAsyncResult(object):
  15. """Base class for pending result, supports custom task result backend.
  16. :param task_id: see :attr:`task_id`.
  17. :param backend: see :attr:`backend`.
  18. .. attribute:: task_id
  19. The unique identifier for this task.
  20. .. attribute:: backend
  21. The task result backend used.
  22. """
  23. TimeoutError = TimeoutError
  24. def __init__(self, task_id, backend, task_name=None):
  25. self.task_id = task_id
  26. self.backend = backend
  27. self.task_name = task_name
  28. def __reduce__(self):
  29. if self.task_name:
  30. return (_unpickle_result, (self.task_id, self.task_name))
  31. else:
  32. return (self.__class__, (self.task_id, self.backend))
  33. def forget(self):
  34. """Forget about (and possibly remove the result of) this task."""
  35. self.backend.forget(self.task_id)
  36. def revoke(self, connection=None, connect_timeout=None):
  37. """Send revoke signal to all workers.
  38. The workers will ignore the task if received.
  39. """
  40. from celery.task import control
  41. control.revoke(self.task_id, connection=connection,
  42. connect_timeout=connect_timeout)
  43. def wait(self, timeout=None):
  44. """Wait for task, and return the result when it arrives.
  45. :keyword timeout: How long to wait, in seconds, before the
  46. operation times out.
  47. :raises celery.exceptions.TimeoutError: if ``timeout`` is not
  48. :const:`None` and the result does not arrive within ``timeout``
  49. seconds.
  50. If the remote call raised an exception then that
  51. exception will be re-raised.
  52. """
  53. return self.backend.wait_for(self.task_id, timeout=timeout)
  54. def get(self, timeout=None):
  55. """Alias to :meth:`wait`."""
  56. return self.wait(timeout=timeout)
  57. def ready(self):
  58. """Returns :const:`True` if the task executed successfully, or raised
  59. an exception.
  60. If the task is still running, pending, or is waiting
  61. for retry then :const:`False` is returned.
  62. """
  63. return self.status in self.backend.READY_STATES
  64. def successful(self):
  65. """Returns :const:`True` if the task executed successfully."""
  66. return self.status == states.SUCCESS
  67. def failed(self):
  68. """Returns :const:`True` if the task failed by exception."""
  69. return self.status == states.FAILURE
  70. def __str__(self):
  71. """``str(self) -> self.task_id``"""
  72. return self.task_id
  73. def __hash__(self):
  74. """``hash(self) -> hash(self.task_id)``"""
  75. return hash(self.task_id)
  76. def __repr__(self):
  77. return "<AsyncResult: %s>" % self.task_id
  78. def __eq__(self, other):
  79. if isinstance(other, self.__class__):
  80. return self.task_id == other.task_id
  81. return other == self.task_id
  82. def __copy__(self):
  83. return self.__class__(self.task_id, backend=self.backend)
  84. @property
  85. def result(self):
  86. """When the task has been executed, this contains the return value.
  87. If the task raised an exception, this will be the exception instance.
  88. """
  89. return self.backend.get_result(self.task_id)
  90. @property
  91. def info(self):
  92. """Get state metadata.
  93. Alias to :meth:`result`.
  94. """
  95. return self.result
  96. @property
  97. def traceback(self):
  98. """Get the traceback of a failed task."""
  99. return self.backend.get_traceback(self.task_id)
  100. @property
  101. def status(self):
  102. """Deprecated alias of :attr:`state`."""
  103. return self.state
  104. @property
  105. def state(self):
  106. """The current status of the task.
  107. Can be one of the following:
  108. *PENDING*
  109. The task is waiting for execution.
  110. *STARTED*
  111. The task has been started.
  112. *RETRY*
  113. The task is to be retried, possibly because of failure.
  114. *FAILURE*
  115. The task raised an exception, or has been retried more times
  116. than its limit. The :attr:`result` attribute contains the
  117. exception raised.
  118. *SUCCESS*
  119. The task executed successfully. The :attr:`result` attribute
  120. contains the resulting value.
  121. """
  122. return self.backend.get_status(self.task_id)
  123. class AsyncResult(BaseAsyncResult):
  124. """Pending task result using the default backend.
  125. :param task_id: see :attr:`task_id`.
  126. .. attribute:: task_id
  127. The unique identifier for this task.
  128. .. attribute:: backend
  129. Instance of :class:`celery.backends.DefaultBackend`.
  130. """
  131. def __init__(self, task_id, backend=None, task_name=None):
  132. super(AsyncResult, self).__init__(task_id, backend or default_backend,
  133. task_name=task_name)
  134. class TaskSetResult(object):
  135. """Working with :class:`~celery.task.TaskSet` results.
  136. An instance of this class is returned by
  137. ``TaskSet``'s :meth:`~celery.task.TaskSet.apply_async()`. It enables
  138. inspection of the subtasks status and return values as a single entity.
  139. :option taskset_id: see :attr:`taskset_id`.
  140. :option subtasks: see :attr:`subtasks`.
  141. .. attribute:: taskset_id
  142. The UUID of the taskset itself.
  143. .. attribute:: subtasks
  144. A list of :class:`AsyncResult` instances for all of the subtasks.
  145. """
  146. def __init__(self, taskset_id, subtasks):
  147. self.taskset_id = taskset_id
  148. self.subtasks = subtasks
  149. def itersubtasks(self):
  150. """Taskset subtask iterator.
  151. :returns: an iterator for iterating over the tasksets
  152. :class:`AsyncResult` objects.
  153. """
  154. return (subtask for subtask in self.subtasks)
  155. def successful(self):
  156. """Was the taskset successful?
  157. :returns: :const:`True` if all of the tasks in the taskset finished
  158. successfully (i.e. did not raise an exception).
  159. """
  160. return all(subtask.successful()
  161. for subtask in self.itersubtasks())
  162. def failed(self):
  163. """Did the taskset fail?
  164. :returns: :const:`True` if any of the tasks in the taskset failed.
  165. (i.e., raised an exception)
  166. """
  167. return any(subtask.failed()
  168. for subtask in self.itersubtasks())
  169. def waiting(self):
  170. """Is the taskset waiting?
  171. :returns: :const:`True` if any of the tasks in the taskset is still
  172. waiting for execution.
  173. """
  174. return any(not subtask.ready()
  175. for subtask in self.itersubtasks())
  176. def ready(self):
  177. """Is the task ready?
  178. :returns: :const:`True` if all of the tasks in the taskset has been
  179. executed.
  180. """
  181. return all(subtask.ready()
  182. for subtask in self.itersubtasks())
  183. def completed_count(self):
  184. """Task completion count.
  185. :returns: the number of tasks completed.
  186. """
  187. return sum(imap(int, (subtask.successful()
  188. for subtask in self.itersubtasks())))
  189. def forget(self):
  190. """Forget about (and possible remove the result of) all the tasks
  191. in this taskset."""
  192. for subtask in self.subtasks:
  193. subtask.forget()
  194. @with_connection
  195. def revoke(self, connection=None, connect_timeout=None):
  196. for subtask in self.subtasks:
  197. subtask.revoke(connection=connection)
  198. def __iter__(self):
  199. """``iter(res)`` -> ``res.iterate()``."""
  200. return self.iterate()
  201. def __getitem__(self, index):
  202. return self.subtasks[index]
  203. def iterate(self):
  204. """Iterate over the return values of the tasks as they finish
  205. one by one.
  206. :raises: The exception if any of the tasks raised an exception.
  207. """
  208. pending = list(self.subtasks)
  209. results = dict((subtask.task_id, copy(subtask))
  210. for subtask in self.subtasks)
  211. while pending:
  212. for task_id in pending:
  213. result = results[task_id]
  214. if result.status == states.SUCCESS:
  215. try:
  216. pending.remove(task_id)
  217. except ValueError:
  218. pass
  219. yield result.result
  220. elif result.status in states.PROPAGATE_STATES:
  221. raise result.result
  222. def join(self, timeout=None, propagate=True):
  223. """Gather the results of all tasks in the taskset,
  224. and returns a list ordered by the order of the set.
  225. :keyword timeout: The number of seconds to wait for results
  226. before the operation times out.
  227. :keyword propagate: If any of the subtasks raises an exception, the
  228. exception will be reraised.
  229. :raises celery.exceptions.TimeoutError: if ``timeout`` is not
  230. :const:`None` and the operation takes longer than ``timeout``
  231. seconds.
  232. :returns: list of return values for all subtasks in order.
  233. """
  234. time_start = time.time()
  235. def on_timeout():
  236. raise TimeoutError("The operation timed out.")
  237. results = PositionQueue(length=self.total)
  238. while True:
  239. for position, pending_result in enumerate(self.subtasks):
  240. state = pending_result.state
  241. if state in states.READY_STATES:
  242. if propagate and state in states.PROPAGATE_STATES:
  243. raise pending_result.result
  244. results[position] = pending_result.result
  245. if results.full():
  246. # Make list copy, so the returned type is not a position
  247. # queue.
  248. return list(results)
  249. else:
  250. if (timeout is not None and
  251. time.time() >= time_start + timeout):
  252. on_timeout()
  253. def save(self, backend=default_backend):
  254. """Save taskset result for later retrieval using :meth:`restore`.
  255. Example:
  256. >>> result.save()
  257. >>> result = TaskSetResult.restore(task_id)
  258. """
  259. backend.save_taskset(self.taskset_id, self)
  260. @classmethod
  261. def restore(self, taskset_id, backend=default_backend):
  262. """Restore previously saved taskset result."""
  263. return backend.restore_taskset(taskset_id)
  264. @property
  265. def total(self):
  266. """The total number of tasks in the :class:`~celery.task.TaskSet`."""
  267. return len(self.subtasks)
  268. class EagerResult(BaseAsyncResult):
  269. """Result that we know has already been executed. """
  270. TimeoutError = TimeoutError
  271. def __init__(self, task_id, ret_value, status, traceback=None):
  272. self.task_id = task_id
  273. self._result = ret_value
  274. self._status = status
  275. self._traceback = traceback
  276. def successful(self):
  277. """Returns :const:`True` if the task executed without failure."""
  278. return self.status == states.SUCCESS
  279. def ready(self):
  280. """Returns :const:`True` if the task has been executed."""
  281. return True
  282. def wait(self, timeout=None):
  283. """Wait until the task has been executed and return its result."""
  284. if self.status == states.SUCCESS:
  285. return self.result
  286. elif self.status in states.PROPAGATE_STATES:
  287. raise self.result
  288. def revoke(self):
  289. self._status = states.REVOKED
  290. @property
  291. def result(self):
  292. """The tasks return value"""
  293. return self._result
  294. @property
  295. def status(self):
  296. """The tasks status (alias to :attr:`state`)."""
  297. return self._status
  298. @property
  299. def state(self):
  300. """The tasks state."""
  301. return self._state
  302. @property
  303. def traceback(self):
  304. """The traceback if the task failed."""
  305. return self._traceback
  306. def __repr__(self):
  307. return "<EagerResult: %s>" % self.task_id