Browse Source

Propagate arguments to chains inside groups (#4481)

* Remove self._frozen from _chain run method

* Add in explicit test for group results in chain
Chris Mitchell 7 years ago
parent
commit
ba2dec7956
2 changed files with 27 additions and 7 deletions
  1. 4 7
      celery/canvas.py
  2. 23 0
      t/integration/test_canvas.py

+ 4 - 7
celery/canvas.py

@@ -570,13 +570,10 @@ class _chain(Signature):
         args = (tuple(args) + tuple(self.args)
                 if args and not self.immutable else self.args)
 
-        if self._frozen:
-            tasks, results = self._frozen
-        else:
-            tasks, results = self.prepare_steps(
-                args, self.tasks, root_id, parent_id, link_error, app,
-                task_id, group_id, chord,
-            )
+        tasks, results = self.prepare_steps(
+            args, self.tasks, root_id, parent_id, link_error, app,
+            task_id, group_id, chord,
+        )
 
         if results:
             if link:

+ 23 - 0
t/integration/test_canvas.py

@@ -36,6 +36,29 @@ class test_chain:
         res = c()
         assert res.get(timeout=TIMEOUT) == [64, 65, 66, 67]
 
+    @flaky
+    def test_group_results_in_chain(self, manager):
+        # This adds in an explicit test for the special case added in commit
+        # 1e3fcaa969de6ad32b52a3ed8e74281e5e5360e6
+        c = (
+            group(
+                add.s(1, 2) | group(
+                    add.s(1), add.s(2)
+                )
+            )
+        )
+        res = c()
+        assert res.get(timeout=TIMEOUT) == [4, 5]
+
+    @flaky
+    def test_chain_inside_group_receives_arguments(self, manager):
+        c = (
+            add.s(5, 6) |
+            group((add.s(1) | add.s(2), add.s(3)))
+        )
+        res = c()
+        assert res.get(timeout=TIMEOUT) == [14, 14]
+
     @flaky
     def test_group_chord_group_chain(self, manager):
         from celery.five import bytes_if_py2