Browse Source

Task callbacks applied as group means trail (.children) stored multiple times. Closes #1936. Closes #1943

Ask Solem 11 years ago
parent
commit
e07ea51f3a
3 changed files with 27 additions and 10 deletions
  1. 3 2
      celery/app/builtins.py
  2. 21 5
      celery/app/trace.py
  3. 3 3
      celery/canvas.py

+ 3 - 2
celery/app/builtins.py

@@ -171,7 +171,8 @@ def add_group_task(app):
         accept_magic_kwargs = False
         _decorated = True
 
-        def run(self, tasks, result, group_id, partial_args):
+        def run(self, tasks, result, group_id, partial_args,
+                add_to_parent=True):
             app = self.app
             result = result_from_tuple(result, app)
             # any partial args are added to all tasks in the group
@@ -186,7 +187,7 @@ def add_group_task(app):
                 [stask.apply_async(group_id=group_id, producer=pub,
                                    add_to_parent=False) for stask in taskit]
             parent = get_current_worker_task()
-            if parent:
+            if add_to_parent and parent:
                 parent.add_trail(result)
             return result
 

+ 21 - 5
celery/app/trace.py

@@ -257,11 +257,27 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
                     try:
                         # callback tasks must be applied before the result is
                         # stored, so that result.children is populated.
-                        group(
-                            [signature(callback, app=app)
-                             for callback in task.request.callbacks or []],
-                            app=app,
-                        ).apply_async((retval, ))
+
+                        # groups are called inline and will store trail
+                        # separately, so need to call them separately
+                        # so that the trail's not added multiple times :(
+                        # (Issue #1936)
+                        callbacks = task.request.callbacks
+                        if callbacks:
+                            if len(task.request.callbacks) > 1:
+                                sigs, groups = [], []
+                                for sig in callbacks:
+                                    sig = signature(sig, app=app)
+                                    if isinstance(sig, group):
+                                        groups.append(sig)
+                                    else:
+                                        sigs.append(sig)
+                                for group_ in groups:
+                                    group.apply_async((retval, ))
+                                if sigs:
+                                    group(sigs).apply_async(retval, )
+                            else:
+                                signature(callbacks[0], app=app).delay(retval)
                         if publish_result:
                             store_result(
                                 uuid, retval, SUCCESS, request=task_request,

+ 3 - 3
celery/canvas.py

@@ -477,13 +477,13 @@ class group(Signature):
                 task['args'] = task._merge(d['args'])[0]
         return group(tasks, app=app, **kwdict(d['options']))
 
-    def apply_async(self, args=(), kwargs=None, **options):
+    def apply_async(self, args=(), kwargs=None, add_to_parent=True, **options):
         tasks = _maybe_clone(self.tasks, app=self._app)
         if not tasks:
             return self.freeze()
         type = self.type
-        return type(*type.prepare(dict(self.options, **options),
-                                  tasks, args))
+        return type(*type.prepare(dict(self.options, **options), tasks, args),
+                    add_to_parent=add_to_parent)
 
     def set_immutable(self, immutable):
         for task in self.tasks: