result.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395
  1. from __future__ import generators
  2. """
  3. Asynchronous result types.
  4. """
  5. import time
  6. from itertools import imap
  7. from copy import copy
  8. from celery import states
  9. from celery.utils import any, all
  10. from celery.backends import default_backend
  11. from celery.messaging import with_connection
  12. from celery.exceptions import TimeoutError
  13. from celery.datastructures import PositionQueue
  14. class BaseAsyncResult(object):
  15. """Base class for pending result, supports custom task result backend.
  16. :param task_id: see :attr:`task_id`.
  17. :param backend: see :attr:`backend`.
  18. .. attribute:: task_id
  19. The unique identifier for this task.
  20. .. attribute:: backend
  21. The task result backend used.
  22. """
  23. TimeoutError = TimeoutError
  24. def __init__(self, task_id, backend):
  25. self.task_id = task_id
  26. self.backend = backend
  27. @with_connection
  28. def revoke(self, connection=None, connect_timeout=None):
  29. """Send revoke signal to all workers.
  30. The workers will ignore the task if received.
  31. """
  32. from celery.task import control
  33. control.revoke(self.task_id)
  34. def get(self, timeout=None):
  35. """Alias to :meth:`wait`."""
  36. return self.wait(timeout=timeout)
  37. def wait(self, timeout=None):
  38. """Wait for task, and return the result when it arrives.
  39. :keyword timeout: How long to wait, in seconds, before the
  40. operation times out.
  41. :raises celery.exceptions.TimeoutError: if ``timeout`` is not ``None``
  42. and the result does not arrive within ``timeout`` seconds.
  43. If the remote call raised an exception then that
  44. exception will be re-raised.
  45. """
  46. return self.backend.wait_for(self.task_id, timeout=timeout)
  47. def ready(self):
  48. """Returns ``True`` if the task executed successfully, or raised
  49. an exception. If the task is still running, pending, or is waiting
  50. for retry then ``False`` is returned.
  51. :rtype: bool
  52. """
  53. status = self.backend.get_status(self.task_id)
  54. return status not in self.backend.UNREADY_STATES
  55. def successful(self):
  56. """Returns ``True`` if the task executed successfully.
  57. :rtype: bool
  58. """
  59. return self.backend.is_successful(self.task_id)
  60. def __str__(self):
  61. """``str(self)`` -> ``self.task_id``"""
  62. return self.task_id
  63. def __hash__(self):
  64. return hash(self.task_id)
  65. def __repr__(self):
  66. return "<AsyncResult: %s>" % self.task_id
  67. def __eq__(self, other):
  68. if isinstance(other, self.__class__):
  69. return self.task_id == other.task_id
  70. return other == self.task_id
  71. def __copy__(self):
  72. return self.__class__(self.task_id, backend=self.backend)
  73. @property
  74. def result(self):
  75. """When the task has been executed, this contains the return value.
  76. If the task raised an exception, this will be the exception instance.
  77. """
  78. return self.backend.get_result(self.task_id)
  79. @property
  80. def traceback(self):
  81. """Get the traceback of a failed task."""
  82. return self.backend.get_traceback(self.task_id)
  83. @property
  84. def status(self):
  85. """The current status of the task.
  86. Can be one of the following:
  87. *PENDING*
  88. The task is waiting for execution.
  89. *STARTED*
  90. The task has been started.
  91. *RETRY*
  92. The task is to be retried, possibly because of failure.
  93. *FAILURE*
  94. The task raised an exception, or has been retried more times
  95. than its limit. The :attr:`result` attribute contains the
  96. exception raised.
  97. *SUCCESS*
  98. The task executed successfully. The :attr:`result` attribute
  99. contains the resulting value.
  100. """
  101. return self.backend.get_status(self.task_id)
  102. class AsyncResult(BaseAsyncResult):
  103. """Pending task result using the default backend.
  104. :param task_id: see :attr:`task_id`.
  105. .. attribute:: task_id
  106. The unique identifier for this task.
  107. .. attribute:: backend
  108. Instance of :class:`celery.backends.DefaultBackend`.
  109. """
  110. def __init__(self, task_id, backend=None):
  111. backend = backend or default_backend
  112. super(AsyncResult, self).__init__(task_id, backend)
  113. class TaskSetResult(object):
  114. """Working with :class:`celery.task.TaskSet` results.
  115. An instance of this class is returned by
  116. :meth:`celery.task.TaskSet.apply_async()`. It lets you inspect the
  117. status and return values of the taskset as a single entity.
  118. :option taskset_id: see :attr:`taskset_id`.
  119. :option subtasks: see :attr:`subtasks`.
  120. .. attribute:: taskset_id
  121. The UUID of the taskset itself.
  122. .. attribute:: subtasks
  123. A list of :class:`AsyncResult` instances for all of the subtasks.
  124. """
  125. def __init__(self, taskset_id, subtasks):
  126. self.taskset_id = taskset_id
  127. self.subtasks = subtasks
  128. def itersubtasks(self):
  129. """Taskset subtask iterator.
  130. :returns: an iterator for iterating over the tasksets
  131. :class:`AsyncResult` objects.
  132. """
  133. return (subtask for subtask in self.subtasks)
  134. def successful(self):
  135. """Was the taskset successful?
  136. :returns: ``True`` if all of the tasks in the taskset finished
  137. successfully (i.e. did not raise an exception).
  138. """
  139. return all((subtask.successful()
  140. for subtask in self.itersubtasks()))
  141. def failed(self):
  142. """Did the taskset fail?
  143. :returns: ``True`` if any of the tasks in the taskset failed.
  144. (i.e., raised an exception)
  145. """
  146. return any((not subtask.successful()
  147. for subtask in self.itersubtasks()))
  148. def waiting(self):
  149. """Is the taskset waiting?
  150. :returns: ``True`` if any of the tasks in the taskset is still
  151. waiting for execution.
  152. """
  153. return any((not subtask.ready()
  154. for subtask in self.itersubtasks()))
  155. def ready(self):
  156. """Is the task ready?
  157. :returns: ``True`` if all of the tasks in the taskset has been
  158. executed.
  159. """
  160. return all((subtask.ready()
  161. for subtask in self.itersubtasks()))
  162. def completed_count(self):
  163. """Task completion count.
  164. :returns: the number of tasks completed.
  165. """
  166. return sum(imap(int, (subtask.successful()
  167. for subtask in self.itersubtasks())))
  168. @with_connection
  169. def revoke(self, connection=None, connect_timeout=None):
  170. for subtask in self.subtasks:
  171. subtask.revoke(connection=connection)
  172. def __iter__(self):
  173. """``iter(res)`` -> ``res.iterate()``."""
  174. return self.iterate()
  175. def iterate(self):
  176. """Iterate over the return values of the tasks as they finish
  177. one by one.
  178. :raises: The exception if any of the tasks raised an exception.
  179. """
  180. pending = list(self.subtasks)
  181. results = dict((subtask.task_id, copy(subtask))
  182. for subtask in self.subtasks)
  183. while pending:
  184. for task_id in pending:
  185. result = results[task_id]
  186. if result.status == states.SUCCESS:
  187. try:
  188. pending.remove(task_id)
  189. except ValueError:
  190. pass
  191. yield result.result
  192. elif result.status in states.PROPAGATE_STATES:
  193. raise result.result
  194. def join(self, timeout=None):
  195. """Gather the results for all of the tasks in the taskset,
  196. and return a list with them ordered by the order of which they
  197. were called.
  198. :keyword timeout: The time in seconds, how long
  199. it will wait for results, before the operation times out.
  200. :raises celery.exceptions.TimeoutError: if ``timeout`` is not ``None``
  201. and the operation takes longer than ``timeout`` seconds.
  202. If any of the tasks raises an exception, the exception
  203. will be reraised by :meth:`join`.
  204. :returns: list of return values for all tasks in the taskset.
  205. """
  206. time_start = time.time()
  207. def on_timeout():
  208. raise TimeoutError("The operation timed out.")
  209. results = PositionQueue(length=self.total)
  210. while True:
  211. for position, pending_result in enumerate(self.subtasks):
  212. if pending_result.status == states.SUCCESS:
  213. results[position] = pending_result.result
  214. elif pending_result.status in states.PROPAGATE_STATES:
  215. raise pending_result.result
  216. if results.full():
  217. # Make list copy, so the returned type is not a position
  218. # queue.
  219. return list(results)
  220. else:
  221. if timeout is not None and \
  222. time.time() >= time_start + timeout:
  223. on_timeout()
  224. def save(self, backend=default_backend):
  225. """Save taskset result for later retrieval using :meth:`restore`.
  226. Example:
  227. >>> result.save()
  228. >>> result = TaskSetResult.restore(task_id)
  229. """
  230. backend.save_taskset(self.taskset_id, self)
  231. @classmethod
  232. def restore(self, taskset_id, backend=default_backend):
  233. """Restore previously saved taskset result."""
  234. return backend.restore_taskset(taskset_id)
  235. @property
  236. def total(self):
  237. """The total number of tasks in the :class:`celery.task.TaskSet`."""
  238. return len(self.subtasks)
  239. class EagerResult(BaseAsyncResult):
  240. """Result that we know has already been executed. """
  241. TimeoutError = TimeoutError
  242. def __init__(self, task_id, ret_value, status, traceback=None):
  243. self.task_id = task_id
  244. self._result = ret_value
  245. self._status = status
  246. self._traceback = traceback
  247. def successful(self):
  248. """Returns ``True`` if the task executed without failure."""
  249. return self.status == states.SUCCESS
  250. def ready(self):
  251. """Returns ``True`` if the task has been executed."""
  252. return True
  253. def wait(self, timeout=None):
  254. """Wait until the task has been executed and return its result."""
  255. if self.status == states.SUCCESS:
  256. return self.result
  257. elif self.status in states.PROPAGATE_STATES:
  258. raise self.result
  259. def revoke(self):
  260. pass
  261. @property
  262. def result(self):
  263. """The tasks return value"""
  264. return self._result
  265. @property
  266. def status(self):
  267. """The tasks status"""
  268. return self._status
  269. @property
  270. def traceback(self):
  271. """The traceback if the task failed."""
  272. return self._traceback
  273. def __repr__(self):
  274. return "<EagerResult: %s>" % self.task_id