Browse Source

Issue 3426. Fix to keep the kwargs in a group.

Samuel GIFFARD 8 years ago
parent
commit
938407fc6f
2 changed files with 48 additions and 8 deletions
  1. 11 8
      celery/canvas.py
  2. 37 0
      t/unit/tasks/test_canvas.py

+ 11 - 8
celery/canvas.py

@@ -954,7 +954,8 @@ class group(Signature):
                 yield task, task.freeze(group_id=group_id, root_id=root_id)
 
     def _apply_tasks(self, tasks, producer=None, app=None, p=None,
-                     add_to_parent=None, chord=None, **options):
+                     add_to_parent=None, chord=None,
+                     args=None, kwargs=None, **options):
         # pylint: disable=redefined-outer-name
         #   XXX chord is also a class in outer scope.
         app = app or self.app
@@ -962,6 +963,7 @@ class group(Signature):
             for sig, res in tasks:
                 sig.apply_async(producer=producer, add_to_parent=False,
                                 chord=sig.options.get('chord') or chord,
+                                args=args, kwargs=kwargs,
                                 **options)
 
                 # adding callback to result, such that it will gradually
@@ -996,9 +998,10 @@ class group(Signature):
             return self.freeze()
 
         options, group_id, root_id = self._freeze_gid(options)
-        tasks = self._prepared(self.tasks, args, group_id, root_id, app)
+        tasks = self._prepared(self.tasks, [], group_id, root_id, app)
         p = barrier()
-        results = list(self._apply_tasks(tasks, producer, app, p, **options))
+        results = list(self._apply_tasks(tasks, producer, app, p,
+                                         args=args, kwargs=kwargs, **options))
         result = self.app.GroupResult(group_id, results, ready_barrier=p)
         p.finalize()
 
@@ -1021,9 +1024,9 @@ class group(Signature):
         if not self.tasks:
             return self.freeze()  # empty group returns GroupResult
         options, group_id, root_id = self._freeze_gid(options)
-        tasks = self._prepared(self.tasks, args, group_id, root_id, app)
+        tasks = self._prepared(self.tasks, [], group_id, root_id, app)
         return app.GroupResult(group_id, [
-            sig.apply(**options) for sig, _ in tasks
+            sig.apply(args=args, kwargs=kwargs, **options) for sig, _ in tasks
         ])
 
     def set_immutable(self, immutable):
@@ -1140,7 +1143,7 @@ class chord(Signature):
                  args=(), kwargs={}, app=None, **options):
         Signature.__init__(
             self, task, args,
-            dict(kwargs, header=_maybe_group(header, app),
+            dict(kwargs=kwargs, header=_maybe_group(header, app),
                  body=maybe_signature(body, app=app)), app=app, **options
         )
         self.subtask_type = 'chord'
@@ -1198,8 +1201,8 @@ class chord(Signature):
                     router=None, result_cls=None, **options):
         args = (tuple(args) + tuple(self.args)
                 if args and not self.immutable else self.args)
-        body = kwargs.get('body') or self.kwargs['body']
-        kwargs = dict(self.kwargs, **kwargs)
+        body = kwargs.pop('body', None) or self.kwargs['body']
+        kwargs = dict(self.kwargs['kwargs'], **kwargs)
         body = body.clone(**options)
         app = self._get_app(body)
         tasks = (self.tasks.clone() if isinstance(self.tasks, group)

+ 37 - 0
t/unit/tasks/test_canvas.py

@@ -569,6 +569,43 @@ class test_group(CanvasCase):
         g = group([self.add.s(i, i) for i in range(10)])
         assert list(iter(g)) == g.tasks
 
+    @staticmethod
+    def helper_test_get_delay(result):
+        import time
+        t0 = time.time()
+        while not result.ready():
+            time.sleep(0.01)
+            if time.time() - t0 > 1:
+                return None
+        return result.get()
+
+    def test_kwargs_direct(self):
+        res = [self.add(x=1, y=1), self.add(x=1, y=1)]
+        assert res == [2, 2]
+
+    def test_kwargs_apply(self):
+        x = group([self.add.s(), self.add.s()])
+        res = x.apply(kwargs=dict(x=1, y=1)).get()
+        assert res == [2, 2]
+
+    def test_kwargs_apply_async(self):
+        self.app.conf.task_always_eager = True
+        x = group([self.add.s(), self.add.s()])
+        res = self.helper_test_get_delay(x.apply_async(kwargs=dict(x=1, y=1)))
+        assert res == [2, 2]
+
+    def test_kwargs_delay(self):
+        self.app.conf.task_always_eager = True
+        x = group([self.add.s(), self.add.s()])
+        res = self.helper_test_get_delay(x.delay(x=1, y=1))
+        assert res == [2, 2]
+
+    def test_kwargs_delay_partial(self):
+        self.app.conf.task_always_eager = True
+        x = group([self.add.s(1), self.add.s(x=1)])
+        res = self.helper_test_get_delay(x.delay(y=1))
+        assert res == [2, 2]
+
 
 class test_chord(CanvasCase):