trace.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.task.trace
  4. ~~~~~~~~~~~~~~~~~~~~
  5. This module defines how the task execution is traced:
  6. errors are recorded, handlers are applied and so on.
  7. :copyright: (c) 2009 - 2012 by Ask Solem.
  8. :license: BSD, see LICENSE for more details.
  9. """
  10. from __future__ import absolute_import
  11. # ## ---
  12. # This is the heart of the worker, the inner loop so to speak.
  13. # It used to be split up into nice little classes and methods,
  14. # but in the end it only resulted in bad performance and horrible tracebacks,
  15. # so instead we now use one closure per task class.
  16. import os
  17. import socket
  18. import sys
  19. from warnings import warn
  20. from kombu.utils import kwdict
  21. from celery import current_app
  22. from celery import states, signals
  23. from celery.state import _task_stack
  24. from celery.app.task import BaseTask, Context
  25. from celery.datastructures import ExceptionInfo
  26. from celery.exceptions import RetryTaskError
  27. from celery.utils.serialization import get_pickleable_exception
  28. from celery.utils.log import get_logger
# Logger used to report failures in the per-task process cleanup.
_logger = get_logger(__name__)

# Module-level aliases for the task signals used by the tracer.  The
# ``*_receivers`` names are truth-tested before the matching ``send_*``
# is called, so a signal without receivers costs no call at all.
# NOTE(review): the receiver lists are captured by reference at import
# time; this assumes the signal objects mutate those lists in place
# rather than rebinding them -- confirm against the signal
# implementation.
send_prerun = signals.task_prerun.send
prerun_receivers = signals.task_prerun.receivers
send_postrun = signals.task_postrun.send
postrun_receivers = signals.task_postrun.receivers
send_success = signals.task_success.send
success_receivers = signals.task_success.receivers

# Task state names, aliased so the tracer does not need an attribute
# lookup on ``states`` for every use.
STARTED = states.STARTED
SUCCESS = states.SUCCESS
RETRY = states.RETRY
FAILURE = states.FAILURE
EXCEPTION_STATES = states.EXCEPTION_STATES
  41. def mro_lookup(cls, attr, stop=()):
  42. """Returns the first node by MRO order that defines an attribute.
  43. :keyword stop: A list of types that if reached will stop the search.
  44. :returns None: if the attribute was not found.
  45. """
  46. for node in cls.mro():
  47. if node in stop:
  48. return
  49. if attr in node.__dict__:
  50. return node
  51. def task_has_custom(task, attr):
  52. """Returns true if the task or one of its bases
  53. defines ``attr`` (excluding the one in BaseTask)."""
  54. return mro_lookup(task.__class__, attr, stop=(BaseTask, object))
  55. class TraceInfo(object):
  56. __slots__ = ("state", "retval")
  57. def __init__(self, state, retval=None):
  58. self.state = state
  59. self.retval = retval
  60. def handle_error_state(self, task, eager=False):
  61. store_errors = not eager
  62. if task.ignore_result:
  63. store_errors = task.store_errors_even_if_ignored
  64. return {
  65. RETRY: self.handle_retry,
  66. FAILURE: self.handle_failure,
  67. }[self.state](task, store_errors=store_errors)
  68. def handle_retry(self, task, store_errors=True):
  69. """Handle retry exception."""
  70. # Create a simpler version of the RetryTaskError that stringifies
  71. # the original exception instead of including the exception instance.
  72. # This is for reporting the retry in logs, email etc, while
  73. # guaranteeing pickleability.
  74. req = task.request
  75. type_, _, tb = sys.exc_info()
  76. try:
  77. exc = self.retval
  78. message, orig_exc = exc.args
  79. expanded_msg = "%s: %s" % (message, str(orig_exc))
  80. einfo = ExceptionInfo((type_, type_(expanded_msg, None), tb))
  81. if store_errors:
  82. task.backend.mark_as_retry(req.id, orig_exc, einfo.traceback)
  83. task.on_retry(exc, req.id, req.args, req.kwargs, einfo)
  84. return einfo
  85. finally:
  86. del(tb)
  87. def handle_failure(self, task, store_errors=True):
  88. """Handle exception."""
  89. req = task.request
  90. _, type_, tb = sys.exc_info()
  91. try:
  92. exc = self.retval
  93. einfo = ExceptionInfo((type_, get_pickleable_exception(exc), tb))
  94. if store_errors:
  95. task.backend.mark_as_failure(req.id, exc, einfo.traceback)
  96. task.on_failure(exc, req.id, req.args, req.kwargs, einfo)
  97. signals.task_failure.send(sender=task, task_id=req.id,
  98. exception=exc, args=req.args,
  99. kwargs=req.kwargs,
  100. traceback=einfo.traceback,
  101. einfo=einfo)
  102. return einfo
  103. finally:
  104. del(tb)
  105. def execute_bare(task, uuid, args, kwargs, request=None, Info=TraceInfo):
  106. R = I = None
  107. kwargs = kwdict(kwargs)
  108. try:
  109. try:
  110. R = retval = task(*args, **kwargs)
  111. state = SUCCESS
  112. except Exception, exc:
  113. I = Info(FAILURE, exc)
  114. state, retval = I.state, I.retval
  115. R = I.handle_error_state(task)
  116. except BaseException, exc:
  117. raise
  118. except: # pragma: no cover
  119. # For Python2.5 where raising strings are still allowed
  120. # (but deprecated)
  121. I = Info(FAILURE, None)
  122. state, retval = I.state, I.retval
  123. R = I.handle_error_state(task)
  124. except Exception, exc:
  125. R = report_internal_error(task, exc)
  126. return R
  127. def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
  128. Info=TraceInfo, eager=False, propagate=False):
  129. # If the task doesn't define a custom __call__ method
  130. # we optimize it away by simply calling the run method directly,
  131. # saving the extra method call and a line less in the stack trace.
  132. fun = task if task_has_custom(task, "__call__") else task.run
  133. loader = loader or current_app.loader
  134. backend = task.backend
  135. ignore_result = task.ignore_result
  136. track_started = task.track_started
  137. track_started = not eager and (task.track_started and not ignore_result)
  138. publish_result = not eager and not ignore_result
  139. hostname = hostname or socket.gethostname()
  140. loader_task_init = loader.on_task_init
  141. loader_cleanup = loader.on_process_cleanup
  142. task_on_success = None
  143. task_after_return = None
  144. if task_has_custom(task, "on_success"):
  145. task_on_success = task.on_success
  146. if task_has_custom(task, "after_return"):
  147. task_after_return = task.after_return
  148. store_result = backend.store_result
  149. backend_cleanup = backend.process_cleanup
  150. pid = os.getpid()
  151. request_stack = task.request_stack
  152. push_request = request_stack.push
  153. pop_request = request_stack.pop
  154. on_chord_part_return = backend.on_chord_part_return
  155. from celery import canvas
  156. subtask = canvas.subtask
  157. def trace_task(uuid, args, kwargs, request=None):
  158. R = I = None
  159. kwargs = kwdict(kwargs)
  160. try:
  161. _task_stack.push(task)
  162. task_request = Context(request or {}, args=args,
  163. called_directly=False, kwargs=kwargs)
  164. push_request(task_request)
  165. try:
  166. # -*- PRE -*-
  167. if prerun_receivers:
  168. send_prerun(sender=task, task_id=uuid, task=task,
  169. args=args, kwargs=kwargs)
  170. loader_task_init(uuid, task)
  171. if track_started:
  172. store_result(uuid, {"pid": pid,
  173. "hostname": hostname}, STARTED)
  174. # -*- TRACE -*-
  175. try:
  176. R = retval = fun(*args, **kwargs)
  177. state = SUCCESS
  178. except RetryTaskError, exc:
  179. I = Info(RETRY, exc)
  180. state, retval = I.state, I.retval
  181. R = I.handle_error_state(task, eager=eager)
  182. except Exception, exc:
  183. if propagate:
  184. raise
  185. I = Info(FAILURE, exc)
  186. state, retval = I.state, I.retval
  187. R = I.handle_error_state(task, eager=eager)
  188. [subtask(errback).apply_async((uuid, ))
  189. for errback in task_request.errbacks or []]
  190. except BaseException, exc:
  191. raise
  192. except: # pragma: no cover
  193. # For Python2.5 where raising strings are still allowed
  194. # (but deprecated)
  195. if propagate:
  196. raise
  197. I = Info(FAILURE, None)
  198. state, retval = I.state, I.retval
  199. R = I.handle_error_state(task, eager=eager)
  200. [subtask(errback).apply_async((uuid, ))
  201. for errback in task_request.errbacks or []]
  202. else:
  203. # callback tasks must be applied before the result is
  204. # stored, so that result.children is populated.
  205. [subtask(callback).apply_async((retval, ))
  206. for callback in task_request.callbacks or []]
  207. if publish_result:
  208. store_result(uuid, retval, SUCCESS)
  209. if task_on_success:
  210. task_on_success(retval, uuid, args, kwargs)
  211. if success_receivers:
  212. send_success(sender=task, result=retval)
  213. # -* POST *-
  214. if task_request.chord:
  215. on_chord_part_return(task)
  216. if task_after_return:
  217. task_after_return(state, retval, uuid, args, kwargs, None)
  218. if postrun_receivers:
  219. send_postrun(sender=task, task_id=uuid, task=task,
  220. args=args, kwargs=kwargs,
  221. retval=retval, state=state)
  222. finally:
  223. _task_stack.pop()
  224. pop_request()
  225. if not eager:
  226. try:
  227. backend_cleanup()
  228. loader_cleanup()
  229. except (KeyboardInterrupt, SystemExit, MemoryError):
  230. raise
  231. except Exception, exc:
  232. _logger.error("Process cleanup failed: %r", exc,
  233. exc_info=True)
  234. except Exception, exc:
  235. if eager:
  236. raise
  237. R = report_internal_error(task, exc)
  238. return R, I
  239. return trace_task
  240. def trace_task(task, uuid, args, kwargs, request=None, **opts):
  241. try:
  242. if task.__tracer__ is None:
  243. task.__tracer__ = build_tracer(task.name, task, **opts)
  244. return task.__tracer__(uuid, args, kwargs, request)
  245. except Exception, exc:
  246. return report_internal_error(task, exc), None
  247. def eager_trace_task(task, uuid, args, kwargs, request=None, **opts):
  248. opts.setdefault("eager", True)
  249. return build_tracer(task.name, task, **opts)(
  250. uuid, args, kwargs, request)
  251. def report_internal_error(task, exc):
  252. _type, _value, _tb = sys.exc_info()
  253. try:
  254. _value = task.backend.prepare_exception(exc)
  255. exc_info = ExceptionInfo((_type, _value, _tb), internal=True)
  256. warn(RuntimeWarning(
  257. "Exception raised outside body: %r:\n%s" % (
  258. exc, exc_info.traceback)))
  259. return exc_info
  260. finally:
  261. del(_tb)