Browse Source

Also pass args groups in one-level chains

Ask Solem 12 years ago
parent
commit
d6cfd6c87c
2 changed files with 12 additions and 6 deletions
  1. 5 4
      celery/app/builtins.py
  2. 7 2
      celery/canvas.py

+ 5 - 4
celery/app/builtins.py

@@ -183,8 +183,7 @@ def add_group_task(app):
 
 @shared_task
 def add_chain_task(app):
-    from celery.canvas import chord, group, maybe_subtask
-    from celery.result import GroupResult
+    from celery.canvas import Signature, chord, group, maybe_subtask
     _app = app
 
     class Chain(app.Task):
@@ -202,14 +201,16 @@ def add_chain_task(app):
                 task = maybe_subtask(steps.popleft())
                 task = task.clone() if i else task.clone(args)
                 res = task._freeze()
-                AsyncResult = task.type.AsyncResult
                 i += 1
 
                 if isinstance(task, group):
                     # automatically upgrade group(..) | s to chord(group, s)
                     try:
                         next_step = steps.popleft()
-                        task = chord(task, body=next_step, task_id=tid)
+                        # for chords we freeze by pretending it's a normal
+                        # task instead of a group.
+                        res = Signature._freeze(task)
+                        task = chord(task, body=next_step, task_id=res.task_id)
                     except IndexError:
                         pass
                 if prev_task:

+ 7 - 2
celery/canvas.py

@@ -19,7 +19,7 @@ from kombu.utils import cached_property, fxrange, kwdict, reprcall, uuid
 
 from celery import current_app
 from celery.local import Proxy
-from celery.result import GroupResult, from_serializable as serialized_result
+from celery.result import GroupResult
 from celery.utils.functional import (
     maybe_list, is_list, regen,
     chunks as _chunks,
@@ -334,7 +334,12 @@ class group(Signature):
 
     @classmethod
     def from_dict(self, d):
-        return group(d['kwargs']['tasks'], **kwdict(d['options']))
+        tasks = d['kwargs']['tasks']
+        if d['args'] and tasks:
+            # partial args passed on to all tasks in the group (Issue #1057).
+            for task in tasks:
+                task['args'] = d['args'] + task['args']
+        return group(tasks, **kwdict(d['options']))
 
     def __call__(self, *partial_args, **options):
         tasks, result, gid, args = self.type.prepare(options,