Explorar o código

Fix length-1 and nested chords (#4393 #4055 #3885 #3597 #3574 #3323) (#4437)

* Don't convert single-task chord to chain

* Fix evaluation of nested chords
Alex Hill %!s(int64=7) %!d(string=hai) anos
pai
achega
10f06ea1df
Modificáronse 2 ficheiros con 51 adicións e 17 borrados
  1. 2 17
      celery/canvas.py
  2. 49 0
      t/integration/test_canvas.py

+ 2 - 17
celery/canvas.py

@@ -375,20 +375,12 @@ class Signature(dict):
     def __or__(self, other):
         # These could be implemented in each individual class,
         # I'm sure, but for now we have this.
-        if isinstance(other, chord) and len(other.tasks) == 1:
-            # chord with one header -> header[0] | body
-            other = other.tasks[0] | other.body
-
         if isinstance(self, group):
             if isinstance(other, group):
                 # group() | group() -> single group
                 return group(
                     itertools.chain(self.tasks, other.tasks), app=self.app)
             # group() | task -> chord
-            if len(self.tasks) == 1:
-                # group(ONE.s()) | other -> ONE.s() | other
-                # Issue #3323
-                return self.tasks[0] | other
             return chord(self, body=other, app=self._app)
         elif isinstance(other, group):
             # unroll group with one member
@@ -409,10 +401,6 @@ class Signature(dict):
             return _chain(seq_concat_seq(
                 self.unchain_tasks(), other.unchain_tasks()), app=self._app)
         elif isinstance(self, chord):
-            # chord(ONE, body) | other -> ONE | body | other
-            # chord with one header task is unecessary.
-            if len(self.tasks) == 1:
-                return self.tasks[0] | self.body | other
             # chord | task ->  attach to body
             sig = self.clone()
             sig.body = sig.body | other
@@ -1243,11 +1231,6 @@ class chord(Signature):
         if app.conf.task_always_eager:
             return self.apply(args, kwargs,
                               body=body, task_id=task_id, **options)
-        if len(self.tasks) == 1:
-            # chord([A], B) can be optimized as A | B
-            # - Issue #3323
-            return (self.tasks[0] | body).set(task_id=task_id).apply_async(
-                args, kwargs, **options)
         # chord([A, B, ...], C)
         return self.run(tasks, body, args, task_id=task_id, **options)
 
@@ -1291,6 +1274,8 @@ class chord(Signature):
 
         # Chains should not be passed to the header tasks. See #3771
         options.pop('chain', None)
+        # Neither should chords, for deeply nested chords to work
+        options.pop('chord', None)
 
         parent = app.backend.apply_chord(
             header, partial_args, group_id, body,

+ 49 - 0
t/integration/test_canvas.py

@@ -208,6 +208,55 @@ class test_chord:
         res = c()
         assert res.get(timeout=TIMEOUT) == 11
 
+    @flaky
+    def test_single_task_header(self, manager):
+        try:
+            manager.app.backend.ensure_chords_allowed()
+        except NotImplementedError as e:
+            raise pytest.skip(e.args[0])
+
+        c1 = chord([add.s(2, 5)], body=add_to_all.s(9))
+        res1 = c1()
+        assert res1.get(timeout=TIMEOUT) == [16]
+
+        c2 = group([add.s(2, 5)]) | add_to_all.s(9)
+        res2 = c2()
+        assert res2.get(timeout=TIMEOUT) == [16]
+
+    @flaky
+    def test_nested_chord(self, manager):
+        try:
+            manager.app.backend.ensure_chords_allowed()
+        except NotImplementedError as e:
+            raise pytest.skip(e.args[0])
+
+        c1 = chord([
+            chord([add.s(1, 2), add.s(3, 4)], add.s([5])),
+            chord([add.s(6, 7)], add.s([10]))
+        ], add_to_all.s(['A']))
+        res1 = c1()
+        assert res1.get(timeout=TIMEOUT) == [[3, 7, 5, 'A'], [13, 10, 'A']]
+
+        c2 = group([
+            group([add.s(1, 2), add.s(3, 4)]) | add.s([5]),
+            group([add.s(6, 7)]) | add.s([10]),
+        ]) | add_to_all.s(['A'])
+        res2 = c2()
+        assert res2.get(timeout=TIMEOUT) == [[3, 7, 5, 'A'], [13, 10, 'A']]
+
+        c = group([
+            group([
+                group([
+                    group([
+                        add.s(1, 2)
+                    ]) | add.s([3])
+                ]) | add.s([4])
+            ]) | add.s([5])
+        ]) | add.s([6])
+
+        res = c()
+        assert [[[[3, 3], 4], 5], 6] == res.get(timeout=TIMEOUT)
+
     @flaky
     def test_parent_ids(self, manager):
         if not manager.app.conf.result_backend.startswith('redis'):