result.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493
  1. from __future__ import generators
  2. import time
  3. from copy import copy
  4. from itertools import imap
  5. from celery import states
  6. from celery.app import app_or_default
  7. from celery.exceptions import TimeoutError
  8. from celery.registry import _unpickle_task
  9. from celery.utils.compat import any, all
  10. def _unpickle_result(task_id, task_name):
  11. return _unpickle_task(task_name).AsyncResult(task_id)
  12. class BaseAsyncResult(object):
  13. """Base class for pending result, supports custom task result backend.
  14. :param task_id: see :attr:`task_id`.
  15. :param backend: see :attr:`backend`.
  16. """
  17. #: Error raised for timeouts.
  18. TimeoutError = TimeoutError
  19. #: The task uuid.
  20. task_id = None
  21. #: The task result backend to use.
  22. backend = None
  23. def __init__(self, task_id, backend, task_name=None, app=None):
  24. self.task_id = task_id
  25. self.backend = backend
  26. self.task_name = task_name
  27. self.app = app_or_default(app)
  28. def __reduce__(self):
  29. if self.task_name:
  30. return (_unpickle_result, (self.task_id, self.task_name))
  31. else:
  32. return (self.__class__, (self.task_id, self.backend,
  33. None, self.app))
  34. def forget(self):
  35. """Forget about (and possibly remove the result of) this task."""
  36. self.backend.forget(self.task_id)
  37. def revoke(self, connection=None, connect_timeout=None):
  38. """Send revoke signal to all workers.
  39. Any worker receiving the task, or having reserved the
  40. task, *must* ignore it.
  41. """
  42. self.app.control.revoke(self.task_id, connection=connection,
  43. connect_timeout=connect_timeout)
  44. def wait(self, timeout=None, propagate=True, interval=0.5):
  45. """Wait for task, and return the result.
  46. .. warning::
  47. Waiting for subtasks may lead to deadlocks.
  48. Please read :ref:`task-synchronous-subtasks`.
  49. :keyword timeout: How long to wait, in seconds, before the
  50. operation times out.
  51. :keyword propagate: Re-raise exception if the task failed.
  52. :keyword interval: Time to wait (in seconds) before retrying to
  53. retrieve the result. Note that this does not have any effect
  54. when using the AMQP result store backend, as it does not
  55. use polling.
  56. :raises celery.exceptions.TimeoutError: if `timeout` is not
  57. :const:`None` and the result does not arrive within `timeout`
  58. seconds.
  59. If the remote call raised an exception then that exception will
  60. be re-raised.
  61. """
  62. return self.backend.wait_for(self.task_id, timeout=timeout,
  63. propagate=propagate,
  64. interval=interval)
  65. def get(self, timeout=None):
  66. """Alias to :meth:`wait`."""
  67. return self.wait(timeout=timeout)
  68. def ready(self):
  69. """Returns :const:`True` if the task has been executed.
  70. If the task is still running, pending, or is waiting
  71. for retry then :const:`False` is returned.
  72. """
  73. return self.status in self.backend.READY_STATES
  74. def successful(self):
  75. """Returns :const:`True` if the task executed successfully."""
  76. return self.status == states.SUCCESS
  77. def failed(self):
  78. """Returns :const:`True` if the task failed by exception."""
  79. return self.status == states.FAILURE
  80. def __str__(self):
  81. """`str(self) -> self.task_id`"""
  82. return self.task_id
  83. def __hash__(self):
  84. """`hash(self) -> hash(self.task_id)`"""
  85. return hash(self.task_id)
  86. def __repr__(self):
  87. return "<AsyncResult: %s>" % self.task_id
  88. def __eq__(self, other):
  89. if isinstance(other, self.__class__):
  90. return self.task_id == other.task_id
  91. return other == self.task_id
  92. def __copy__(self):
  93. return self.__class__(self.task_id, backend=self.backend)
  94. @property
  95. def result(self):
  96. """When the task has been executed, this contains the return value.
  97. If the task raised an exception, this will be the exception
  98. instance."""
  99. return self.backend.get_result(self.task_id)
  100. @property
  101. def info(self):
  102. """Get state metadata. Alias to :meth:`result`."""
  103. return self.result
  104. @property
  105. def traceback(self):
  106. """Get the traceback of a failed task."""
  107. return self.backend.get_traceback(self.task_id)
  108. @property
  109. def status(self):
  110. """Deprecated alias of :attr:`state`."""
  111. return self.state
  112. @property
  113. def state(self):
  114. """The tasks current state.
  115. Possible values includes:
  116. *PENDING*
  117. The task is waiting for execution.
  118. *STARTED*
  119. The task has been started.
  120. *RETRY*
  121. The task is to be retried, possibly because of failure.
  122. *FAILURE*
  123. The task raised an exception, or has been retried more times
  124. than its limit. The :attr:`result` attribute contains the
  125. exception raised.
  126. *SUCCESS*
  127. The task executed successfully. The :attr:`result` attribute
  128. contains the resulting value.
  129. """
  130. return self.backend.get_status(self.task_id)
  131. class AsyncResult(BaseAsyncResult):
  132. """Pending task result using the default backend.
  133. :param task_id: The tasks uuid.
  134. """
  135. #: The tasks uuid.
  136. uuid = None
  137. #: Task result store backend to use.
  138. backend = None
  139. def __init__(self, task_id, backend=None, task_name=None, app=None):
  140. app = app_or_default(app)
  141. backend = backend or app.backend
  142. super(AsyncResult, self).__init__(task_id, backend,
  143. task_name=task_name, app=app)
  144. class TaskSetResult(object):
  145. """Working with :class:`~celery.task.sets.TaskSet` results.
  146. An instance of this class is returned by
  147. `TaskSet`'s :meth:`~celery.task.TaskSet.apply_async()`. It enables
  148. inspection of the subtasks state and return values as a single entity.
  149. :param taskset_id: The id of the taskset.
  150. :param subtasks: List of result instances.
  151. """
  152. #: The UUID of the taskset.
  153. taskset_id = None
  154. #: A list of :class:`AsyncResult` instances for all of the subtasks.
  155. subtasks = None
  156. def __init__(self, taskset_id, subtasks, app=None):
  157. self.taskset_id = taskset_id
  158. self.subtasks = subtasks
  159. self.app = app_or_default(app)
  160. def itersubtasks(self):
  161. """Taskset subtask iterator.
  162. :returns: an iterator for iterating over the tasksets
  163. :class:`AsyncResult` objects.
  164. """
  165. return (subtask for subtask in self.subtasks)
  166. def successful(self):
  167. """Was the taskset successful?
  168. :returns: :const:`True` if all of the tasks in the taskset finished
  169. successfully (i.e. did not raise an exception).
  170. """
  171. return all(subtask.successful()
  172. for subtask in self.itersubtasks())
  173. def failed(self):
  174. """Did the taskset fail?
  175. :returns: :const:`True` if any of the tasks in the taskset failed.
  176. (i.e., raised an exception)
  177. """
  178. return any(subtask.failed()
  179. for subtask in self.itersubtasks())
  180. def waiting(self):
  181. """Is the taskset waiting?
  182. :returns: :const:`True` if any of the tasks in the taskset is still
  183. waiting for execution.
  184. """
  185. return any(not subtask.ready()
  186. for subtask in self.itersubtasks())
  187. def ready(self):
  188. """Is the task ready?
  189. :returns: :const:`True` if all of the tasks in the taskset has been
  190. executed.
  191. """
  192. return all(subtask.ready()
  193. for subtask in self.itersubtasks())
  194. def completed_count(self):
  195. """Task completion count.
  196. :returns: the number of tasks completed.
  197. """
  198. return sum(imap(int, (subtask.successful()
  199. for subtask in self.itersubtasks())))
  200. def forget(self):
  201. """Forget about (and possible remove the result of) all the tasks
  202. in this taskset."""
  203. for subtask in self.subtasks:
  204. subtask.forget()
  205. def revoke(self, connection=None, connect_timeout=None):
  206. """Revoke all subtasks."""
  207. def _do_revoke(connection=None, connect_timeout=None):
  208. for subtask in self.subtasks:
  209. subtask.revoke(connection=connection)
  210. return self.app.with_default_connection(_do_revoke)(
  211. connection=connection, connect_timeout=connect_timeout)
  212. def __iter__(self):
  213. """`iter(res)` -> `res.iterate()`."""
  214. return self.iterate()
  215. def __getitem__(self, index):
  216. """`res[i] -> res.subtasks[i]`"""
  217. return self.subtasks[index]
  218. def iterate(self):
  219. """Iterate over the return values of the tasks as they finish
  220. one by one.
  221. :raises: The exception if any of the tasks raised an exception.
  222. """
  223. pending = list(self.subtasks)
  224. results = dict((subtask.task_id, copy(subtask))
  225. for subtask in self.subtasks)
  226. while pending:
  227. for task_id in pending:
  228. result = results[task_id]
  229. if result.status == states.SUCCESS:
  230. try:
  231. pending.remove(task_id)
  232. except ValueError:
  233. pass
  234. yield result.result
  235. elif result.status in states.PROPAGATE_STATES:
  236. raise result.result
  237. def join(self, timeout=None, propagate=True, interval=0.5):
  238. """Gather the results of all tasks in the taskset,
  239. and returns a list ordered by the order of the set.
  240. .. note::
  241. This can be an very expensive operation on result store
  242. backends that must resort to polling (e.g. database).
  243. You should consider using :meth:`join_native` if your backends
  244. supports it.
  245. .. warning::
  246. Waiting for subtasks may lead the deadlocks.
  247. Please see :ref:`task-synchronous-subtasks`.
  248. :keyword timeout: The number of seconds to wait for results before
  249. the operation times out.
  250. :keyword propagate: If any of the subtasks raises an exception, the
  251. exception will be reraised.
  252. :keyword interval: Time to wait (in seconds) before retrying to
  253. retrieve a result from the set. Note that this
  254. does not have any effect when using the AMQP
  255. result store backend, as it does not use polling.
  256. :raises celery.exceptions.TimeoutError: if `timeout` is not
  257. :const:`None` and the operation takes longer than `timeout`
  258. seconds.
  259. """
  260. time_start = time.time()
  261. remaining = None
  262. results = []
  263. for subtask in self.subtasks:
  264. remaining = None
  265. if timeout:
  266. remaining = timeout - (time.time() - time_start)
  267. if remaining <= 0.0:
  268. raise TimeoutError("join operation timed out")
  269. results.append(subtask.wait(timeout=remaining,
  270. propagate=propagate,
  271. interval=interval))
  272. return results
  273. def iter_native(self, timeout=None):
  274. backend = self.subtasks[0].backend
  275. ids = [subtask.task_id for subtask in self.subtasks]
  276. return backend.get_many(ids, timeout=timeout)
  277. def join_native(self, timeout=None, propagate=True):
  278. """Backend optimized version of :meth:`join`.
  279. .. versionadded:: 2.2
  280. Note that this does not support collecting the results
  281. for different task types using different backends.
  282. This is currently only supported by the AMQP result backend.
  283. """
  284. backend = self.subtasks[0].backend
  285. results = [None for _ in xrange(len(self.subtasks))]
  286. ids = [subtask.task_id for subtask in self.subtasks]
  287. states = dict(backend.get_many(ids, timeout=timeout))
  288. for task_id, meta in states.items():
  289. index = self.subtasks.index(task_id)
  290. results[index] = meta["result"]
  291. return list(results)
  292. def save(self, backend=None):
  293. """Save taskset result for later retrieval using :meth:`restore`.
  294. Example::
  295. >>> result.save()
  296. >>> result = TaskSetResult.restore(taskset_id)
  297. """
  298. if backend is None:
  299. backend = self.app.backend
  300. backend.save_taskset(self.taskset_id, self)
  301. @classmethod
  302. def restore(self, taskset_id, backend=None):
  303. """Restore previously saved taskset result."""
  304. if backend is None:
  305. backend = app_or_default().backend
  306. return backend.restore_taskset(taskset_id)
  307. @property
  308. def total(self):
  309. """Total number of subtasks in the set."""
  310. return len(self.subtasks)
  311. class EagerResult(BaseAsyncResult):
  312. """Result that we know has already been executed."""
  313. TimeoutError = TimeoutError
  314. def __init__(self, task_id, ret_value, state, traceback=None):
  315. self.task_id = task_id
  316. self._result = ret_value
  317. self._state = state
  318. self._traceback = traceback
  319. def __reduce__(self):
  320. return (self.__class__, (self.task_id, self._result,
  321. self._state, self._traceback))
  322. def __copy__(self):
  323. return apply(*self.__reduce__())
  324. def successful(self):
  325. """Returns :const:`True` if the task executed without failure."""
  326. return self.state == states.SUCCESS
  327. def ready(self):
  328. """Returns :const:`True` if the task has been executed."""
  329. return True
  330. def wait(self, timeout=None, propagate=True, **kwargs):
  331. """Wait until the task has been executed and return its result."""
  332. if self.state == states.SUCCESS:
  333. return self.result
  334. elif self.state in states.PROPAGATE_STATES:
  335. if propagate:
  336. raise self.result
  337. return self.result
  338. def revoke(self):
  339. self._state = states.REVOKED
  340. def __repr__(self):
  341. return "<EagerResult: %s>" % self.task_id
  342. @property
  343. def result(self):
  344. """The tasks return value"""
  345. return self._result
  346. @property
  347. def state(self):
  348. """The tasks state."""
  349. return self._state
  350. @property
  351. def traceback(self):
  352. """The traceback if the task failed."""
  353. return self._traceback
  354. @property
  355. def status(self):
  356. """The tasks status (alias to :attr:`state`)."""
  357. return self._state