result.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418
  1. from __future__ import generators
  2. import time
  3. from copy import copy
  4. from itertools import imap
  5. from celery import states
  6. from celery.app import app_or_default
  7. from celery.datastructures import PositionQueue
  8. from celery.exceptions import TimeoutError
  9. from celery.utils.compat import any, all
  10. class BaseAsyncResult(object):
  11. """Base class for pending result, supports custom task result backend.
  12. :param task_id: see :attr:`task_id`.
  13. :param backend: see :attr:`backend`.
  14. """
  15. #: Error raised for timeouts.
  16. TimeoutError = TimeoutError
  17. #: The task uuid.
  18. task_id = None
  19. #: The task result backend to use.
  20. backend = None
  21. def __init__(self, task_id, backend, app=None):
  22. self.task_id = task_id
  23. self.backend = backend
  24. self.app = app_or_default(app)
  25. def forget(self):
  26. """Forget about (and possibly remove the result of) this task."""
  27. self.backend.forget(self.task_id)
  28. def revoke(self, connection=None, connect_timeout=None):
  29. """Send revoke signal to all workers.
  30. Any worker receiving the task, or having reserved the
  31. task, *must* ignore it.
  32. """
  33. self.app.control.revoke(self.task_id, connection=connection,
  34. connect_timeout=connect_timeout)
  35. def wait(self, timeout=None):
  36. """Wait for task, and return the result.
  37. :keyword timeout: How long to wait, in seconds, before the
  38. operation times out.
  39. :raises celery.exceptions.TimeoutError: if `timeout` is not
  40. :const:`None` and the result does not arrive within `timeout`
  41. seconds.
  42. If the remote call raised an exception then that exception will
  43. be re-raised.
  44. """
  45. return self.backend.wait_for(self.task_id, timeout=timeout)
  46. def get(self, timeout=None):
  47. """Alias to :meth:`wait`."""
  48. return self.wait(timeout=timeout)
  49. def ready(self):
  50. """Returns :const:`True` if the task has been executed.
  51. If the task is still running, pending, or is waiting
  52. for retry then :const:`False` is returned.
  53. """
  54. return self.status in self.backend.READY_STATES
  55. def successful(self):
  56. """Returns :const:`True` if the task executed successfully."""
  57. return self.status == states.SUCCESS
  58. def failed(self):
  59. """Returns :const:`True` if the task failed by exception."""
  60. return self.status == states.FAILURE
  61. def __str__(self):
  62. """`str(self) -> self.task_id`"""
  63. return self.task_id
  64. def __hash__(self):
  65. """`hash(self) -> hash(self.task_id)`"""
  66. return hash(self.task_id)
  67. def __repr__(self):
  68. return "<AsyncResult: %s>" % self.task_id
  69. def __eq__(self, other):
  70. if isinstance(other, self.__class__):
  71. return self.task_id == other.task_id
  72. return other == self.task_id
  73. def __copy__(self):
  74. return self.__class__(self.task_id, backend=self.backend)
  75. @property
  76. def result(self):
  77. """When the task has been executed, this contains the return value.
  78. If the task raised an exception, this will be the exception
  79. instance."""
  80. return self.backend.get_result(self.task_id)
  81. @property
  82. def info(self):
  83. """Get state metadata. Alias to :meth:`result`."""
  84. return self.result
  85. @property
  86. def traceback(self):
  87. """Get the traceback of a failed task."""
  88. return self.backend.get_traceback(self.task_id)
  89. @property
  90. def status(self):
  91. """Deprecated alias of :attr:`state`."""
  92. return self.state
  93. @property
  94. def state(self):
  95. """The tasks current state.
  96. Possible values includes:
  97. *PENDING*
  98. The task is waiting for execution.
  99. *STARTED*
  100. The task has been started.
  101. *RETRY*
  102. The task is to be retried, possibly because of failure.
  103. *FAILURE*
  104. The task raised an exception, or has been retried more times
  105. than its limit. The :attr:`result` attribute contains the
  106. exception raised.
  107. *SUCCESS*
  108. The task executed successfully. The :attr:`result` attribute
  109. contains the resulting value.
  110. """
  111. return self.backend.get_status(self.task_id)
  112. class AsyncResult(BaseAsyncResult):
  113. """Pending task result using the default backend.
  114. :param task_id: The tasks uuid.
  115. """
  116. #: The tasks uuid.
  117. uuid = None
  118. #: Task result store backend to use.
  119. backend = None
  120. def __init__(self, task_id, backend=None, app=None):
  121. app = app_or_default(app)
  122. backend = backend or app.backend
  123. super(AsyncResult, self).__init__(task_id, backend, app=app)
  124. class TaskSetResult(object):
  125. """Working with :class:`~celery.task.sets.TaskSet` results.
  126. An instance of this class is returned by
  127. `TaskSet`'s :meth:`~celery.task.TaskSet.apply_async()`. It enables
  128. inspection of the subtasks state and return values as a single entity.
  129. :param taskset_id: The id of the taskset.
  130. :param subtasks: List of result instances.
  131. """
  132. #: The UUID of the taskset.
  133. taskset_id = None
  134. #: A list of :class:`AsyncResult` instances for all of the subtasks.
  135. subtasks = None
  136. def __init__(self, taskset_id, subtasks, app=None):
  137. self.taskset_id = taskset_id
  138. self.subtasks = subtasks
  139. self.app = app_or_default(app)
  140. def itersubtasks(self):
  141. """Taskset subtask iterator.
  142. :returns: an iterator for iterating over the tasksets
  143. :class:`AsyncResult` objects.
  144. """
  145. return (subtask for subtask in self.subtasks)
  146. def successful(self):
  147. """Was the taskset successful?
  148. :returns: :const:`True` if all of the tasks in the taskset finished
  149. successfully (i.e. did not raise an exception).
  150. """
  151. return all(subtask.successful()
  152. for subtask in self.itersubtasks())
  153. def failed(self):
  154. """Did the taskset fail?
  155. :returns: :const:`True` if any of the tasks in the taskset failed.
  156. (i.e., raised an exception)
  157. """
  158. return any(subtask.failed()
  159. for subtask in self.itersubtasks())
  160. def waiting(self):
  161. """Is the taskset waiting?
  162. :returns: :const:`True` if any of the tasks in the taskset is still
  163. waiting for execution.
  164. """
  165. return any(not subtask.ready()
  166. for subtask in self.itersubtasks())
  167. def ready(self):
  168. """Is the task ready?
  169. :returns: :const:`True` if all of the tasks in the taskset has been
  170. executed.
  171. """
  172. return all(subtask.ready()
  173. for subtask in self.itersubtasks())
  174. def completed_count(self):
  175. """Task completion count.
  176. :returns: the number of tasks completed.
  177. """
  178. return sum(imap(int, (subtask.successful()
  179. for subtask in self.itersubtasks())))
  180. def forget(self):
  181. """Forget about (and possible remove the result of) all the tasks
  182. in this taskset."""
  183. for subtask in self.subtasks:
  184. subtask.forget()
  185. def revoke(self, connection=None, connect_timeout=None):
  186. """Revoke all subtasks."""
  187. def _do_revoke(connection=None, connect_timeout=None):
  188. for subtask in self.subtasks:
  189. subtask.revoke(connection=connection)
  190. return self.app.with_default_connection(_do_revoke)(
  191. connection=connection, connect_timeout=connect_timeout)
  192. def __iter__(self):
  193. """`iter(res)` -> `res.iterate()`."""
  194. return self.iterate()
  195. def __getitem__(self, index):
  196. """`res[i] -> res.subtasks[i]`"""
  197. return self.subtasks[index]
  198. def iterate(self):
  199. """Iterate over the return values of the tasks as they finish
  200. one by one.
  201. :raises: The exception if any of the tasks raised an exception.
  202. """
  203. pending = list(self.subtasks)
  204. results = dict((subtask.task_id, copy(subtask))
  205. for subtask in self.subtasks)
  206. while pending:
  207. for task_id in pending:
  208. result = results[task_id]
  209. if result.status == states.SUCCESS:
  210. try:
  211. pending.remove(task_id)
  212. except ValueError:
  213. pass
  214. yield result.result
  215. elif result.status in states.PROPAGATE_STATES:
  216. raise result.result
  217. def join(self, timeout=None, propagate=True):
  218. """Gather the results of all tasks in the taskset,
  219. and returns a list ordered by the order of the set.
  220. :keyword timeout: The number of seconds to wait for results before
  221. the operation times out.
  222. :keyword propagate: If any of the subtasks raises an exception, the
  223. exception will be reraised.
  224. :raises celery.exceptions.TimeoutError: if `timeout` is not
  225. :const:`None` and the operation takes longer than `timeout`
  226. seconds.
  227. """
  228. time_start = time.time()
  229. results = PositionQueue(length=self.total)
  230. while True:
  231. for position, pending_result in enumerate(self.subtasks):
  232. state = pending_result.state
  233. if state in states.READY_STATES:
  234. if propagate and state in states.PROPAGATE_STATES:
  235. raise pending_result.result
  236. results[position] = pending_result.result
  237. if results.full():
  238. # Make list copy, so the returned type is not a position
  239. # queue.
  240. return list(results)
  241. else:
  242. if (timeout is not None and
  243. time.time() >= time_start + timeout):
  244. raise TimeoutError("join operation timed out.")
  245. def save(self, backend=None):
  246. """Save taskset result for later retrieval using :meth:`restore`.
  247. Example::
  248. >>> result.save()
  249. >>> result = TaskSetResult.restore(task_id)
  250. """
  251. if backend is None:
  252. backend = self.app.backend
  253. backend.save_taskset(self.taskset_id, self)
  254. @classmethod
  255. def restore(self, taskset_id, backend=None):
  256. """Restore previously saved taskset result."""
  257. if backend is None:
  258. backend = self.app.backend
  259. return backend.restore_taskset(taskset_id)
  260. @property
  261. def total(self):
  262. """Total number of subtasks in the set."""
  263. return len(self.subtasks)
  264. class EagerResult(BaseAsyncResult):
  265. """Result that we know has already been executed."""
  266. TimeoutError = TimeoutError
  267. def __init__(self, task_id, ret_value, status, traceback=None):
  268. self.task_id = task_id
  269. self._result = ret_value
  270. self._status = status
  271. self._traceback = traceback
  272. def successful(self):
  273. """Returns :const:`True` if the task executed without failure."""
  274. return self.status == states.SUCCESS
  275. def ready(self):
  276. """Returns :const:`True` if the task has been executed."""
  277. return True
  278. def wait(self, timeout=None):
  279. """Wait until the task has been executed and return its result."""
  280. if self.status == states.SUCCESS:
  281. return self.result
  282. elif self.status in states.PROPAGATE_STATES:
  283. raise self.result
  284. def revoke(self):
  285. self._status = states.REVOKED
  286. @property
  287. def result(self):
  288. """The tasks return value"""
  289. return self._result
  290. @property
  291. def status(self):
  292. """The tasks status (alias to :attr:`state`)."""
  293. return self._status
  294. @property
  295. def state(self):
  296. """The tasks state."""
  297. return self._state
  298. @property
  299. def traceback(self):
  300. """The traceback if the task failed."""
  301. return self._traceback
  302. def __repr__(self):
  303. return "<EagerResult: %s>" % self.task_id