Explorar el Código

Fixes chain issues from last commit (holy that function is hairy now, need to write two of them)

Ask Solem hace 10 años
padre
commit
336269a23a
Se han modificado 2 ficheros con 39 adiciones y 20 borrados
  1. 22 20
      celery/canvas.py
  2. 17 0
      celery/tests/app/test_builtins.py

+ 22 - 20
celery/canvas.py

@@ -414,7 +414,6 @@ class chain(Signature):
                 task_id, group_id, chord,
                 task_id, group_id, chord,
             )
             )
 
 
-
         if results:
         if results:
             # make sure we can do a link() and link_error() on a chain object.
             # make sure we can do a link() and link_error() on a chain object.
             if self._use_link:
             if self._use_link:
@@ -455,13 +454,15 @@ class chain(Signature):
 
 
         steps_pop = steps.popleft if use_link else steps.pop
         steps_pop = steps.popleft if use_link else steps.pop
         steps_extend = steps.extendleft if use_link else steps.extend
         steps_extend = steps.extendleft if use_link else steps.extend
-        extend_order = reverse if use_link else noop
+        extend_order = reversed if use_link else noop
 
 
         next_step = prev_task = prev_res = None
         next_step = prev_task = prev_res = None
         tasks, results = [], []
         tasks, results = [], []
         i = 0
         i = 0
         while steps:
         while steps:
             task = steps_pop()
             task = steps_pop()
+            last_task = not steps if use_link else not i
+            first_task = not i if use_link else not steps
 
 
             if not isinstance(task, abstract.CallableSignature):
             if not isinstance(task, abstract.CallableSignature):
                 task = from_dict(task, app=app)
                 task = from_dict(task, app=app)
@@ -471,30 +472,29 @@ class chain(Signature):
             # first task gets partial args from chain
             # first task gets partial args from chain
             if clone:
             if clone:
                 task = task.clone(args) if not i else task.clone()
                 task = task.clone(args) if not i else task.clone()
-            elif not i:
+            elif first_task:
                 task.args = tuple(args) + tuple(task.args)
                 task.args = tuple(args) + tuple(task.args)
 
 
             if isinstance(task, chain):
             if isinstance(task, chain):
                 # splice the chain
                 # splice the chain
                 steps_extend(extend_order(task.tasks))
                 steps_extend(extend_order(task.tasks))
                 continue
                 continue
-            elif isinstance(task, group) and steps:
-                # automatically upgrade group(...) | s to chord(group, s)
-                try:
-                    next_step = steps_pop()
-                    # for chords we freeze by pretending it's a normal
-                    # signature instead of a group.
-                    res = Signature.freeze(next_step, root_id=root_id)
-                    task = chord(
-                        task, body=next_step,
-                        task_id=res.task_id, root_id=root_id,
-                    )
-                except IndexError:
-                    pass  # no callback, so keep as group.
-
-            if steps:
-                res = task.freeze(root_id=root_id)
-            else:
+            elif isinstance(task, group):
+                if (steps if use_link else prev_task):
+                    # automatically upgrade group(...) | s to chord(group, s)
+                    try:
+                        next_step = steps_pop() if use_link else prev_task
+                        # for chords we freeze by pretending it's a normal
+                        # signature instead of a group.
+                        res = Signature.freeze(next_step, root_id=root_id)
+                        task = chord(
+                            task, body=next_step,
+                            task_id=res.task_id, root_id=root_id,
+                        )
+                    except IndexError:
+                        pass  # no callback, so keep as group.
+
+            if last_task:
                 # chain(task_id=id) means task id is set for the last task
                 # chain(task_id=id) means task id is set for the last task
                 # in the chain.  If the chord is part of a chord/group
                 # in the chain.  If the chord is part of a chord/group
                 # then that chord/group must synchronize based on the
                 # then that chord/group must synchronize based on the
@@ -504,6 +504,8 @@ class chain(Signature):
                     last_task_id,
                     last_task_id,
                     root_id=root_id, group_id=group_id, chord=chord_body,
                     root_id=root_id, group_id=group_id, chord=chord_body,
                 )
                 )
+            else:
+                res = task.freeze(root_id=root_id)
             root_id = res.id if root_id is None else root_id
             root_id = res.id if root_id is None else root_id
             i += 1
             i += 1
 
 

+ 17 - 0
celery/tests/app/test_builtins.py

@@ -140,15 +140,32 @@ class test_chain(BuiltinsCase):
             self.add.s(20) |
             self.add.s(20) |
             self.add.s(30)
             self.add.s(30)
         )
         )
+        c._use_link = True
         tasks, _ = c.prepare_steps((), c.tasks)
         tasks, _ = c.prepare_steps((), c.tasks)
         self.assertIsInstance(tasks[0], chord)
         self.assertIsInstance(tasks[0], chord)
         self.assertTrue(tasks[0].body.options['link'])
         self.assertTrue(tasks[0].body.options['link'])
         self.assertTrue(tasks[0].body.options['link'][0].options['link'])
         self.assertTrue(tasks[0].body.options['link'][0].options['link'])
 
 
         c2 = self.add.s(2, 2) | group(self.add.s(i, i) for i in range(10))
         c2 = self.add.s(2, 2) | group(self.add.s(i, i) for i in range(10))
+        c2._use_link = True
         tasks2, _ = c2.prepare_steps((), c2.tasks)
         tasks2, _ = c2.prepare_steps((), c2.tasks)
         self.assertIsInstance(tasks2[1], group)
         self.assertIsInstance(tasks2[1], group)
 
 
+    def test_group_to_chord__protocol_2(self):
+        c = (
+            group([self.add.s(i, i) for i in range(5)], app=self.app) |
+            self.add.s(10) |
+            self.add.s(20) |
+            self.add.s(30)
+        )
+        c._use_link = False
+        tasks, _ = c.prepare_steps((), c.tasks)
+        self.assertIsInstance(tasks[-1], chord)
+
+        c2 = self.add.s(2, 2) | group(self.add.s(i, i) for i in range(10))
+        c2._use_link = False
+        tasks2, _ = c2.prepare_steps((), c2.tasks)
+        self.assertIsInstance(tasks2[0], group)
     def test_apply_options(self):
     def test_apply_options(self):
 
 
         class static(Signature):
         class static(Signature):