trace.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
# -*- coding: utf-8 -*-
"""
    celery.task.trace
    ~~~~~~~~~~~~~~~~~

    This module defines how the task execution is traced:
    errors are recorded, handlers are applied and so on.

    :copyright: (c) 2009 - 2012 by Ask Solem.
    :license: BSD, see LICENSE for more details.

"""
  10. from __future__ import absolute_import
  11. # ## ---
  12. # This is the heart of the worker, the inner loop so to speak.
  13. # It used to be split up into nice little classes and methods,
  14. # but in the end it only resulted in bad performance and horrible tracebacks,
  15. # so instead we now use one closure per task class.
  16. import os
  17. import socket
  18. import sys
  19. from warnings import warn
  20. from kombu.utils import kwdict
  21. from celery import current_app
  22. from celery import states, signals
  23. from celery.state import _task_stack
  24. from celery.app.task import BaseTask, Context
  25. from celery.datastructures import ExceptionInfo
  26. from celery.exceptions import RetryTaskError
  27. from celery.utils.serialization import get_pickleable_exception
  28. from celery.utils.log import get_logger
  29. _logger = get_logger(__name__)
  30. send_prerun = signals.task_prerun.send
  31. prerun_receivers = signals.task_prerun.receivers
  32. send_postrun = signals.task_postrun.send
  33. postrun_receivers = signals.task_postrun.receivers
  34. STARTED = states.STARTED
  35. SUCCESS = states.SUCCESS
  36. RETRY = states.RETRY
  37. FAILURE = states.FAILURE
  38. EXCEPTION_STATES = states.EXCEPTION_STATES
  39. def mro_lookup(cls, attr, stop=()):
  40. """Returns the first node by MRO order that defines an attribute.
  41. :keyword stop: A list of types that if reached will stop the search.
  42. :returns None: if the attribute was not found.
  43. """
  44. for node in cls.mro():
  45. if node in stop:
  46. return
  47. if attr in node.__dict__:
  48. return node
  49. def defines_custom_call(task):
  50. """Returns true if the task or one of its bases
  51. defines __call__ (excluding the one in BaseTask)."""
  52. return mro_lookup(task.__class__, "__call__", stop=(BaseTask, object))
  53. class TraceInfo(object):
  54. __slots__ = ("state", "retval")
  55. def __init__(self, state, retval=None):
  56. self.state = state
  57. self.retval = retval
  58. def handle_error_state(self, task, eager=False):
  59. store_errors = not eager
  60. if task.ignore_result:
  61. store_errors = task.store_errors_even_if_ignored
  62. return {
  63. RETRY: self.handle_retry,
  64. FAILURE: self.handle_failure,
  65. }[self.state](task, store_errors=store_errors)
  66. def handle_retry(self, task, store_errors=True):
  67. """Handle retry exception."""
  68. # Create a simpler version of the RetryTaskError that stringifies
  69. # the original exception instead of including the exception instance.
  70. # This is for reporting the retry in logs, email etc, while
  71. # guaranteeing pickleability.
  72. req = task.request
  73. type_, _, tb = sys.exc_info()
  74. try:
  75. exc = self.retval
  76. message, orig_exc = exc.args
  77. expanded_msg = "%s: %s" % (message, str(orig_exc))
  78. einfo = ExceptionInfo((type_, type_(expanded_msg, None), tb))
  79. if store_errors:
  80. task.backend.mark_as_retry(req.id, orig_exc, einfo.traceback)
  81. task.on_retry(exc, req.id, req.args, req.kwargs, einfo)
  82. return einfo
  83. finally:
  84. del(tb)
  85. def handle_failure(self, task, store_errors=True):
  86. """Handle exception."""
  87. req = task.request
  88. _, type_, tb = sys.exc_info()
  89. try:
  90. exc = self.retval
  91. einfo = ExceptionInfo((type_, get_pickleable_exception(exc), tb))
  92. if store_errors:
  93. task.backend.mark_as_failure(req.id, exc, einfo.traceback)
  94. task.on_failure(exc, req.id, req.args, req.kwargs, einfo)
  95. signals.task_failure.send(sender=task, task_id=req.id,
  96. exception=exc, args=req.args,
  97. kwargs=req.kwargs,
  98. traceback=einfo.traceback,
  99. einfo=einfo)
  100. return einfo
  101. finally:
  102. del(tb)
  103. def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
  104. Info=TraceInfo, eager=False, propagate=False):
  105. # If the task doesn't define a custom __call__ method
  106. # we optimize it away by simply calling the run method directly,
  107. # saving the extra method call and a line less in the stack trace.
  108. fun = task if defines_custom_call(task) else task.run
  109. loader = loader or current_app.loader
  110. backend = task.backend
  111. ignore_result = task.ignore_result
  112. track_started = task.track_started
  113. track_started = not eager and (task.track_started and not ignore_result)
  114. publish_result = not eager and not ignore_result
  115. hostname = hostname or socket.gethostname()
  116. loader_task_init = loader.on_task_init
  117. loader_cleanup = loader.on_process_cleanup
  118. task_on_success = task.on_success
  119. task_after_return = task.after_return
  120. store_result = backend.store_result
  121. backend_cleanup = backend.process_cleanup
  122. pid = os.getpid()
  123. request_stack = task.request_stack
  124. push_request = request_stack.push
  125. pop_request = request_stack.pop
  126. on_chord_part_return = backend.on_chord_part_return
  127. from celery import canvas
  128. subtask = canvas.subtask
  129. def trace_task(uuid, args, kwargs, request=None):
  130. R = I = None
  131. kwargs = kwdict(kwargs)
  132. try:
  133. _task_stack.push(task)
  134. task_request = Context(request or {}, args=args,
  135. called_directly=False, kwargs=kwargs)
  136. push_request(task_request)
  137. try:
  138. # -*- PRE -*-
  139. send_prerun(sender=task, task_id=uuid, task=task,
  140. args=args, kwargs=kwargs)
  141. loader_task_init(uuid, task)
  142. if track_started:
  143. store_result(uuid, {"pid": pid,
  144. "hostname": hostname}, STARTED)
  145. # -*- TRACE -*-
  146. try:
  147. R = retval = fun(*args, **kwargs)
  148. state = SUCCESS
  149. except RetryTaskError, exc:
  150. I = Info(RETRY, exc)
  151. state, retval = I.state, I.retval
  152. R = I.handle_error_state(task, eager=eager)
  153. except Exception, exc:
  154. if propagate:
  155. raise
  156. I = Info(FAILURE, exc)
  157. state, retval = I.state, I.retval
  158. R = I.handle_error_state(task, eager=eager)
  159. [subtask(errback).apply_async((uuid, ))
  160. for errback in task_request.errbacks or []]
  161. except BaseException, exc:
  162. raise
  163. except: # pragma: no cover
  164. # For Python2.5 where raising strings are still allowed
  165. # (but deprecated)
  166. if propagate:
  167. raise
  168. I = Info(FAILURE, None)
  169. state, retval = I.state, I.retval
  170. R = I.handle_error_state(task, eager=eager)
  171. [subtask(errback).apply_async((uuid, ))
  172. for errback in task_request.errbacks or []]
  173. else:
  174. if publish_result:
  175. store_result(uuid, retval, SUCCESS)
  176. # callback tasks must be applied before the result is
  177. # stored, so that result.children is populated.
  178. [subtask(callback).apply_async((retval, ))
  179. for callback in task_request.callbacks or []]
  180. task_on_success(retval, uuid, args, kwargs)
  181. # -* POST *-
  182. if task_request.chord:
  183. on_chord_part_return(task)
  184. task_after_return(state, retval, uuid, args, kwargs, None)
  185. send_postrun(sender=task, task_id=uuid, task=task,
  186. args=args, kwargs=kwargs, retval=retval)
  187. finally:
  188. _task_stack.pop()
  189. pop_request()
  190. if not eager:
  191. try:
  192. backend_cleanup()
  193. loader_cleanup()
  194. except (KeyboardInterrupt, SystemExit, MemoryError):
  195. raise
  196. except Exception, exc:
  197. _logger.error("Process cleanup failed: %r", exc,
  198. exc_info=True)
  199. except Exception, exc:
  200. if eager:
  201. raise
  202. R = report_internal_error(task, exc)
  203. return R, I
  204. return trace_task
  205. def trace_task(task, uuid, args, kwargs, request=None, **opts):
  206. try:
  207. if task.__tracer__ is None:
  208. task.__tracer__ = build_tracer(task.name, task, **opts)
  209. return task.__tracer__(uuid, args, kwargs, request)
  210. except Exception, exc:
  211. return report_internal_error(task, exc), None
  212. def eager_trace_task(task, uuid, args, kwargs, request=None, **opts):
  213. opts.setdefault("eager", True)
  214. return build_tracer(task.name, task, **opts)(
  215. uuid, args, kwargs, request)
  216. def report_internal_error(task, exc):
  217. _type, _value, _tb = sys.exc_info()
  218. try:
  219. _value = task.backend.prepare_exception(exc)
  220. exc_info = ExceptionInfo((_type, _value, _tb), internal=True)
  221. warn(RuntimeWarning(
  222. "Exception raised outside body: %r:\n%s" % (
  223. exc, exc_info.traceback)))
  224. return exc_info
  225. finally:
  226. del(_tb)