trace.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.execute.trace
  4. ~~~~~~~~~~~~~~~~~~~~
  5. This module defines how the task execution is traced:
  6. errors are recorded, handlers are applied and so on.
  7. :copyright: (c) 2009 - 2012 by Ask Solem.
  8. :license: BSD, see LICENSE for more details.
  9. """
  10. from __future__ import absolute_import
  11. # ## ---
  12. # This is the heart of the worker, the inner loop so to speak.
  13. # It used to be split up into nice little classes and methods,
  14. # but in the end it only resulted in bad performance and horrible tracebacks,
  15. # so instead we now use one closure per task class.
  16. import os
  17. import socket
  18. import sys
  19. import traceback
  20. from warnings import warn
  21. from .. import app as app_module
  22. from .. import current_app
  23. from .. import states, signals
  24. from ..datastructures import ExceptionInfo
  25. from ..exceptions import RetryTaskError
  26. from ..registry import tasks
  27. from ..utils.serialization import get_pickleable_exception
  28. send_prerun = signals.task_prerun.send
  29. prerun_receivers = signals.task_prerun.receivers
  30. send_postrun = signals.task_postrun.send
  31. postrun_receivers = signals.task_postrun.receivers
  32. STARTED = states.STARTED
  33. SUCCESS = states.SUCCESS
  34. RETRY = states.RETRY
  35. FAILURE = states.FAILURE
  36. EXCEPTION_STATES = states.EXCEPTION_STATES
  37. class TraceInfo(object):
  38. __slots__ = ("state", "retval", "exc_info",
  39. "exc_type", "exc_value", "tb", "strtb")
  40. def __init__(self, state, retval=None, exc_info=None):
  41. self.state = state
  42. self.retval = retval
  43. self.exc_info = exc_info
  44. if exc_info:
  45. self.exc_type, self.exc_value, self.tb = exc_info
  46. else:
  47. self.exc_type = self.exc_value = self.tb = None
  48. def handle_error_state(self, task, eager=False):
  49. store_errors = not eager
  50. if task.ignore_result:
  51. store_errors = task.store_errors_even_if_ignored
  52. return {
  53. RETRY: self.handle_retry,
  54. FAILURE: self.handle_failure,
  55. }[self.state](task, store_errors=store_errors)
  56. def handle_retry(self, task, store_errors=True):
  57. """Handle retry exception."""
  58. # Create a simpler version of the RetryTaskError that stringifies
  59. # the original exception instead of including the exception instance.
  60. # This is for reporting the retry in logs, email etc, while
  61. # guaranteeing pickleability.
  62. req = task.request
  63. exc, type_, tb = self.retval, self.exc_type, self.tb
  64. message, orig_exc = self.retval.args
  65. if store_errors:
  66. task.backend.mark_as_retry(req.id, orig_exc, self.strtb)
  67. expanded_msg = "%s: %s" % (message, str(orig_exc))
  68. einfo = ExceptionInfo((type_, type_(expanded_msg, None), tb))
  69. task.on_retry(exc, req.id, req.args, req.kwargs, einfo)
  70. return einfo
  71. def handle_failure(self, task, store_errors=True):
  72. """Handle exception."""
  73. req = task.request
  74. exc, type_, tb = self.retval, self.exc_type, self.tb
  75. if store_errors:
  76. task.backend.mark_as_failure(req.id, exc, self.strtb)
  77. exc = get_pickleable_exception(exc)
  78. einfo = ExceptionInfo((type_, exc, tb))
  79. task.on_failure(exc, req.id, req.args, req.kwargs, einfo)
  80. signals.task_failure.send(sender=task, task_id=req.id,
  81. exception=exc, args=req.args,
  82. kwargs=req.kwargs, traceback=tb,
  83. einfo=einfo)
  84. return einfo
  85. @property
  86. def strtb(self):
  87. if self.exc_info:
  88. return '\n'.join(traceback.format_exception(*self.exc_info))
  89. return ''
  90. def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
  91. Info=TraceInfo, eager=False, propagate=False):
  92. task = task or tasks[name]
  93. loader = loader or current_app.loader
  94. backend = task.backend
  95. ignore_result = task.ignore_result
  96. track_started = task.track_started
  97. track_started = not eager and (task.track_started and not ignore_result)
  98. publish_result = not eager and not ignore_result
  99. hostname = hostname or socket.gethostname()
  100. loader_task_init = loader.on_task_init
  101. loader_cleanup = loader.on_process_cleanup
  102. task_on_success = task.on_success
  103. task_after_return = task.after_return
  104. task_request = task.request
  105. _tls = app_module._tls
  106. store_result = backend.store_result
  107. backend_cleanup = backend.process_cleanup
  108. pid = os.getpid()
  109. update_request = task_request.update
  110. clear_request = task_request.clear
  111. on_chord_part_return = backend.on_chord_part_return
  112. def trace_task(uuid, args, kwargs, request=None):
  113. R = I = None
  114. try:
  115. _tls.current_task = task
  116. update_request(request or {}, args=args,
  117. called_directly=False, kwargs=kwargs)
  118. try:
  119. # -*- PRE -*-
  120. send_prerun(sender=task, task_id=uuid, task=task,
  121. args=args, kwargs=kwargs)
  122. loader_task_init(uuid, task)
  123. if track_started:
  124. store_result(uuid, {"pid": pid,
  125. "hostname": hostname}, STARTED)
  126. # -*- TRACE -*-
  127. try:
  128. R = retval = task(*args, **kwargs)
  129. state, einfo = SUCCESS, None
  130. except RetryTaskError, exc:
  131. I = Info(RETRY, exc, sys.exc_info())
  132. state, retval, einfo = I.state, I.retval, I.exc_info
  133. R = I.handle_error_state(task, eager=eager)
  134. except Exception, exc:
  135. if propagate:
  136. raise
  137. I = Info(FAILURE, exc, sys.exc_info())
  138. state, retval, einfo = I.state, I.retval, I.exc_info
  139. R = I.handle_error_state(task, eager=eager)
  140. except BaseException, exc:
  141. raise
  142. except: # pragma: no cover
  143. # For Python2.5 where raising strings are still allowed
  144. # (but deprecated)
  145. if propagate:
  146. raise
  147. I = Info(FAILURE, None, sys.exc_info())
  148. state, retval, einfo = I.state, I.retval, I.exc_info
  149. R = I.handle_error_state(task, eager=eager)
  150. else:
  151. task_on_success(retval, uuid, args, kwargs)
  152. if publish_result:
  153. store_result(uuid, retval, SUCCESS)
  154. # -* POST *-
  155. if task_request.chord:
  156. on_chord_part_return(task)
  157. task_after_return(state, retval, uuid, args, kwargs, einfo)
  158. send_postrun(sender=task, task_id=uuid, task=task,
  159. args=args, kwargs=kwargs, retval=retval)
  160. finally:
  161. _tls.current_task = None
  162. clear_request()
  163. if not eager:
  164. try:
  165. backend_cleanup()
  166. loader_cleanup()
  167. except (KeyboardInterrupt, SystemExit, MemoryError):
  168. raise
  169. except Exception, exc:
  170. logger = current_app.log.get_default_logger()
  171. logger.error("Process cleanup failed: %r", exc,
  172. exc_info=True)
  173. except Exception, exc:
  174. if eager:
  175. raise
  176. R = report_internal_error(task, exc)
  177. return R, I
  178. return trace_task
  179. def trace_task(task, uuid, args, kwargs, request=None, **opts):
  180. try:
  181. if task.__tracer__ is None:
  182. task.__tracer__ = build_tracer(task.name, task, **opts)
  183. return task.__tracer__(uuid, args, kwargs, request)
  184. except Exception, exc:
  185. return report_internal_error(task, exc), None
  186. def eager_trace_task(task, uuid, args, kwargs, request=None, **opts):
  187. opts.setdefault("eager", True)
  188. return build_tracer(task.name, task, **opts)(
  189. uuid, args, kwargs, request)
  190. def report_internal_error(task, exc):
  191. _type, _value, _tb = sys.exc_info()
  192. try:
  193. _value = task.backend.prepare_exception(exc)
  194. exc_info = ExceptionInfo((_type, _value, _tb), internal=True)
  195. warn(RuntimeWarning(
  196. "Exception raised outside body: %r:\n%s" % (
  197. exc, exc_info.traceback)))
  198. return exc_info
  199. finally:
  200. del(_tb)