123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384 |
- # -*- coding: utf-8 -*-
- """
- celery.task.trace
- ~~~~~~~~~~~~~~~~~~~~
- This module defines how the task execution is traced:
- errors are recorded, handlers are applied and so on.
- """
- from __future__ import absolute_import
- # ## ---
- # This is the heart of the worker, the inner loop so to speak.
- # It used to be split up into nice little classes and methods,
- # but in the end it only resulted in bad performance and horrible tracebacks,
- # so instead we now use one closure per task class.
- import os
- import socket
- import sys
- from warnings import warn
- from kombu.utils import kwdict
- from celery import current_app
- from celery import states, signals
- from celery._state import _task_stack
- from celery.app import set_default_app
- from celery.app.task import Task as BaseTask, Context
- from celery.datastructures import ExceptionInfo
- from celery.exceptions import Ignore, RetryTaskError
- from celery.utils.log import get_logger
- from celery.utils.objects import mro_lookup
- from celery.utils.serialization import get_pickleable_exception
# Logger for this module.
_logger = get_logger(__name__)

# ``send`` methods of the task signals, looked up once at import time.
send_prerun, send_postrun, send_success = (
    signals.task_prerun.send,
    signals.task_postrun.send,
    signals.task_success.send,
)

# Local aliases for the state names used by the tracer.
STARTED, SUCCESS, IGNORED = states.STARTED, states.SUCCESS, states.IGNORED
RETRY, FAILURE = states.RETRY, states.FAILURE
EXCEPTION_STATES = states.EXCEPTION_STATES

# States for which the tracer skips its post-run handling.
IGNORE_STATES = frozenset((IGNORED, RETRY))

#: set by :func:`setup_worker_optimizations`
_tasks = None

# Original attributes replaced by :func:`_install_stack_protection`,
# kept so :func:`reset_worker_optimizations` can put them back.
_patched = {}
def task_has_custom(task, attr):
    """Look for an override of ``attr`` on the task's class or its bases.

    The lookup is delegated to :func:`mro_lookup`, starting at
    ``task.__class__`` and stopping at ``BaseTask`` and ``object``, so the
    definition in ``BaseTask`` itself does not count.  The result of
    :func:`mro_lookup` is returned as-is and is used as a truth value
    by the callers in this module.
    """
    task_cls = task.__class__
    return mro_lookup(
        task_cls, attr,
        stop=(BaseTask, object),
        monkey_patched=['celery.app.task'],
    )
class TraceInfo(object):
    """Holds the state and return value (or exception) of one task call."""
    __slots__ = ('state', 'retval')

    def __init__(self, state, retval=None):
        self.state, self.retval = state, retval

    def handle_error_state(self, task, eager=False):
        """Run the handler matching ``self.state`` (RETRY or FAILURE).

        Errors are stored unless running eagerly; when the task ignores
        results, ``task.store_errors_even_if_ignored`` decides instead.
        Any other state raises :exc:`KeyError`.
        """
        if task.ignore_result:
            persist = task.store_errors_even_if_ignored
        else:
            persist = not eager
        handlers = {
            RETRY: self.handle_retry,
            FAILURE: self.handle_failure,
        }
        handler = handlers[self.state]
        return handler(task, store_errors=persist)

    def handle_retry(self, task, store_errors=True):
        """Handle retry exception.

        Must be called while the exception is being handled, since the
        type and traceback are read from :func:`sys.exc_info`.
        Returns the :class:`ExceptionInfo` built for the retry.
        """
        # ``self.retval`` is the RetryTaskError semi-predicate, and its
        # ``exc`` attribute is the original exception raised (if any).
        request = task.request
        exc_type, _, trace = sys.exc_info()
        try:
            retry_exc = self.retval
            einfo = ExceptionInfo((exc_type, retry_exc, trace))
            if store_errors:
                task.backend.mark_as_retry(
                    request.id, retry_exc.exc, einfo.traceback,
                )
            task.on_retry(
                retry_exc.exc, request.id, request.args, request.kwargs,
                einfo,
            )
            signals.task_retry.send(
                sender=task, request=request, reason=retry_exc, einfo=einfo,
            )
            return einfo
        finally:
            # drop the local reference to the traceback object
            del trace

    def handle_failure(self, task, store_errors=True):
        """Handle exception.

        Must be called while the exception is being handled, since the
        type and traceback are read from :func:`sys.exc_info`.
        Returns the :class:`ExceptionInfo` built for the failure.
        """
        request = task.request
        exc_type, _, trace = sys.exc_info()
        try:
            error = self.retval
            pickleable = get_pickleable_exception(error)
            einfo = ExceptionInfo((exc_type, pickleable, trace))
            if store_errors:
                task.backend.mark_as_failure(
                    request.id, error, einfo.traceback,
                )
            task.on_failure(
                error, request.id, request.args, request.kwargs, einfo,
            )
            signals.task_failure.send(
                sender=task, task_id=request.id,
                exception=error, args=request.args,
                kwargs=request.kwargs,
                traceback=trace,
                einfo=einfo,
            )
            return einfo
        finally:
            # drop the local reference to the traceback object
            del trace
def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                 Info=TraceInfo, eager=False, propagate=False,
                 IGNORE_STATES=IGNORE_STATES):
    """Build a function that traces the execution of ``task``.

    The returned function calls the task, catches the exceptions it
    raises and records the state and result of the execution.

    If the call is successful, the callbacks listed in the request are
    applied and, unless running eagerly or the task ignores results, the
    return value is saved to the result backend with status `"SUCCESS"`.

    If the call raises :exc:`~celery.exceptions.RetryTaskError`, the
    status is `"RETRY"` and the error is passed to
    ``Info.handle_error_state``.

    If the call raises :exc:`~celery.exceptions.Ignore`, the status is
    `"IGNORED"` and no error handler is run.

    If the call raises any other :exc:`Exception`, the status is
    `"FAILURE"`, the error is passed to ``Info.handle_error_state`` and
    the errbacks listed in the request are applied with the task id.
    When ``propagate`` is true the exception is re-raised instead.

    :param name: Name of the task (not referenced in this function).
    :param task: The task instance to trace.
    :keyword loader: Loader providing ``on_task_init`` and
        ``on_process_cleanup``; defaults to ``current_app.loader``.
    :keyword hostname: Hostname stored with the `"STARTED"` state;
        defaults to :func:`socket.gethostname`.
    :keyword store_errors: Not referenced in this function.
    :keyword Info: Class used to record the state of a non-successful
        call.
    :keyword eager: If true, results and the started state are not
        stored, process cleanup is skipped and internal errors are
        re-raised instead of being reported.
    :keyword propagate: If true, exceptions raised by the task (other
        than Ignore/RetryTaskError) are re-raised.
    :keyword IGNORE_STATES: States for which the post-run handling
        is skipped.

    Returns a function that takes the following arguments:

        :param uuid: The unique id of the task.
        :param args: List of positional args to pass on to the function.
        :param kwargs: Keyword arguments mapping to pass on to the function.
        :keyword request: Request dict.

    and returns a ``(R, I)`` tuple: ``R`` is the task's return value, or
    an :class:`ExceptionInfo` when the call did not succeed; ``I`` is the
    ``Info`` instance describing a non-successful call, or :const:`None`.

    """
    # If the task doesn't define a custom __call__ method
    # we optimize it away by simply calling the run method directly,
    # saving the extra method call and a line less in the stack trace.
    fun = task if task_has_custom(task, '__call__') else task.run

    # Everything the inner function needs is bound to a local here, once
    # per task, so the tracer itself only does closure lookups.
    loader = loader or current_app.loader
    backend = task.backend
    ignore_result = task.ignore_result
    track_started = not eager and (task.track_started and not ignore_result)
    publish_result = not eager and not ignore_result
    hostname = hostname or socket.gethostname()

    loader_task_init = loader.on_task_init
    loader_cleanup = loader.on_process_cleanup

    # Only call the on_success/after_return handlers if the task
    # actually overrides them.
    task_on_success = None
    task_after_return = None
    if task_has_custom(task, 'on_success'):
        task_on_success = task.on_success
    if task_has_custom(task, 'after_return'):
        task_after_return = task.after_return

    store_result = backend.store_result
    backend_cleanup = backend.process_cleanup

    pid = os.getpid()

    request_stack = task.request_stack
    push_request = request_stack.push
    pop_request = request_stack.pop
    push_task = _task_stack.push
    pop_task = _task_stack.pop
    on_chord_part_return = backend.on_chord_part_return

    prerun_receivers = signals.task_prerun.receivers
    postrun_receivers = signals.task_postrun.receivers
    success_receivers = signals.task_success.receivers

    from celery import canvas
    subtask = canvas.subtask

    def trace_task(uuid, args, kwargs, request=None):
        R = I = None
        kwargs = kwdict(kwargs)
        try:
            push_task(task)
            task_request = Context(request or {}, args=args,
                                   called_directly=False, kwargs=kwargs)
            push_request(task_request)
            try:
                # -*- PRE -*-
                if prerun_receivers:
                    send_prerun(sender=task, task_id=uuid, task=task,
                                args=args, kwargs=kwargs)
                loader_task_init(uuid, task)
                if track_started:
                    store_result(uuid, {'pid': pid,
                                        'hostname': hostname}, STARTED)

                # -*- TRACE -*-
                # Exceptions that are not Exception subclasses
                # (e.g. KeyboardInterrupt) are not caught here.
                try:
                    R = retval = fun(*args, **kwargs)
                    state = SUCCESS
                except Ignore as exc:
                    I, R = Info(IGNORED, exc), ExceptionInfo(internal=True)
                    state, retval = I.state, I.retval
                except RetryTaskError as exc:
                    I = Info(RETRY, exc)
                    state, retval = I.state, I.retval
                    R = I.handle_error_state(task, eager=eager)
                except Exception as exc:
                    if propagate:
                        raise
                    I = Info(FAILURE, exc)
                    state, retval = I.state, I.retval
                    R = I.handle_error_state(task, eager=eager)
                    for errback in task_request.errbacks or []:
                        subtask(errback).apply_async((uuid, ))
                else:
                    # callback tasks must be applied before the result is
                    # stored, so that result.children is populated.
                    for callback in task_request.callbacks or []:
                        subtask(callback).apply_async((retval, ))
                    if publish_result:
                        store_result(uuid, retval, SUCCESS)
                    if task_on_success:
                        task_on_success(retval, uuid, args, kwargs)
                    if success_receivers:
                        send_success(sender=task, result=retval)

                # -* POST *-
                if state not in IGNORE_STATES:
                    if task_request.chord:
                        on_chord_part_return(task)
                    if task_after_return:
                        task_after_return(
                            state, retval, uuid, args, kwargs, None,
                        )
                    if postrun_receivers:
                        send_postrun(sender=task, task_id=uuid, task=task,
                                     args=args, kwargs=kwargs,
                                     retval=retval, state=state)
            finally:
                pop_task()
                pop_request()
                if not eager:
                    try:
                        backend_cleanup()
                        loader_cleanup()
                    except (KeyboardInterrupt, SystemExit, MemoryError):
                        raise
                    except Exception as exc:
                        # a failing cleanup is logged, not raised
                        _logger.error('Process cleanup failed: %r', exc,
                                      exc_info=True)
        except Exception as exc:
            # An error raised outside of the task body itself.
            if eager:
                raise
            R = report_internal_error(task, exc)
        return R, I

    return trace_task
def trace_task(task, uuid, args, kwargs, request=None, **opts):
    """Trace one execution of ``task`` and return its result.

    The tracer is built with :func:`build_tracer` on first use and cached
    on ``task.__trace__``; ``opts`` are only used when it is built.

    :param task: The task instance to execute.
    :param uuid: The unique id of the task.
    :param args: Positional arguments for the task.
    :param kwargs: Keyword arguments for the task.
    :keyword request: Request dict; a fresh empty dict is used when
        omitted, so state can never leak between calls through a shared
        default.

    Returns the first element of the tracer's result, or the value of
    :func:`report_internal_error` if building or calling the tracer
    raises an :exc:`Exception`.
    """
    if request is None:
        request = {}
    try:
        if task.__trace__ is None:
            task.__trace__ = build_tracer(task.name, task, **opts)
        return task.__trace__(uuid, args, kwargs, request)[0]
    except Exception as exc:
        return report_internal_error(task, exc)
def _trace_task_ret(name, uuid, args, kwargs, request=None, **opts):
    """Look the task up by ``name`` in ``current_app.tasks`` and trace it.

    ``request`` defaults to a fresh empty dict for every call rather
    than one dict shared by all calls; the remaining arguments are
    passed on to :func:`trace_task` unchanged.
    """
    if request is None:
        request = {}
    return trace_task(current_app.tasks[name],
                      uuid, args, kwargs, request, **opts)


# Rebound to ``_fast_trace_task`` by ``setup_worker_optimizations`` and
# back to this function by ``reset_worker_optimizations``.
trace_task_ret = _trace_task_ret
def _fast_trace_task(task, uuid, args, kwargs, request=None):
    """Call the cached tracer of the task registered under ``task``.

    ``task`` is the key into the ``_tasks`` registry.  Unlike
    :func:`trace_task`, the tracer is not built on demand and errors
    are not caught here.  ``request`` defaults to a fresh empty dict per
    call rather than one dict shared by all calls.
    """
    # setup_worker_optimizations will point trace_task_ret to here,
    # so this is the function used in the worker.
    if request is None:
        request = {}
    return _tasks[task].__trace__(uuid, args, kwargs, request)[0]
def eager_trace_task(task, uuid, args, kwargs, request=None, **opts):
    """Build a new tracer for ``task`` and run it once.

    ``eager`` is set to true in ``opts`` unless the caller supplied it.
    Returns the tracer's result unchanged.
    """
    if 'eager' not in opts:
        opts['eager'] = True
    tracer = build_tracer(task.name, task, **opts)
    return tracer(uuid, args, kwargs, request)
def report_internal_error(task, exc):
    """Warn about an exception raised outside of the task body.

    Must be called while ``exc`` is being handled, since the type and
    traceback are read from :func:`sys.exc_info`.  Issues a
    :exc:`RuntimeWarning` and returns an :class:`ExceptionInfo` marked
    as internal, holding the exception as prepared by the task backend.
    """
    exc_type, _, trace = sys.exc_info()
    try:
        prepared = task.backend.prepare_exception(exc)
        einfo = ExceptionInfo((exc_type, prepared, trace), internal=True)
        message = 'Exception raised outside body: {0!r}:\n{1}'.format(
            exc, einfo.traceback)
        warn(RuntimeWarning(message))
        return einfo
    finally:
        # drop the local reference to the traceback object
        del trace
def setup_worker_optimizations(app):
    """Switch this module to the fast tracing path for ``app``.

    Installs the stack protection, makes ``app`` both the current and the
    default app, finalizes it, and points ``trace_task_ret`` at
    :func:`_fast_trace_task`, here and in ``celery.worker.job`` if that
    module has already been imported.
    """
    global _tasks, trace_task_ret

    # make sure custom Task.__call__ methods that calls super
    # will not mess up the request/task stack.
    _install_stack_protection()

    # all new threads start without a current app, so if an app is not
    # passed on to the thread it will fall back to the "default app",
    # which then could be the wrong app.  So for the worker
    # we set this to always return our app.  This is a hack,
    # and means that only a single app can be used for workers
    # running in the same process.
    app.set_current()
    set_default_app(app)

    # evaluate all task classes by finalizing the app.
    app.finalize()

    # set fast shortcut to task registry
    _tasks = app._tasks
    trace_task_ret = _fast_trace_task

    try:
        job_module = sys.modules['celery.worker.job']
    except KeyError:
        # module not imported, so there is nothing to patch there
        return
    job_module.trace_task_ret = _fast_trace_task
    job_module.__optimize__()
def reset_worker_optimizations():
    """Undo the changes made by :func:`setup_worker_optimizations`.

    Restores ``trace_task_ret`` (here and in ``celery.worker.job`` if it
    is imported), removes the stack protection marker from ``BaseTask``
    and puts back the saved ``BaseTask.__call__``.  Steps whose state is
    not present are skipped silently.
    """
    global trace_task_ret
    trace_task_ret = _trace_task_ret

    try:
        del BaseTask._stackprotected
    except AttributeError:
        pass

    if 'BaseTask.__call__' in _patched:
        BaseTask.__call__ = _patched.pop('BaseTask.__call__')

    try:
        job_module = sys.modules['celery.worker.job']
    except KeyError:
        pass
    else:
        job_module.trace_task_ret = _trace_task_ret
def _install_stack_protection():
    """Patch ``BaseTask.__call__`` once; the original is saved in ``_patched``."""
    # Patches BaseTask.__call__ in the worker to handle the edge case
    # where people override it and also call super.
    #
    # - The worker optimizes away BaseTask.__call__ and instead
    #   calls task.run directly.
    # - so with the addition of current_task and the request stack
    #   BaseTask.__call__ now pushes to those stacks so that
    #   they work when tasks are called directly.
    #
    # The worker only optimizes away __call__ in the case
    # where it has not been overridden, so the request/task stack
    # will blow if a custom task class defines __call__ and also
    # calls super().
    if getattr(BaseTask, '_stackprotected', False):
        # already patched
        return

    original_call = BaseTask.__call__
    _patched['BaseTask.__call__'] = original_call

    def __protected_call__(self, *args, **kwargs):
        stack = self.request_stack
        request = stack.top
        unprotected = (request and not request._protected and
                       len(stack) == 1 and not request.called_directly)
        if unprotected:
            # mark the request, then bypass the original __call__
            request._protected = 1
            return self.run(*args, **kwargs)
        return original_call(self, *args, **kwargs)

    BaseTask.__call__ = __protected_call__
    BaseTask._stackprotected = True
|