Browse Source

Allow join in eager canvas application (#4617)

* Allow join in eager canvas application

* Unit & integration tests for canvas in eager tests

* Changelog for eager application test fix
Nicholas Pilon 7 years ago
parent
commit
5607ae976b
6 changed files with 46 additions and 2 deletions
  1. 4 0
      Changelog
  2. 3 2
      celery/canvas.py
  3. 1 0
      docs/whatsnew-4.2.rst
  4. 7 0
      t/integration/tasks.py
  5. 11 0
      t/integration/test_canvas.py
  6. 20 0
      t/unit/tasks/test_canvas.py

+ 4 - 0
Changelog

@@ -287,6 +287,10 @@ an overview of what's new in Celery 4.2.
 
     Contributed by **Vinod Chandru**.
 
+- **Development/Testing**: Allow eager application of canvas structures (#4576)
+
+    Contributed by **Nicholas Pilon**.
+
 Documentation, CI, Installation and Tests fixes:
 
 

+ 3 - 2
celery/canvas.py

@@ -24,7 +24,7 @@ from vine import barrier
 from celery._state import current_app
 from celery.five import python_2_unicode_compatible
 from celery.local import try_import
-from celery.result import GroupResult
+from celery.result import GroupResult, allow_join_result
 from celery.utils import abstract
 from celery.utils.functional import _regen
 from celery.utils.functional import chunks as _chunks
@@ -554,7 +554,8 @@ class _chain(Signature):
         # python is best at unpacking kwargs, so .run is here to do that.
         app = self.app
         if app.conf.task_always_eager:
-            return self.apply(args, kwargs, **options)
+            with allow_join_result():
+                return self.apply(args, kwargs, **options)
         return self.run(args, kwargs, app=app, **(
             dict(self.options, **options) if options else self.options))
 

+ 1 - 0
docs/whatsnew-4.2.rst

@@ -113,6 +113,7 @@ Mikołaj <mikolevy1@gmail.com>
 Misha Wolfson <myw@users.noreply.github.com>
 Nick Eaket <4418194+neaket360pi@users.noreply.github.com>
 Nicolas Mota <nicolas_mota@live.com>
+Nicholas Pilon <npilon@gmail.com>
 Omer Katz <omer.drow@gmail.com>
 Patrick Cloke <clokep@users.noreply.github.com>
 Patrick Zhang <patdujour@gmail.com>

+ 7 - 0
t/integration/tasks.py

@@ -23,6 +23,13 @@ def add(x, y):
     return x + y
 
 
+@shared_task
+def chain_add(x, y):
+    (
+        add.s(x, x) | add.s(y)
+    ).apply_async()
+
+
 @shared_task
 def delayed_sum(numbers, pause_time=1):
     """Sum the iterable of numbers."""

+ 11 - 0
t/integration/test_canvas.py

@@ -59,6 +59,17 @@ class test_chain:
         res = c()
         assert res.get(timeout=TIMEOUT) == [14, 14]
 
+    @flaky
+    def test_eager_chain_inside_task(self, manager):
+        from .tasks import chain_add
+
+        prev = chain_add.app.conf.task_always_eager
+        chain_add.app.conf.task_always_eager = True
+
+        chain_add.apply_async(args=(4, 8), throw=True).get()
+
+        chain_add.app.conf.task_always_eager = prev
+
     @flaky
     def test_group_chord_group_chain(self, manager):
         from celery.five import bytes_if_py2

+ 20 - 0
t/unit/tasks/test_canvas.py

@@ -411,6 +411,26 @@ class test_chain(CanvasCase):
         self.app.conf.task_always_eager = True
         assert ~(self.add.s(4, 4) | self.add.s(8)) == 16
 
+    def test_chain_always_eager(self):
+        self.app.conf.task_always_eager = True
+        from celery import _state
+        from celery import result
+
+        fixture_task_join_will_block = _state.task_join_will_block
+        try:
+            _state.task_join_will_block = _state.orig_task_join_will_block
+            result.task_join_will_block = _state.orig_task_join_will_block
+
+            @self.app.task(shared=False)
+            def chain_add():
+                return (self.add.s(4, 4) | self.add.s(8)).apply_async()
+
+            r = chain_add.apply_async(throw=True).get()
+            assert r.get() == 16
+        finally:
+            _state.task_join_will_block = fixture_task_join_will_block
+            result.task_join_will_block = fixture_task_join_will_block
+
     def test_apply(self):
         x = chain(self.add.s(4, 4), self.add.s(8), self.add.s(10))
         res = x.apply()