trace.py 3.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. import sys
  2. import traceback
  3. from celery import states
  4. from celery import signals
  5. from celery.registry import tasks
  6. from celery.exceptions import RetryTaskError
  7. from celery.datastructures import ExceptionInfo
  8. class TraceInfo(object):
  9. def __init__(self, status=states.PENDING, retval=None, exc_info=None):
  10. self.status = status
  11. self.retval = retval
  12. self.exc_info = exc_info
  13. self.exc_type = None
  14. self.exc_value = None
  15. self.tb = None
  16. self.strtb = None
  17. if self.exc_info:
  18. self.exc_type, self.exc_value, self.tb = exc_info
  19. self.strtb = "\n".join(traceback.format_exception(*exc_info))
  20. @classmethod
  21. def trace(cls, fun, args, kwargs):
  22. """Trace the execution of a function, calling the appropiate callback
  23. if the function raises retry, an failure or returned successfully."""
  24. try:
  25. return cls(states.SUCCESS, retval=fun(*args, **kwargs))
  26. except (SystemExit, KeyboardInterrupt):
  27. raise
  28. except RetryTaskError, exc:
  29. return cls(states.RETRY, retval=exc, exc_info=sys.exc_info())
  30. except Exception, exc:
  31. return cls(states.FAILURE, retval=exc, exc_info=sys.exc_info())
  32. class TaskTrace(object):
  33. def __init__(self, task_name, task_id, args, kwargs, task=None):
  34. self.task_id = task_id
  35. self.task_name = task_name
  36. self.args = args
  37. self.kwargs = kwargs
  38. self.task = task or tasks[self.task_name]
  39. self.status = states.PENDING
  40. self.strtb = None
  41. self._trace_handlers = {states.FAILURE: self.handle_failure,
  42. states.RETRY: self.handle_retry,
  43. states.SUCCESS: self.handle_success}
  44. def __call__(self):
  45. return self.execute()
  46. def execute(self):
  47. signals.task_prerun.send(sender=self.task, task_id=self.task_id,
  48. task=self.task, args=self.args,
  49. kwargs=self.kwargs)
  50. retval = self._trace()
  51. signals.task_postrun.send(sender=self.task, task_id=self.task_id,
  52. task=self.task, args=self.args,
  53. kwargs=self.kwargs, retval=retval)
  54. return retval
  55. def _trace(self):
  56. trace = TraceInfo.trace(self.task, self.args, self.kwargs)
  57. self.status = trace.status
  58. self.strtb = trace.strtb
  59. handler = self._trace_handlers[trace.status]
  60. return handler(trace.retval, trace.exc_type, trace.tb, trace.strtb)
  61. def handle_success(self, retval, *args):
  62. """Handle successful execution."""
  63. self.task.on_success(retval, self.task_id, self.args, self.kwargs)
  64. return retval
  65. def handle_retry(self, exc, type_, tb, strtb):
  66. """Handle retry exception."""
  67. self.task.on_retry(exc, self.task_id, self.args, self.kwargs)
  68. # Create a simpler version of the RetryTaskError that stringifies
  69. # the original exception instead of including the exception instance.
  70. # This is for reporting the retry in logs, e-mail etc, while
  71. # guaranteeing pickleability.
  72. message, orig_exc = exc.args
  73. expanded_msg = "%s: %s" % (message, str(orig_exc))
  74. return ExceptionInfo((type_,
  75. type_(expanded_msg, None),
  76. tb))
  77. def handle_failure(self, exc, type_, tb, strtb):
  78. """Handle exception."""
  79. self.task.on_failure(exc, self.task_id, self.args, self.kwargs)
  80. return ExceptionInfo((type_, exc, tb))