|
@@ -22,8 +22,8 @@ import sys
|
|
from warnings import warn
|
|
from warnings import warn
|
|
|
|
|
|
from billiard.einfo import ExceptionInfo
|
|
from billiard.einfo import ExceptionInfo
|
|
|
|
+from kombu.exceptions import EncodeError
|
|
from kombu.utils import kwdict
|
|
from kombu.utils import kwdict
|
|
-from kombu.exceptions import SerializationError
|
|
|
|
|
|
|
|
from celery import current_app
|
|
from celery import current_app
|
|
from celery import states, signals
|
|
from celery import states, signals
|
|
@@ -194,7 +194,26 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
|
|
from celery import canvas
|
|
from celery import canvas
|
|
signature = canvas.maybe_signature # maybe_ does not clone if already
|
|
signature = canvas.maybe_signature # maybe_ does not clone if already
|
|
|
|
|
|
|
|
+ def on_error(request, exc, uuid, state=FAILURE, call_errbacks=True):
|
|
|
|
+ if propagate:
|
|
|
|
+ raise
|
|
|
|
+ I = Info(state, exc)
|
|
|
|
+ R = I.handle_error_state(task, eager=eager)
|
|
|
|
+ if call_errbacks:
|
|
|
|
+ [signature(errback, app=app).apply_async((uuid, ))
|
|
|
|
+ for errback in request.errbacks or []]
|
|
|
|
+ return I, R, I.state, I.retval
|
|
|
|
+
|
|
def trace_task(uuid, args, kwargs, request=None):
|
|
def trace_task(uuid, args, kwargs, request=None):
|
|
|
|
+ # R - is the possibly prepared return value.
|
|
|
|
+ # I - is the Info object.
|
|
|
|
+ # retval - is the always unmodified return value.
|
|
|
|
+ # state - is the resulting task state.
|
|
|
|
+
|
|
|
|
+ # This function is very long because we have unrolled all the calls
|
|
|
|
+ # for performance reasons, and because the function is so long
|
|
|
|
+ # we want the main variables (I, and R) to stand out visually from the
|
|
|
|
+ # the rest of the variables, so breaking PEP8 is worth it ;)
|
|
R = I = retval = state = None
|
|
R = I = retval = state = None
|
|
kwargs = kwdict(kwargs)
|
|
kwargs = kwdict(kwargs)
|
|
try:
|
|
try:
|
|
@@ -225,17 +244,11 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
|
|
I, R = Info(IGNORED, exc), ExceptionInfo(internal=True)
|
|
I, R = Info(IGNORED, exc), ExceptionInfo(internal=True)
|
|
state, retval = I.state, I.retval
|
|
state, retval = I.state, I.retval
|
|
except Retry as exc:
|
|
except Retry as exc:
|
|
- I = Info(RETRY, exc)
|
|
|
|
- state, retval = I.state, I.retval
|
|
|
|
- R = I.handle_error_state(task, eager=eager)
|
|
|
|
|
|
+ I, R, state, retval = on_error(
|
|
|
|
+ task_request, exc, uuid, RETRY, call_errbacks=False,
|
|
|
|
+ )
|
|
except Exception as exc:
|
|
except Exception as exc:
|
|
- if propagate:
|
|
|
|
- raise
|
|
|
|
- I = Info(FAILURE, exc)
|
|
|
|
- state, retval = I.state, I.retval
|
|
|
|
- R = I.handle_error_state(task, eager=eager)
|
|
|
|
- [signature(errback, app=app).apply_async((uuid, ))
|
|
|
|
- for errback in task_request.errbacks or []]
|
|
|
|
|
|
+ I, R, state, retval = on_error(task_request, exc, uuid)
|
|
except BaseException as exc:
|
|
except BaseException as exc:
|
|
raise
|
|
raise
|
|
else:
|
|
else:
|
|
@@ -248,14 +261,8 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
|
|
store_result(
|
|
store_result(
|
|
uuid, retval, SUCCESS, request=task_request,
|
|
uuid, retval, SUCCESS, request=task_request,
|
|
)
|
|
)
|
|
- except SerializationError as exc:
|
|
|
|
- if propagate:
|
|
|
|
- raise
|
|
|
|
- I = Info(FAILURE, exc)
|
|
|
|
- state, retval = I.state, I.retval
|
|
|
|
- R = I.handle_error_state(task, eager=eager)
|
|
|
|
- [signature(errback, app=app).apply_async((uuid, ))
|
|
|
|
- for errback in task_request.errbacks or []]
|
|
|
|
|
|
+ except EncodeError as exc:
|
|
|
|
+ I, R, state, retval = on_error(task_request, exc, uuid)
|
|
else:
|
|
else:
|
|
if task_on_success:
|
|
if task_on_success:
|
|
task_on_success(retval, uuid, args, kwargs)
|
|
task_on_success(retval, uuid, args, kwargs)
|