trace.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. import sys
  2. import traceback
  3. from celery import states
  4. from celery import signals
  5. from celery.registry import tasks
  6. from celery.exceptions import RetryTaskError
  7. from celery.datastructures import ExceptionInfo
  8. class TraceInfo(object):
  9. def __init__(self, status=states.PENDING, retval=None, exc_info=None):
  10. self.status = status
  11. self.retval = retval
  12. self.exc_info = exc_info
  13. self.exc_type = None
  14. self.exc_value = None
  15. self.tb = None
  16. self.strtb = None
  17. if self.exc_info:
  18. self.exc_type, self.exc_value, self.tb = exc_info
  19. self.strtb = "\n".join(traceback.format_exception(*exc_info))
  20. @classmethod
  21. def trace(cls, fun, args, kwargs):
  22. """Trace the execution of a function, calling the appropiate callback
  23. if the function raises retry, an failure or returned successfully."""
  24. try:
  25. return cls(states.SUCCESS, retval=fun(*args, **kwargs))
  26. except (SystemExit, KeyboardInterrupt):
  27. raise
  28. except RetryTaskError, exc:
  29. return cls(states.RETRY, retval=exc, exc_info=sys.exc_info())
  30. except Exception, exc:
  31. return cls(states.FAILURE, retval=exc, exc_info=sys.exc_info())
  32. except:
  33. # For Python2.4 where raising strings are still allowed.
  34. return cls(states.FAILURE, retval=None, exc_info=sys.exc_info())
  35. class TaskTrace(object):
  36. def __init__(self, task_name, task_id, args, kwargs, task=None, **_):
  37. self.task_id = task_id
  38. self.task_name = task_name
  39. self.args = args
  40. self.kwargs = kwargs
  41. self.task = task or tasks[self.task_name]
  42. self.status = states.PENDING
  43. self.strtb = None
  44. self._trace_handlers = {states.FAILURE: self.handle_failure,
  45. states.RETRY: self.handle_retry,
  46. states.SUCCESS: self.handle_success}
  47. def __call__(self):
  48. return self.execute()
  49. def execute(self):
  50. signals.task_prerun.send(sender=self.task, task_id=self.task_id,
  51. task=self.task, args=self.args,
  52. kwargs=self.kwargs)
  53. retval = self._trace()
  54. signals.task_postrun.send(sender=self.task, task_id=self.task_id,
  55. task=self.task, args=self.args,
  56. kwargs=self.kwargs, retval=retval)
  57. return retval
  58. def _trace(self):
  59. trace = TraceInfo.trace(self.task, self.args, self.kwargs)
  60. self.status = trace.status
  61. self.strtb = trace.strtb
  62. self.handle_after_return(trace.status, trace.retval,
  63. trace.exc_type, trace.tb, trace.strtb)
  64. handler = self._trace_handlers[trace.status]
  65. return handler(trace.retval, trace.exc_type, trace.tb, trace.strtb)
  66. def handle_after_return(self, status, retval, type_, tb, strtb):
  67. einfo = None
  68. if status in states.EXCEPTION_STATES:
  69. einfo = ExceptionInfo((retval, type_, tb))
  70. self.task.after_return(status, retval, self.task_id,
  71. self.args, self.kwargs, einfo=einfo)
  72. def handle_success(self, retval, *args):
  73. """Handle successful execution."""
  74. self.task.on_success(retval, self.task_id, self.args, self.kwargs)
  75. return retval
  76. def handle_retry(self, exc, type_, tb, strtb):
  77. """Handle retry exception."""
  78. # Create a simpler version of the RetryTaskError that stringifies
  79. # the original exception instead of including the exception instance.
  80. # This is for reporting the retry in logs, e-mail etc, while
  81. # guaranteeing pickleability.
  82. message, orig_exc = exc.args
  83. expanded_msg = "%s: %s" % (message, str(orig_exc))
  84. einfo = ExceptionInfo((type_,
  85. type_(expanded_msg, None),
  86. tb))
  87. self.task.on_retry(exc, self.task_id,
  88. self.args, self.kwargs, einfo=einfo)
  89. return einfo
  90. def handle_failure(self, exc, type_, tb, strtb):
  91. """Handle exception."""
  92. einfo = ExceptionInfo((type_, exc, tb))
  93. self.task.on_failure(exc, self.task_id,
  94. self.args, self.kwargs, einfo=einfo)
  95. return einfo