base.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. """celery.backends.base"""
  2. from celery.timer import TimeoutTimer
  3. try:
  4. import cPickle as pickle
  5. except ImportError:
  6. import pickle
  7. import sys
  8. def find_nearest_pickleable_exception(exc):
  9. """With an exception instance, iterate over its super classes (by mro)
  10. and find the first super exception that is pickleable. It does
  11. not go below :exc:`Exception` (i.e. it skips :exc:`Exception`,
  12. :class:`BaseException` and :class:`object`). If that happens
  13. you should use :exc:`UnpickleableException` instead.
  14. :param exc: An exception instance.
  15. :returns: the nearest exception if it's not :exc:`Exception` or below,
  16. if it is it returns ``None``.
  17. :rtype: :exc:`Exception`
  18. """
  19. for supercls in exc.__class__.mro():
  20. if supercls is Exception:
  21. # only BaseException and object, from here on down,
  22. # we don't care about these.
  23. return None
  24. try:
  25. superexc = supercls(*exc.args)
  26. pickle.dumps(superexc)
  27. except:
  28. pass
  29. else:
  30. return superexc
  31. return None
  32. class UnpickleableExceptionWrapper(Exception):
  33. """Wraps unpickleable exceptions.
  34. :param exc_module: see :attr:`exc_module`.
  35. :param exc_cls_name: see :attr:`exc_cls_name`.
  36. :param exc_args: see :attr:`exc_args`
  37. .. attribute:: exc_module
  38. The module of the original exception.
  39. .. attribute:: exc_cls_name
  40. The name of the original exception class.
  41. .. attribute:: exc_args
  42. The arguments for the original exception.
  43. Example
  44. >>> try:
  45. ... something_raising_unpickleable_exc()
  46. >>> except Exception, e:
  47. ... exc = UnpickleableException(e.__class__.__module__,
  48. ... e.__class__.__name__,
  49. ... e.args)
  50. ... pickle.dumps(exc) # Works fine.
  51. """
  52. def __init__(self, exc_module, exc_cls_name, exc_args):
  53. self.exc_module = exc_module
  54. self.exc_cls_name = exc_cls_name
  55. self.exc_args = exc_args
  56. super(Exception, self).__init__(exc_module, exc_cls_name, exc_args)
  57. class BaseBackend(object):
  58. """The base backend class. All backends should inherit from this."""
  59. UnpickleableExecptionWrapper = UnpickleableExceptionWrapper
  60. def store_result(self, task_id, result, status):
  61. """Store the result and status of a task."""
  62. raise NotImplementedError(
  63. "Backends must implement the store_result method")
  64. def mark_as_done(self, task_id, result):
  65. """Mark task as successfully executed."""
  66. return self.store_result(task_id, result, status="DONE")
  67. def mark_as_failure(self, task_id, exc):
  68. """Mark task as executed with failure. Stores the execption."""
  69. return self.store_result(task_id, exc, status="FAILURE")
  70. def create_exception_cls(self, name, module, parent=None):
  71. if not parent:
  72. parent = Exception
  73. return type(name, (parent, ), {"__module__": module})
  74. def prepare_exception(self, exc):
  75. nearest = find_nearest_pickleable_exception(exc)
  76. if nearest:
  77. return nearest
  78. try:
  79. pickle.dumps(exc)
  80. except pickle.PickleError:
  81. excwrapper = UnpickleableExceptionWrapper(
  82. exc.__class__.__module__,
  83. exc.__class__.__name__,
  84. exc.args)
  85. return excwrapper
  86. else:
  87. return exc
  88. def exception_to_python(self, exc):
  89. if isinstance(exc, UnpickleableExceptionWrapper):
  90. exc_cls = self.create_exception_cls(exc.exc_cls_name,
  91. exc.exc_module)
  92. return exc_cls(*exc.exc_args)
  93. return exc
  94. def mark_as_retry(self, task_id, exc):
  95. """Mark task for retry."""
  96. return self.store_result(task_id, exc, status="RETRY")
  97. def get_status(self, task_id):
  98. """Get the status of a task."""
  99. raise NotImplementedError(
  100. "Backends must implement the get_status method")
  101. def prepare_result(self, result):
  102. """Prepare result for storage."""
  103. if result is None:
  104. return True
  105. return result
  106. def get_result(self, task_id):
  107. """Get the result of a task."""
  108. raise NotImplementedError(
  109. "Backends must implement the get_result method")
  110. def is_done(self, task_id):
  111. """Returns ``True`` if the task was successfully executed."""
  112. return self.get_status(task_id) == "DONE"
  113. def cleanup(self):
  114. """Backend cleanup. Is run by
  115. :class:`celery.task.DeleteExpiredTaskMetaTask`."""
  116. pass
  117. def wait_for(self, task_id, timeout=None):
  118. """Wait for task and return its result.
  119. If the task raises an exception, this exception
  120. will be re-raised by :func:`wait_for`.
  121. If ``timeout`` is not ``None``, this raises the
  122. :class:`celery.timer.TimeoutError` exception if the operation takes
  123. longer than ``timeout`` seconds.
  124. """
  125. timeout_timer = TimeoutTimer(timeout)
  126. while True:
  127. status = self.get_status(task_id)
  128. if status == "DONE":
  129. return self.get_result(task_id)
  130. elif status == "FAILURE":
  131. raise self.get_result(task_id)
  132. timeout_timer.tick()