Browse Source

Support chords with CELERY_TASK_ALWAYS_EAGER (fix #4873) (#4979)

Axel Haustant 6 years ago
parent
commit
62dc3f087a
5 changed files with 44 additions and 3 deletions
  1. 1 0
      CONTRIBUTORS.txt
  2. 3 2
      celery/canvas.py
  3. 6 1
      t/integration/tasks.py
  4. 11 0
      t/integration/test_canvas.py
  5. 23 0
      t/unit/tasks/test_canvas.py

+ 1 - 0
CONTRIBUTORS.txt

@@ -262,3 +262,4 @@ Chris Mitchell, 2018/02/27
 Josue Balandrano Coronel, 2018/05/24
 Federico Bond, 2018/06/20
 Tom Booth, 2018/07/06
+Axel haustant, 2018/08/14

+ 3 - 2
celery/canvas.py

@@ -1226,8 +1226,9 @@ class chord(Signature):
         tasks = (self.tasks.clone() if isinstance(self.tasks, group)
                  else group(self.tasks, app=app))
         if app.conf.task_always_eager:
-            return self.apply(args, kwargs,
-                              body=body, task_id=task_id, **options)
+            with allow_join_result():
+                return self.apply(args, kwargs,
+                                  body=body, task_id=task_id, **options)
         # chord([A, B, ...], C)
         return self.run(tasks, body, args, task_id=task_id, **options)
 

+ 6 - 1
t/integration/tasks.py

@@ -3,7 +3,7 @@ from __future__ import absolute_import, unicode_literals
 
 from time import sleep
 
-from celery import chain, group, shared_task
+from celery import chain, chord, group, shared_task
 from celery.exceptions import SoftTimeLimitExceeded
 from celery.utils.log import get_task_logger
 
@@ -42,6 +42,11 @@ def chain_add(x, y):
     ).apply_async()
 
 
+@shared_task
+def chord_add(x, y):
+    chord(add.s(x, x), add.s(y)).apply_async()
+
+
 @shared_task
 def delayed_sum(numbers, pause_time=1):
     """Sum the iterable of numbers."""

+ 11 - 0
t/integration/test_canvas.py

@@ -366,6 +366,17 @@ class test_chord:
         res = c()
         assert res.get() == [0, 5 + 6 + 7]
 
+    @flaky
+    def test_eager_chord_inside_task(self, manager):
+        from .tasks import chord_add
+
+        prev = chord_add.app.conf.task_always_eager
+        chord_add.app.conf.task_always_eager = True
+
+        chord_add.apply_async(args=(4, 8), throw=True).get()
+
+        chord_add.app.conf.task_always_eager = prev
+
     @flaky
     def test_group_chain(self, manager):
         if not manager.app.conf.result_backend.startswith('redis'):

+ 23 - 0
t/unit/tasks/test_canvas.py

@@ -747,6 +747,29 @@ class test_chord(CanvasCase):
         x.tasks = [self.add.s(2, 2)]
         x.freeze()
 
+    def test_chain_always_eager(self):
+        self.app.conf.task_always_eager = True
+        from celery import _state
+        from celery import result
+
+        fixture_task_join_will_block = _state.task_join_will_block
+        try:
+            _state.task_join_will_block = _state.orig_task_join_will_block
+            result.task_join_will_block = _state.orig_task_join_will_block
+
+            @self.app.task(shared=False)
+            def finalize(*args):
+                pass
+
+            @self.app.task(shared=False)
+            def chord_add():
+                return chord([self.add.s(4, 4)], finalize.s()).apply_async()
+
+            chord_add.apply_async(throw=True).get()
+        finally:
+            _state.task_join_will_block = fixture_task_join_will_block
+            result.task_join_will_block = fixture_task_join_will_block
+
 
 class test_maybe_signature(CanvasCase):