base.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. """celery.backends.base"""
  2. import time
  3. from celery import states
  4. from celery.exceptions import TimeoutError, TaskRevokedError
  5. from celery.utils.serialization import pickle, get_pickled_exception
  6. from celery.utils.serialization import get_pickleable_exception
  7. from celery.datastructures import LocalCache
  class BaseBackend(object):
      """The base backend class. All backends should inherit from this.

      Concrete backends are expected to override the storage primitives
      (:meth:`store_result`, :meth:`get_status`, :meth:`get_result`, ...);
      the versions defined here only raise :exc:`NotImplementedError`.
      """

      READY_STATES = states.READY_STATES
      UNREADY_STATES = states.UNREADY_STATES
      EXCEPTION_STATES = states.EXCEPTION_STATES

      # Re-exported on the class so the timeout exception is reachable
      # as ``backend.TimeoutError``.
      TimeoutError = TimeoutError

      def __init__(self, *args, **kwargs):
          """Bind the backend to an app.

          Only the ``app`` keyword argument is read here; it is handed to
          ``app_or_default``.  All other arguments are ignored by this class.
          """
          # Imported at call time rather than at module level
          # (presumably to avoid a circular import -- TODO confirm).
          from celery.app import app_or_default
          self.app = app_or_default(kwargs.get("app"))

      def encode_result(self, result, status):
          """Prepare `result` for storage according to `status`.

          Results belonging to one of :attr:`EXCEPTION_STATES` go through
          :meth:`prepare_exception`, everything else through
          :meth:`prepare_value`.
          """
          if status in self.EXCEPTION_STATES:
              return self.prepare_exception(result)
          return self.prepare_value(result)

      def store_result(self, task_id, result, status, traceback=None):
          """Store the result and status of a task.

          `traceback` is accepted (and defaults to :const:`None`) because
          the ``mark_as_*`` helpers below pass it as a keyword argument;
          without it those helpers would fail with :exc:`TypeError` instead
          of the intended :exc:`NotImplementedError`.

          :raises NotImplementedError: always, must be overridden.
          """
          raise NotImplementedError(
                  "store_result is not supported by this backend.")

      def mark_as_started(self, task_id, **meta):
          """Mark a task as started.

          The extra keyword arguments are stored as the task's result.
          """
          return self.store_result(task_id, meta, status=states.STARTED)

      def mark_as_done(self, task_id, result):
          """Mark task as successfully executed."""
          return self.store_result(task_id, result, status=states.SUCCESS)

      def mark_as_failure(self, task_id, exc, traceback=None):
          """Mark task as executed with failure. Stores the exception."""
          return self.store_result(task_id, exc, status=states.FAILURE,
                                   traceback=traceback)

      def mark_as_retry(self, task_id, exc, traceback=None):
          """Mark task as being retried. Stores the current
          exception (if any)."""
          return self.store_result(task_id, exc, status=states.RETRY,
                                   traceback=traceback)

      def mark_as_revoked(self, task_id):
          """Mark task as revoked.

          A new :exc:`TaskRevokedError` instance is stored as the result.
          """
          return self.store_result(task_id, TaskRevokedError(),
                                   status=states.REVOKED, traceback=None)

      def prepare_exception(self, exc):
          """Prepare exception for serialization."""
          return get_pickleable_exception(exc)

      def exception_to_python(self, exc):
          """Convert serialized exception to Python exception."""
          return get_pickled_exception(exc)

      def prepare_value(self, result):
          """Prepare value for storage.  Returns `result` unchanged."""
          return result

      def forget(self, task_id):
          """Forget the stored result of a task.

          :raises NotImplementedError: always, must be overridden.
          """
          raise NotImplementedError("%s does not implement forget." % (
                      self.__class__))

      def wait_for(self, task_id, timeout=None, interval=0.5):
          """Wait for task and return its result.

          If the task raises an exception, this exception
          will be re-raised by :func:`wait_for`.

          If `timeout` is set to a non-zero value, this raises the
          :class:`celery.exceptions.TimeoutError` exception if the operation
          takes longer than `timeout` seconds.  A `timeout` of
          :const:`None` (or ``0``) waits forever.

          :keyword interval: Seconds to sleep between status polls
              (default ``0.5``).

          Only the time spent sleeping is counted towards `timeout`; time
          spent inside :meth:`get_status` is not, so the timeout is a
          lower bound rather than an exact deadline.
          """
          time_elapsed = 0.0
          while True:
              status = self.get_status(task_id)
              if status == states.SUCCESS:
                  return self.get_result(task_id)
              elif status in states.PROPAGATE_STATES:
                  # For these states the stored result is the exception.
                  raise self.get_result(task_id)
              # avoid hammering the CPU checking status.
              time.sleep(interval)
              time_elapsed += interval
              if timeout and time_elapsed >= timeout:
                  raise TimeoutError("The operation timed out.")

      def cleanup(self):
          """Backend cleanup. Is run by
          :class:`celery.task.DeleteExpiredTaskMetaTask`."""
          pass

      def process_cleanup(self):
          """Cleanup actions to do at the end of a task worker process."""
          pass

      def get_status(self, task_id):
          """Get the status of a task.

          :raises NotImplementedError: always, must be overridden.
          """
          raise NotImplementedError(
                  "get_status is not supported by this backend.")

      def get_result(self, task_id):
          """Get the result of a task.

          :raises NotImplementedError: always, must be overridden.
          """
          raise NotImplementedError(
                  "get_result is not supported by this backend.")

      def get_traceback(self, task_id):
          """Get the traceback for a failed task.

          :raises NotImplementedError: always, must be overridden.
          """
          raise NotImplementedError(
                  "get_traceback is not supported by this backend.")

      def save_taskset(self, taskset_id, result):
          """Store the result of a taskset.

          :raises NotImplementedError: always, must be overridden.
          """
          raise NotImplementedError(
                  "save_taskset is not supported by this backend.")

      def restore_taskset(self, taskset_id, cache=True):
          """Get the result of a taskset.

          :raises NotImplementedError: always, must be overridden.
          """
          raise NotImplementedError(
                  "restore_taskset is not supported by this backend.")

      def reload_task_result(self, task_id):
          """Reload task result, even if it has been previously fetched.

          :raises NotImplementedError: always, must be overridden.
          """
          raise NotImplementedError(
                  "reload_task_result is not supported by this backend.")

      def reload_taskset_result(self, task_id):
          """Reload taskset result, even if it has been previously fetched.

          `task_id` is the id of the taskset; the parameter name is kept
          as-is so existing keyword callers keep working.

          :raises NotImplementedError: always, must be overridden.
          """
          raise NotImplementedError(
                  "reload_taskset_result is not supported by this backend.")
  class BaseDictBackend(BaseBackend):
      """Backend base that represents task/taskset metadata as dictionaries.

      Fetched metadata is kept in a :class:`LocalCache` stored on
      ``self._cache``.  The actual storage work is delegated to the hooks
      ``_store_result``, ``_forget``, ``_get_task_meta_for``,
      ``_save_taskset`` and ``_restore_taskset``, which are called here but
      not defined by this class.
      """

      def __init__(self, *args, **kwargs):
          """Set up the backend and its local result cache.

          The cache limit comes from the ``max_cached_results`` keyword
          argument, falling back to the app's ``CELERY_MAX_CACHED_RESULTS``
          setting when that argument is missing or falsy.
          """
          super(BaseDictBackend, self).__init__(*args, **kwargs)
          cache_limit = (kwargs.get("max_cached_results") or
                         self.app.conf.CELERY_MAX_CACHED_RESULTS)
          self._cache = LocalCache(limit=cache_limit)

      def store_result(self, task_id, result, status, traceback=None):
          """Encode `result` for `status`, then hand it to ``_store_result``.

          Returns whatever ``_store_result`` returns.
          """
          encoded = self.encode_result(result, status)
          return self._store_result(task_id, encoded, status, traceback)

      def forget(self, task_id):
          """Drop the task from the local cache, then from the store."""
          self._cache.pop(task_id, None)
          self._forget(task_id)

      def get_status(self, task_id):
          """Return the ``"status"`` entry of the task's metadata."""
          task_meta = self.get_task_meta(task_id)
          return task_meta["status"]

      def get_traceback(self, task_id):
          """Return the task's stored traceback, or :const:`None` if the
          metadata has no ``"traceback"`` entry."""
          task_meta = self.get_task_meta(task_id)
          return task_meta.get("traceback")

      def get_result(self, task_id):
          """Return the task's result.

          When the task is in one of :attr:`EXCEPTION_STATES` the stored
          result is first converted with :meth:`exception_to_python`.
          """
          task_meta = self.get_task_meta(task_id)
          if task_meta["status"] not in self.EXCEPTION_STATES:
              return task_meta["result"]
          return self.exception_to_python(task_meta["result"])

      def get_task_meta(self, task_id, cache=True):
          """Return the metadata dictionary for a task.

          With `cache` enabled a cached entry is returned when present, and
          freshly fetched metadata is cached only when its status is
          ``states.SUCCESS``.  With `cache` disabled the cache is neither
          read nor written.
          """
          if not cache:
              return self._get_task_meta_for(task_id)
          if task_id in self._cache:
              return self._cache[task_id]
          fetched = self._get_task_meta_for(task_id)
          if fetched.get("status") == states.SUCCESS:
              self._cache[task_id] = fetched
          return fetched

      def reload_task_result(self, task_id):
          """Fetch the task's metadata bypassing the cache and overwrite
          the cached entry with it."""
          fresh = self.get_task_meta(task_id, cache=False)
          self._cache[task_id] = fresh

      def reload_taskset_result(self, taskset_id):
          """Fetch the taskset's metadata bypassing the cache and overwrite
          the cached entry with it."""
          fresh = self.get_taskset_meta(taskset_id, cache=False)
          self._cache[taskset_id] = fresh

      def get_taskset_meta(self, taskset_id, cache=True):
          """Return the metadata for a taskset.

          With `cache` enabled a cached entry is returned when present, and
          freshly restored metadata is cached unless it is :const:`None`.
          With `cache` disabled the cache is neither read nor written.
          """
          if not cache:
              return self._restore_taskset(taskset_id)
          if taskset_id in self._cache:
              return self._cache[taskset_id]
          restored = self._restore_taskset(taskset_id)
          if restored is not None:
              self._cache[taskset_id] = restored
          return restored

      def restore_taskset(self, taskset_id, cache=True):
          """Return the ``"result"`` entry of the taskset's metadata, or
          :const:`None` when no (non-empty) metadata is available."""
          taskset_meta = self.get_taskset_meta(taskset_id, cache=cache)
          if not taskset_meta:
              return None
          return taskset_meta["result"]

      def save_taskset(self, taskset_id, result):
          """Store a taskset result by delegating to ``_save_taskset``."""
          return self._save_taskset(taskset_id, result)
  class KeyValueStoreBackend(BaseDictBackend):
      """Dictionary backend that persists pickled metadata under string keys.

      Subclasses provide the three store primitives :meth:`get`,
      :meth:`set` and :meth:`delete`; everything else is built on top of
      them.
      """

      def get(self, key):
          """Return the value stored under `key`.  Must be overridden."""
          raise NotImplementedError("Must implement the get method.")

      def set(self, key, value):
          """Store `value` under `key`.  Must be overridden."""
          raise NotImplementedError("Must implement the set method.")

      def delete(self, key):
          """Remove `key` from the store.  Must be overridden."""
          raise NotImplementedError("Must implement the delete method")

      def get_key_for_task(self, task_id):
          """Return the store key used for a task's metadata."""
          return "celery-task-meta-%s" % task_id

      def get_key_for_taskset(self, task_id):
          """Return the store key used for a taskset's metadata.

          Despite its name, `task_id` is the id of the taskset.
          """
          return "celery-taskset-meta-%s" % task_id

      def _forget(self, task_id):
          """Delete the task's metadata entry from the store."""
          key = self.get_key_for_task(task_id)
          self.delete(key)

      def _store_result(self, task_id, result, status, traceback=None):
          """Pickle the task's status, result and traceback into the store.

          Returns `result` unchanged.
          """
          key = self.get_key_for_task(task_id)
          payload = pickle.dumps({"status": status,
                                  "result": result,
                                  "traceback": traceback})
          self.set(key, payload)
          return result

      def _save_taskset(self, taskset_id, result):
          """Pickle the taskset result into the store.

          Returns `result` unchanged.
          """
          key = self.get_key_for_taskset(taskset_id)
          payload = pickle.dumps({"result": result})
          self.set(key, payload)
          return result

      def _get_task_meta_for(self, task_id):
          """Load the metadata dictionary for a task.

          When the store has nothing (or an empty value) for the task, a
          placeholder with status ``states.PENDING`` and a :const:`None`
          result is returned instead.
          """
          raw = self.get(self.get_key_for_task(task_id))
          if raw:
              # NOTE(review): this unpickles whatever the store returns, so
              # the store must be trusted.  The str() coercion is presumably
              # there for stores returning non-str buffers -- TODO confirm.
              return pickle.loads(str(raw))
          return {"status": states.PENDING, "result": None}

      def _restore_taskset(self, taskset_id):
          """Load the metadata dictionary for a taskset.

          A falsy value from the store (e.g. :const:`None`) is returned
          as-is; anything else is unpickled.
          """
          raw = self.get(self.get_key_for_taskset(taskset_id))
          if not raw:
              return raw
          # NOTE(review): unpickles store contents; the store must be trusted.
          return pickle.loads(str(raw))