浏览代码

task | group now returns a GroupResult (Issue #2354)

Ask Solem 10 年之前
父节点
当前提交
7024c2c23a
共有 1 个文件被更改,包括 30 次插入28 次删除
  1. 30 28
      celery/canvas.py

+ 30 - 28
celery/canvas.py

@@ -388,27 +388,19 @@ class chain(Signature):
         args = (tuple(args) + tuple(self.args)
                 if args and not self.immutable else self.args)
         tasks, results = self.prepare_steps(
-            args, self.tasks, root_id, link_error,
+            args, self.tasks, root_id, link_error, app,
+            task_id, group_id, chord,
         )
-        if not results:
-            return
-        result = results[-1]
-        last_task = tasks[-1]
-        if group_id:
-            last_task.set(group_id=group_id)
-        if chord:
-            last_task.set(chord=chord)
-        if task_id:
-            last_task.set(task_id=task_id)
-            result = last_task.type.AsyncResult(task_id)
-        # make sure we can do a link() and link_error() on a chain object.
-        if link:
-            tasks[-1].set(link=link)
-        tasks[0].apply_async(**options)
-        return result
+        if results:
+            # make sure we can do a link() and link_error() on a chain object.
+            if link:
+                tasks[-1].set(link=link)
+            tasks[0].apply_async(**options)
+            return results[-1]
 
     def prepare_steps(self, args, tasks,
                       root_id=None, link_error=None, app=None,
+                      last_task_id=None, group_id=None, chord_body=None,
                       from_dict=Signature.from_dict):
         app = app or self.app
         steps = deque(tasks)
@@ -417,21 +409,15 @@ class chain(Signature):
         i = 0
         while steps:
             task = steps.popleft()
+
             if not isinstance(task, Signature):
                 task = from_dict(task, app=app)
-            if not i:  # first task
-                # first task gets partial args from chain
-                task = task.clone(args)
-                res = task.freeze(root_id=root_id)
-                root_id = res.id if root_id is None else root_id
-            else:
-                task = task.clone()
-                res = task.freeze(root_id=root_id)
-            i += 1
-
             if isinstance(task, group):
                 task = maybe_unroll_group(task)
 
+            # first task gets partial args from chain
+            task = task.clone(args) if not i else task.clone()
+
             if isinstance(task, chain):
                 # splice the chain
                 steps.extendleft(reversed(task.tasks))
@@ -442,7 +428,7 @@ class chain(Signature):
                     next_step = steps.popleft()
                     # for chords we freeze by pretending it's a normal
                     # signature instead of a group.
-                    res = Signature.freeze(next_step)
+                    res = Signature.freeze(next_step, root_id=root_id)
                     task = chord(
                         task, body=next_step,
                         task_id=res.task_id, root_id=root_id,
@@ -450,6 +436,22 @@ class chain(Signature):
                 except IndexError:
                     pass  # no callback, so keep as group.
 
+            if steps:
+                res = task.freeze(root_id=root_id)
+            else:
+                # chain(task_id=id) means task id is set for the last task
+                # in the chain.  If the chord is part of a chord/group
+                # then that chord/group must synchronize based on the
+                # last task in the chain, so we only set the group_id and
+                # chord callback for the last task.
+                res = task.freeze(
+                    last_task_id,
+                    root_id=root_id, group_id=group_id, chord=chord_body,
+                )
+            root_id = res.id if root_id is None else root_id
+            i += 1
+
+
             if prev_task:
                 # link previous task to this task.
                 prev_task.link(task)