Browse Source

Handle kombu serialization errors as task errors.

Ionel Cristian Mărieș 11 years ago
parent
commit
bbd1e4b28c
1 changed files with 23 additions and 12 deletions
  1. 23 12
      celery/app/trace.py

+ 23 - 12
celery/app/trace.py

@@ -23,6 +23,7 @@ from warnings import warn
 
 from billiard.einfo import ExceptionInfo
 from kombu.utils import kwdict
+from kombu.exceptions import SerializationError
 
 from celery import current_app
 from celery import states, signals
@@ -238,18 +239,28 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                 except BaseException as exc:
                     raise
                 else:
-                    # callback tasks must be applied before the result is
-                    # stored, so that result.children is populated.
-                    [signature(callback, app=app).apply_async((retval, ))
-                        for callback in task_request.callbacks or []]
-                    if publish_result:
-                        store_result(
-                            uuid, retval, SUCCESS, request=task_request,
-                        )
-                    if task_on_success:
-                        task_on_success(retval, uuid, args, kwargs)
-                    if success_receivers:
-                        send_success(sender=task, result=retval)
+                    try:
+                        # callback tasks must be applied before the result is
+                        # stored, so that result.children is populated.
+                        [signature(callback, app=app).apply_async((retval, ))
+                            for callback in task_request.callbacks or []]
+                        if publish_result:
+                            store_result(
+                                uuid, retval, SUCCESS, request=task_request,
+                            )
+                    except SerializationError as exc:
+                        if propagate:
+                            raise
+                        I = Info(FAILURE, exc)
+                        state, retval = I.state, I.retval
+                        R = I.handle_error_state(task, eager=eager)
+                        [signature(errback, app=app).apply_async((uuid, ))
+                            for errback in task_request.errbacks or []]
+                    else:
+                        if task_on_success:
+                            task_on_success(retval, uuid, args, kwargs)
+                        if success_receivers:
+                            send_success(sender=task, result=retval)
 
                 # -* POST *-
                 if state not in IGNORE_STATES: