# result.py
from __future__ import generators

import time

from copy import copy
from itertools import imap

from celery import current_app
from celery import states
from celery.app import app_or_default
from celery.exceptions import TimeoutError
from celery.registry import _unpickle_task
from celery.utils.compat import any, all


def _unpickle_result(task_id, task_name):
    return _unpickle_task(task_name).AsyncResult(task_id)


class BaseAsyncResult(object):
    """Base class for pending result, supports custom task result backend.

    :param task_id: see :attr:`task_id`.
    :param backend: see :attr:`backend`.

    """

    #: Error raised for timeouts.
    TimeoutError = TimeoutError

    #: The task uuid.
    task_id = None

    #: The task result backend to use.
    backend = None

    def __init__(self, task_id, backend, task_name=None, app=None):
        self.task_id = task_id
        self.backend = backend
        self.task_name = task_name
        self.app = app_or_default(app)

    def __reduce__(self):
        if self.task_name:
            return (_unpickle_result, (self.task_id, self.task_name))
        else:
            return (self.__class__, (self.task_id, self.backend,
                                     None, self.app))

    def forget(self):
        """Forget about (and possibly remove the result of) this task."""
        self.backend.forget(self.task_id)

    def revoke(self, connection=None, connect_timeout=None):
        """Send revoke signal to all workers.

        Any worker receiving the task, or having reserved the
        task, *must* ignore it.

        """
        self.app.control.revoke(self.task_id, connection=connection,
                                connect_timeout=connect_timeout)

    def wait(self, timeout=None, propagate=True, interval=0.5):
        """Wait for the task to complete, and return its result.

        .. warning::

            Waiting for subtasks may lead to deadlocks.
            Please read :ref:`task-synchronous-subtasks`.

        :keyword timeout: How long to wait, in seconds, before the
            operation times out.
        :keyword propagate: Re-raise exception if the task failed.
        :keyword interval: Time to wait (in seconds) before retrying to
            retrieve the result.  Note that this does not have any effect
            when using the AMQP result store backend, as it does not
            use polling.

        :raises celery.exceptions.TimeoutError: if `timeout` is not
            :const:`None` and the result does not arrive within `timeout`
            seconds.

        If the remote call raised an exception then that exception will
        be re-raised.

        """
        return self.backend.wait_for(self.task_id, timeout=timeout,
                                     propagate=propagate,
                                     interval=interval)

    def get(self, timeout=None):
        """Alias to :meth:`wait`."""
        return self.wait(timeout=timeout)

    def ready(self):
        """Returns :const:`True` if the task has been executed.

        If the task is still running, pending, or is waiting
        for retry then :const:`False` is returned.

        """
        return self.status in self.backend.READY_STATES

    def successful(self):
        """Returns :const:`True` if the task executed successfully."""
        return self.status == states.SUCCESS

    def failed(self):
        """Returns :const:`True` if the task failed by exception."""
        return self.status == states.FAILURE

    def __str__(self):
        """`str(self) -> self.task_id`"""
        return self.task_id

    def __hash__(self):
        """`hash(self) -> hash(self.task_id)`"""
        return hash(self.task_id)

    def __repr__(self):
        return "<AsyncResult: %s>" % self.task_id

    def __eq__(self, other):
        if isinstance(other, self.__class__):
            return self.task_id == other.task_id
        return other == self.task_id

    def __copy__(self):
        return self.__class__(self.task_id, backend=self.backend)

    @property
    def result(self):
        """When the task has been executed, this contains the return value.

        If the task raised an exception, this will be the exception
        instance."""
        return self.backend.get_result(self.task_id)

    @property
    def info(self):
        """Get state metadata.  Alias to :meth:`result`."""
        return self.result

    @property
    def traceback(self):
        """Get the traceback of a failed task."""
        return self.backend.get_traceback(self.task_id)

    @property
    def status(self):
        """Deprecated alias of :attr:`state`."""
        return self.state

    @property
    def state(self):
        """The task's current state.

        Possible values include:

        *PENDING*
            The task is waiting for execution.

        *STARTED*
            The task has been started.

        *RETRY*
            The task is to be retried, possibly because of failure.

        *FAILURE*
            The task raised an exception, or has exceeded its retry limit.
            The :attr:`result` attribute contains the exception raised.

        *SUCCESS*
            The task executed successfully.  The :attr:`result` attribute
            contains the resulting value.

        """
        return self.backend.get_status(self.task_id)
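

# --- Illustrative sketch, not part of the original module ------------------
# Shows one way a caller might wait on a pending result with a timeout,
# using only the BaseAsyncResult API defined above (`wait`, `state`,
# `traceback`).  `result` is assumed to be any result instance bound to a
# configured backend; `timeout` is an arbitrary example value.
def _example_wait_with_timeout(result, timeout=5.0):
    try:
        # Blocks until the task finishes, re-raising the task's own
        # exception on failure (propagate defaults to True).
        return result.wait(timeout=timeout)
    except TimeoutError:
        # The result did not arrive in time: the task may still be
        # pending, started, or waiting for retry.
        print("task %s still %s after %ss" % (result, result.state, timeout))
    except Exception:
        # Task failure: the stored traceback text is available as well.
        print(result.traceback)
        raise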


class AsyncResult(BaseAsyncResult):
    """Pending task result using the default backend.

    :param task_id: The task's UUID.

    """

    #: The task's UUID.
    uuid = None

    #: Task result store backend to use.
    backend = None

    def __init__(self, task_id, backend=None, task_name=None, app=None):
        app = app_or_default(app)
        backend = backend or app.backend
        super(AsyncResult, self).__init__(task_id, backend,
                                          task_name=task_name, app=app)
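

# --- Illustrative sketch, not part of the original module ------------------
# Non-blocking polling of a previously applied task, given only its id.
# `task_id` is an assumed example argument; the default app backend is
# used, exactly as AsyncResult.__init__ above arranges.
def _example_poll_async_result(task_id):
    result = AsyncResult(task_id)
    if not result.ready():
        # Still PENDING, STARTED or RETRY: nothing to report yet.
        return None
    if result.successful():
        # SUCCESS: the `result` property holds the return value.
        return result.result
    # FAILURE: the `result` property holds the exception instance instead.
    print(result.traceback)
    return None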


class TaskSetResult(object):
    """Working with :class:`~celery.task.sets.TaskSet` results.

    An instance of this class is returned by
    `TaskSet`'s :meth:`~celery.task.TaskSet.apply_async()`.  It enables
    inspection of the subtasks' states and return values as a single entity.

    :param taskset_id: The id of the taskset.
    :param subtasks: List of result instances.

    """

    #: The UUID of the taskset.
    taskset_id = None

    #: A list of :class:`AsyncResult` instances for all of the subtasks.
    subtasks = None

    def __init__(self, taskset_id, subtasks, app=None):
        self.taskset_id = taskset_id
        self.subtasks = subtasks
        self.app = app_or_default(app)

    def itersubtasks(self):
        """Taskset subtask iterator.

        :returns: an iterator for iterating over the taskset's
            :class:`AsyncResult` objects.

        """
        return (subtask for subtask in self.subtasks)

    def successful(self):
        """Was the taskset successful?

        :returns: :const:`True` if all of the tasks in the taskset finished
            successfully (i.e. did not raise an exception).

        """
        return all(subtask.successful()
                   for subtask in self.itersubtasks())

    def failed(self):
        """Did the taskset fail?

        :returns: :const:`True` if any of the tasks in the taskset failed
            (i.e., raised an exception).

        """
        return any(subtask.failed()
                   for subtask in self.itersubtasks())

    def waiting(self):
        """Is the taskset waiting?

        :returns: :const:`True` if any of the tasks in the taskset is still
            waiting for execution.

        """
        return any(not subtask.ready()
                   for subtask in self.itersubtasks())

    def ready(self):
        """Is the taskset ready?

        :returns: :const:`True` if all of the tasks in the taskset have been
            executed.

        """
        return all(subtask.ready()
                   for subtask in self.itersubtasks())

    def completed_count(self):
        """Task completion count.

        :returns: the number of tasks completed.

        """
        return sum(imap(int, (subtask.successful()
                              for subtask in self.itersubtasks())))

    def forget(self):
        """Forget about (and possibly remove the result of) all the tasks
        in this taskset."""
        for subtask in self.subtasks:
            subtask.forget()

    def revoke(self, connection=None, connect_timeout=None):
        """Revoke all subtasks."""

        def _do_revoke(connection=None, connect_timeout=None):
            for subtask in self.subtasks:
                subtask.revoke(connection=connection)

        return self.app.with_default_connection(_do_revoke)(
                connection=connection, connect_timeout=connect_timeout)

    def __iter__(self):
        """`iter(res)` -> `res.iterate()`."""
        return self.iterate()

    def __getitem__(self, index):
        """`res[i]` -> `res.subtasks[i]`."""
        return self.subtasks[index]

    def iterate(self):
        """Iterate over the return values of the tasks as they finish
        one by one.

        :raises: The exception if any of the tasks raised an exception.

        """
        pending = list(self.subtasks)
        results = dict((subtask.task_id, copy(subtask))
                       for subtask in self.subtasks)
        while pending:
            for task_id in pending:
                result = results[task_id]
                if result.status == states.SUCCESS:
                    try:
                        pending.remove(task_id)
                    except ValueError:
                        pass
                    yield result.result
                elif result.status in states.PROPAGATE_STATES:
                    raise result.result

    def join(self, timeout=None, propagate=True, interval=0.5):
        """Gathers the results of all tasks in the taskset,
        and returns a list ordered by the order of the set.

        .. note::

            This can be a very expensive operation on result store
            backends that must resort to polling (e.g. database).
            You should consider using :meth:`join_native` if your backend
            supports it.

        .. warning::

            Waiting for subtasks may lead to deadlocks.
            Please see :ref:`task-synchronous-subtasks`.

        :keyword timeout: The number of seconds to wait for results before
            the operation times out.
        :keyword propagate: If any of the subtasks raises an exception, the
            exception will be re-raised.
        :keyword interval: Time to wait (in seconds) before retrying to
            retrieve a result from the set.  Note that this does not have
            any effect when using the AMQP result store backend, as it
            does not use polling.

        :raises celery.exceptions.TimeoutError: if `timeout` is not
            :const:`None` and the operation takes longer than `timeout`
            seconds.

        """
        time_start = time.time()
        remaining = None

        results = []
        for subtask in self.subtasks:
            remaining = None
            if timeout:
                remaining = timeout - (time.time() - time_start)
                if remaining <= 0.0:
                    raise TimeoutError("join operation timed out")
            results.append(subtask.wait(timeout=remaining,
                                        propagate=propagate,
                                        interval=interval))
        return results

    def iter_native(self, timeout=None):
        # Fetch (task_id, meta) pairs for every subtask in a single
        # backend call; only backends implementing `get_many` support this
        # (see :meth:`join_native`).
        backend = self.subtasks[0].backend
        ids = [subtask.task_id for subtask in self.subtasks]
        return backend.get_many(ids, timeout=timeout)

    def join_native(self, timeout=None, propagate=True):
        """Backend optimized version of :meth:`join`.

        .. versionadded:: 2.2

        Note that this does not support collecting the results
        for different task types using different backends.

        This is currently only supported by the AMQP result backend.

        """
        backend = self.subtasks[0].backend
        results = [None for _ in xrange(len(self.subtasks))]
        ids = [subtask.task_id for subtask in self.subtasks]
        states = dict(backend.get_many(ids, timeout=timeout))
        for task_id, meta in states.items():
            index = self.subtasks.index(task_id)
            results[index] = meta["result"]
        return list(results)

    def save(self, backend=None):
        """Save taskset result for later retrieval using :meth:`restore`.

        Example::

            >>> result.save()
            >>> result = TaskSetResult.restore(taskset_id)

        """
        if backend is None:
            backend = self.app.backend
        backend.save_taskset(self.taskset_id, self)

    @classmethod
    def restore(self, taskset_id, backend=None):
        """Restore previously saved taskset result."""
        if backend is None:
            backend = current_app.backend
        return backend.restore_taskset(taskset_id)

    @property
    def total(self):
        """Total number of subtasks in the set."""
        return len(self.subtasks)
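

# --- Illustrative sketch, not part of the original module ------------------
# A typical round trip with TaskSetResult: persist it with save(), restore
# it later by taskset id (as in the save() docstring above), report
# progress, then collect the ordered return values with join().
# `taskset_result` is assumed to be the value returned by
# TaskSet.apply_async(); `timeout` is an optional number of seconds.
def _example_track_taskset(taskset_result, timeout=None):
    taskset_result.save()
    restored = TaskSetResult.restore(taskset_result.taskset_id)
    print("%d of %d subtasks done" % (restored.completed_count(),
                                      restored.total))
    # join() blocks until every subtask has finished; it raises
    # TimeoutError if `timeout` is exceeded, or a subtask's own exception,
    # since propagate defaults to True.
    return restored.join(timeout=timeout)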


class EagerResult(BaseAsyncResult):
    """Result that we know has already been executed."""
    TimeoutError = TimeoutError

    def __init__(self, task_id, ret_value, state, traceback=None):
        self.task_id = task_id
        self._result = ret_value
        self._state = state
        self._traceback = traceback

    def __reduce__(self):
        return (self.__class__, (self.task_id, self._result,
                                 self._state, self._traceback))

    def __copy__(self):
        cls, args = self.__reduce__()
        return cls(*args)

    def successful(self):
        """Returns :const:`True` if the task executed without failure."""
        return self.state == states.SUCCESS

    def ready(self):
        """Returns :const:`True` if the task has been executed."""
        return True

    def wait(self, timeout=None, propagate=True, **kwargs):
        """Wait until the task has been executed and return its result."""
        if self.state == states.SUCCESS:
            return self.result
        elif self.state in states.PROPAGATE_STATES:
            if propagate:
                raise self.result
            return self.result

    def revoke(self):
        self._state = states.REVOKED

    def __repr__(self):
        return "<EagerResult: %s>" % self.task_id

    @property
    def result(self):
        """The task's return value."""
        return self._result

    @property
    def state(self):
        """The task's state."""
        return self._state

    @property
    def traceback(self):
        """The traceback if the task failed."""
        return self._traceback

    @property
    def status(self):
        """The task's status (alias to :attr:`state`)."""
        return self._state
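

# --- Illustrative sketch, not part of the original module ------------------
# EagerResult wraps a value that has already been computed (e.g. when a
# task is executed locally rather than sent to a worker).  The task id
# below is a made-up placeholder; the example just shows that an eager
# result is immediately ready and that wait() returns the stored value.
def _example_eager_result():
    eager = EagerResult("hypothetical-task-id", 42, states.SUCCESS)
    assert eager.ready() and eager.successful()
    assert eager.wait() == 42       # no blocking: the value is already known
    eager.revoke()                  # only updates the local state
    assert eager.state == states.REVOKED
    return eager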