base.py

  1. """celery.backends.base"""
  2. import time
  3. import operator
  4. from celery.serialization import pickle
  5. from celery.serialization import get_pickled_exception
  6. from celery.serialization import get_pickleable_exception
  7. from celery.exceptions import TimeoutError


class BaseBackend(object):
    """The base backend class. All backends should inherit from this."""

    capabilities = []
    TimeoutError = TimeoutError

    def store_result(self, task_id, result, status, traceback=None):
        """Store the result and status of a task."""
        raise NotImplementedError(
                "store_result is not supported by this backend.")

    def mark_as_done(self, task_id, result):
        """Mark task as successfully executed."""
        return self.store_result(task_id, result, status="DONE")

    def mark_as_failure(self, task_id, exc, traceback=None):
        """Mark task as executed with failure. Stores the exception."""
        return self.store_result(task_id, exc, status="FAILURE",
                                 traceback=traceback)

    def mark_as_retry(self, task_id, exc, traceback=None):
        """Mark task as being retried. Stores the current
        exception (if any)."""
        return self.store_result(task_id, exc, status="RETRY",
                                 traceback=traceback)

    def prepare_exception(self, exc):
        """Prepare exception for serialization."""
        return get_pickleable_exception(exc)

    def exception_to_python(self, exc):
        """Convert serialized exception to Python exception."""
        return get_pickled_exception(exc)

    def get_status(self, task_id):
        """Get the status of a task."""
        raise NotImplementedError(
                "get_status is not supported by this backend.")

    def prepare_result(self, result):
        """Prepare result for storage."""
        if result is None:
            return True
        return result

    def get_result(self, task_id):
        """Get the result of a task."""
        raise NotImplementedError(
                "get_result is not supported by this backend.")

    def get_traceback(self, task_id):
        """Get the traceback for a failed task."""
        raise NotImplementedError(
                "get_traceback is not supported by this backend.")

    def is_done(self, task_id):
        """Returns ``True`` if the task was successfully executed."""
        return self.get_status(task_id) == "DONE"

    def cleanup(self):
        """Backend cleanup. Is run by
        :class:`celery.task.DeleteExpiredTaskMetaTask`."""
        pass

    def wait_for(self, task_id, timeout=None):
        """Wait for task and return its result.

        If the task raises an exception, this exception
        will be re-raised by :func:`wait_for`.

        If ``timeout`` is not ``None``, this raises the
        :class:`celery.exceptions.TimeoutError` exception if the operation
        takes longer than ``timeout`` seconds.

        """
        sleep_inbetween = 0.5
        time_elapsed = 0.0

        while True:
            status = self.get_status(task_id)
            if status == "DONE":
                return self.get_result(task_id)
            elif status == "FAILURE":
                raise self.get_result(task_id)
            # avoid hammering the CPU checking status.
            time.sleep(sleep_inbetween)
            time_elapsed += sleep_inbetween
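            # NOTE: the timeout is only checked after each sleep, so the
            # actual wait can overshoot ``timeout`` by up to
            # ``sleep_inbetween`` seconds.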
            if timeout and time_elapsed >= timeout:
                raise TimeoutError("The operation timed out.")

    def process_cleanup(self):
        """Cleanup actions to do at the end of a task worker process.

        See :func:`celery.worker.jail`.

        """
        pass


class KeyValueStoreBackend(BaseBackend):
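    """Base class for backends storing task meta data in a key/value store.

    Concrete subclasses only need to implement :meth:`get` and :meth:`set`;
    serialization of the task meta and the status/result lookups are
    handled by this class.

    """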

    capabilities = ["ResultStore"]

    def __init__(self, *args, **kwargs):
        super(KeyValueStoreBackend, self).__init__()
        self._cache = {}

    def get_cache_key_for_task(self, task_id):
        """Get the cache key for a task by id."""
        return "celery-task-meta-%s" % task_id

    def get(self, key):
        raise NotImplementedError("Must implement the get method.")

    def set(self, key, value):
        raise NotImplementedError("Must implement the set method.")

    def store_result(self, task_id, result, status, traceback=None):
        """Store task result and status."""
        if status == "DONE":
            result = self.prepare_result(result)
        elif status == "FAILURE":
            result = self.prepare_exception(result)
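        # The meta data is stored as a pickled mapping, so any store that
        # can hold an opaque string value will do.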
  102. meta = {"status": status, "result": result, "traceback": traceback}
  103. self.set(self.get_cache_key_for_task(task_id), pickle.dumps(meta))
  104. return result

    def get_status(self, task_id):
        """Get the status of a task."""
        return self._get_task_meta_for(task_id)["status"]

    def get_result(self, task_id):
        """Get the result of a task."""
        meta = self._get_task_meta_for(task_id)
        if meta["status"] == "FAILURE":
            return self.exception_to_python(meta["result"])
        else:
            return meta["result"]

    def get_traceback(self, task_id):
        """Get the traceback for a failed task."""
        meta = self._get_task_meta_for(task_id)
        return meta["traceback"]

    def is_done(self, task_id):
        """Returns ``True`` if the task executed successfully."""
        return self.get_status(task_id) == "DONE"

    def _get_task_meta_for(self, task_id):
        """Get task metadata for a task by id."""
        if task_id in self._cache:
            return self._cache[task_id]
        meta = self.get(self.get_cache_key_for_task(task_id))
        if not meta:
            return {"status": "PENDING", "result": None}
        meta = pickle.loads(str(meta))
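        # Only successful (DONE) results are cached locally; states that may
        # still change (e.g. PENDING, RETRY) are re-read from the store on
        # every call.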
        if meta.get("status") == "DONE":
            self._cache[task_id] = meta
        return meta
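

# Example (not part of the original module): a minimal sketch of how a
# concrete backend can build on KeyValueStoreBackend. ``DictBackend`` and
# its ``_data`` attribute are illustrative names only; a real backend would
# talk to memcached, a database or similar. Only ``get`` and ``set`` are
# supplied; meta pickling, status lookups and ``wait_for`` are inherited.
class DictBackend(KeyValueStoreBackend):
    """Toy backend keeping task meta data in a process-local dict."""

    def __init__(self, *args, **kwargs):
        super(DictBackend, self).__init__(*args, **kwargs)
        self._data = {}

    def get(self, key):
        return self._data.get(key)

    def set(self, key, value):
        self._data[key] = value


if __name__ == "__main__":
    # Quick demonstration of the inherited result-store behaviour.
    backend = DictBackend()
    backend.mark_as_done("task-id-1", 42)
    assert backend.is_done("task-id-1")
    assert backend.get_result("task-id-1") == 42
    backend.mark_as_failure("task-id-2", KeyError("missing"))
    assert isinstance(backend.get_result("task-id-2"), Exception)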