Browse Source

group/chord now appends partial args to all tasks in the group

Ask Solem 12 years ago
parent
commit
4905c5814e
2 changed files with 14 additions and 15 deletions
  1. 10 10
      celery/app/builtins.py
  2. 4 5
      celery/canvas.py

+ 10 - 10
celery/app/builtins.py

@@ -128,8 +128,8 @@ def add_group_task(app):
         def run(self, tasks, result, group_id, partial_args):
             app = self.app
             result = from_serializable(result)
-            # any partial args are added to the first task in the group
-            taskit = (subtask(task) if i else subtask(task).clone(partial_args)
+            # any partial args are added to all tasks in the group
+            taskit = (subtask(task).clone(partial_args)
                         for i, task in enumerate(tasks))
             if self.request.is_eager or app.conf.CELERY_ALWAYS_EAGER:
                 return app.GroupResult(result.id,
@@ -142,7 +142,7 @@ def add_group_task(app):
                 parent.request.children.append(result)
             return result
 
-        def prepare(self, options, tasks, **kwargs):
+        def prepare(self, options, tasks, args, **kwargs):
             options['group_id'] = group_id = \
                     options.setdefault('task_id', uuid())
 
@@ -157,19 +157,22 @@ def add_group_task(app):
                 return task, self.AsyncResult(tid)
 
             tasks, results = zip(*[prepare_member(task) for task in tasks])
-            return tasks, self.app.GroupResult(group_id, results), group_id
+            return (tasks, self.app.GroupResult(group_id, results),
+                    group_id, args)
 
         def apply_async(self, partial_args=(), kwargs={}, **options):
             if self.app.conf.CELERY_ALWAYS_EAGER:
                 return self.apply(args, kwargs, **options)
-            tasks, result, gid = self.prepare(options, **kwargs)
+            tasks, result, gid, args = self.prepare(options,
+                                            args=partial_args, **kwargs)
             super(Group, self).apply_async((list(tasks),
-                result.serializable(), gid, partial_args), **options)
+                result.serializable(), gid, args), **options)
             return result
 
         def apply(self, args=(), kwargs={}, **options):
             return super(Group, self).apply(
-                    self.prepare(options, **kwargs) + (args, ), **options)
+                    self.prepare(options, args=args, **kwargs),
+                    **options).get()
     return Group
 
 
@@ -184,7 +187,6 @@ def add_chain_task(app):
         accept_magic_kwargs = False
 
         def prepare_steps(self, args, tasks):
-            print('ARGS: %r' % (args, ))
             steps = deque(tasks)
             next_step = prev_task = prev_res = None
             tasks, results = [], []
@@ -233,8 +235,6 @@ def add_chain_task(app):
             if task_id:
                 tasks[-1].set(task_id=task_id)
                 result = tasks[-1].type.AsyncResult(task_id)
-            print("TASKS[-1]: %r" % (tasks[-1], ))
-            print("ID: %r" % (tasks[-1].options, ))
             tasks[0].apply_async()
             return result
 

+ 4 - 5
celery/canvas.py

@@ -304,9 +304,9 @@ class group(Signature):
         return group(d['kwargs']['tasks'], **kwdict(d['options']))
 
     def __call__(self, *partial_args, **options):
-        tasks, result, gid = self.type.prepare(options,
-                                map(Signature.clone, self.tasks))
-        return self.type(tasks, result, gid, partial_args)
+        tasks, result, gid, args = self.type.prepare(options,
+                    map(Signature.clone, self.tasks), partial_args)
+        return self.type(tasks, result, gid, args)
 
     def skew(self, start=1.0, stop=None, step=1.0):
         _next_skew = fxrange(start, stop, step, repeatlast=True).next
@@ -328,8 +328,7 @@ class chord(Signature):
     def __init__(self, header, body=None, **options):
         Signature.__init__(self, 'celery.chord', (),
                          {'header': _maybe_group(header),
-                          'body': maybe_subtask(body)},
-                         options, immutable=True)
+                          'body': maybe_subtask(body)}, options)
         self.subtask_type = 'chord'
 
     @classmethod