trace.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. from __future__ import absolute_import
  2. import sys
  3. import traceback
  4. from celery import states
  5. from celery import signals
  6. from celery.registry import tasks
  7. from celery.exceptions import RetryTaskError
  8. from celery.datastructures import ExceptionInfo
  9. class TraceInfo(object):
  10. def __init__(self, status=states.PENDING, retval=None, exc_info=None):
  11. self.status = status
  12. self.retval = retval
  13. self.exc_info = exc_info
  14. self.exc_type = None
  15. self.exc_value = None
  16. self.tb = None
  17. self.strtb = None
  18. if self.exc_info:
  19. self.exc_type, self.exc_value, self.tb = exc_info
  20. self.strtb = "\n".join(traceback.format_exception(*exc_info))
  21. @classmethod
  22. def trace(cls, fun, args, kwargs, propagate=False):
  23. """Trace the execution of a function, calling the appropiate callback
  24. if the function raises retry, an failure or returned successfully.
  25. :keyword propagate: If true, errors will propagate to the caller.
  26. """
  27. try:
  28. return cls(states.SUCCESS, retval=fun(*args, **kwargs))
  29. except RetryTaskError, exc:
  30. return cls(states.RETRY, retval=exc, exc_info=sys.exc_info())
  31. except Exception, exc:
  32. if propagate:
  33. raise
  34. return cls(states.FAILURE, retval=exc, exc_info=sys.exc_info())
  35. except BaseException, exc:
  36. raise
  37. except: # pragma: no cover
  38. # For Python2.5 where raising strings are still allowed
  39. # (but deprecated)
  40. if propagate:
  41. raise
  42. return cls(states.FAILURE, retval=None, exc_info=sys.exc_info())
  43. class TaskTrace(object):
  44. def __init__(self, task_name, task_id, args, kwargs, task=None,
  45. request=None, propagate=None, **_):
  46. self.task_id = task_id
  47. self.task_name = task_name
  48. self.args = args
  49. self.kwargs = kwargs
  50. self.task = task or tasks[self.task_name]
  51. self.request = request or {}
  52. self.status = states.PENDING
  53. self.strtb = None
  54. self.propagate = propagate
  55. self._trace_handlers = {states.FAILURE: self.handle_failure,
  56. states.RETRY: self.handle_retry,
  57. states.SUCCESS: self.handle_success}
  58. def __call__(self):
  59. return self.execute()
  60. def execute(self):
  61. self.task.request.update(self.request, args=self.args,
  62. called_directly=False, kwargs=self.kwargs)
  63. signals.task_prerun.send(sender=self.task, task_id=self.task_id,
  64. task=self.task, args=self.args,
  65. kwargs=self.kwargs)
  66. retval = self._trace()
  67. signals.task_postrun.send(sender=self.task, task_id=self.task_id,
  68. task=self.task, args=self.args,
  69. kwargs=self.kwargs, retval=retval)
  70. self.task.request.clear()
  71. return retval
  72. def _trace(self):
  73. trace = TraceInfo.trace(self.task, self.args, self.kwargs,
  74. propagate=self.propagate)
  75. self.status = trace.status
  76. self.strtb = trace.strtb
  77. handler = self._trace_handlers[trace.status]
  78. r = handler(trace.retval, trace.exc_type, trace.tb, trace.strtb)
  79. self.handle_after_return(trace.status, trace.retval,
  80. trace.exc_type, trace.tb, trace.strtb)
  81. return r
  82. def handle_after_return(self, status, retval, type_, tb, strtb,
  83. einfo=None):
  84. if status in states.EXCEPTION_STATES:
  85. einfo = ExceptionInfo((retval, type_, tb))
  86. self.task.after_return(status, retval, self.task_id,
  87. self.args, self.kwargs, einfo)
  88. def handle_success(self, retval, *args):
  89. """Handle successful execution."""
  90. self.task.on_success(retval, self.task_id, self.args, self.kwargs)
  91. return retval
  92. def handle_retry(self, exc, type_, tb, strtb):
  93. """Handle retry exception."""
  94. # Create a simpler version of the RetryTaskError that stringifies
  95. # the original exception instead of including the exception instance.
  96. # This is for reporting the retry in logs, email etc, while
  97. # guaranteeing pickleability.
  98. message, orig_exc = exc.args
  99. expanded_msg = "%s: %s" % (message, str(orig_exc))
  100. einfo = ExceptionInfo((type_, type_(expanded_msg, None), tb))
  101. self.task.on_retry(exc, self.task_id, self.args, self.kwargs, einfo)
  102. return einfo
  103. def handle_failure(self, exc, type_, tb, strtb):
  104. """Handle exception."""
  105. einfo = ExceptionInfo((type_, exc, tb))
  106. self.task.on_failure(exc, self.task_id, self.args, self.kwargs, einfo)
  107. signals.task_failure.send(sender=self.task, task_id=self.task_id,
  108. exception=exc, args=self.args,
  109. kwargs=self.kwargs, traceback=tb,
  110. einfo=einfo)
  111. return einfo