Browse Source

Task: replace: Append to chain/chord (Closes #3232)

Fixed issue #3232, adding the signature to the chain (if there is any).
Fixed the chord suppress if the given signature contains one.
honux 8 years ago
parent
commit
7a8c47bf13
2 changed files with 13 additions and 0 deletions
  1. 12 0
      celery/app/task.py
  2. 1 0
      celery/tests/tasks/test_tasks.py

+ 12 - 0
celery/app/task.py

@@ -774,6 +774,12 @@ class Task(object):
 
         """
         chord = self.request.chord
+        if 'chord' in sig.options:
+            if chord:
+                chord = sig.options['chord'] | chord
+            else:
+                chord = sig.options['chord']
+
         if isinstance(sig, group):
             sig |= self.app.tasks['celery.accumulate'].s(index=0).set(
                 chord=chord,
@@ -781,10 +787,16 @@ class Task(object):
                 link_error=self.request.errbacks,
             )
             chord = None
+
+        if self.request.chain:
+            for t in self.request.chain:
+                sig |= signature(t)
+
         sig.freeze(self.request.id,
                    group_id=self.request.group,
                    chord=chord,
                    root_id=self.request.root_id)
+
         sig.delay()
         raise Ignore('Replaced by new task')
 

+ 1 - 0
celery/tests/tasks/test_tasks.py

@@ -411,6 +411,7 @@ class test_tasks(TasksCase):
 
     def test_replace(self):
         sig1 = Mock(name='sig1')
+        sig1.options = {}
         with self.assertRaises(Ignore):
             self.mytask.replace(sig1)