Explorar o código

Task: callbacks and errbacks are now called as a group

Ask Solem %!s(int64=11) %!d(string=hai) anos
pai
achega
081e8add8a
Modificáronse 1 ficheiros con 10 adicións e 5 borrados
  1. 10 5
      celery/app/trace.py

+ 10 - 5
celery/app/trace.py

@@ -25,7 +25,7 @@ from billiard.einfo import ExceptionInfo
 from kombu.exceptions import EncodeError
 from kombu.utils import kwdict
 
-from celery import current_app
+from celery import current_app, group
 from celery import states, signals
 from celery._state import _task_stack
 from celery.app import set_default_app
@@ -200,8 +200,10 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
         I = Info(state, exc)
         R = I.handle_error_state(task, eager=eager)
         if call_errbacks:
-            [signature(errback, app=app).apply_async((uuid, ))
-             for errback in request.errbacks or []]
+            group(
+                [signature(errback, app=app)
+                 for errback in request.errbacks or []], app=app,
+            ).apply_async((uuid, ))
         return I, R, I.state, I.retval
 
     def trace_task(uuid, args, kwargs, request=None):
@@ -255,8 +257,11 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                     try:
                         # callback tasks must be applied before the result is
                         # stored, so that result.children is populated.
-                        [signature(callback, app=app).apply_async((retval, ))
-                            for callback in task_request.callbacks or []]
+                        group(
+                            [signature(callback, app=app)
+                             for callback in task.request.callbacks or []],
+                            app=app,
+                        ).apply_async((retval, ))
                         if publish_result:
                             store_result(
                                 uuid, retval, SUCCESS, request=task_request,