Browse Source

Added a test to verify that second order replace works as expected. Fixes #3116.

Omer Katz 7 years ago
parent
commit
3b330d975d
2 changed files with 51 additions and 2 deletions
  1. 29 1
      t/integration/tasks.py
  2. 22 1
      t/integration/test_canvas.py

+ 29 - 1
t/integration/tasks.py

@@ -3,7 +3,7 @@ from __future__ import absolute_import, unicode_literals
 
 from time import sleep
 
-from celery import group, shared_task
+from celery import chain, group, shared_task
 from celery.utils.log import get_task_logger
 
 logger = get_task_logger(__name__)
@@ -74,3 +74,31 @@ def redis_echo(message):
 
     redis_connection = StrictRedis()
     redis_connection.rpush('redis-echo', message)
+
+
+@shared_task(bind=True)
+def second_order_replace1(self, state=False):
+    from redis import StrictRedis
+
+    redis_connection = StrictRedis()
+    if not state:
+        redis_connection.rpush('redis-echo', 'In A')
+        new_task = chain(second_order_replace2.s(),
+                         second_order_replace1.si(state=True))
+        raise self.replace(new_task)
+    else:
+        redis_connection.rpush('redis-echo', 'Out A')
+
+
+@shared_task(bind=True)
+def second_order_replace2(self, state=False):
+    from redis import StrictRedis
+
+    redis_connection = StrictRedis()
+    if not state:
+        redis_connection.rpush('redis-echo', 'In B')
+        new_task = chain(redis_echo.s("In/Out C"),
+                         second_order_replace2.si(state=True))
+        raise self.replace(new_task)
+    else:
+        redis_connection.rpush('redis-echo', 'Out B')

+ 22 - 1
t/integration/test_canvas.py

@@ -8,7 +8,8 @@ from celery.exceptions import TimeoutError
 from celery.result import AsyncResult, GroupResult
 
 from .conftest import flaky
-from .tasks import add, add_replaced, add_to_all, collect_ids, ids, redis_echo
+from .tasks import (add, add_replaced, add_to_all, collect_ids, ids,
+                    redis_echo, second_order_replace1)
 
 TIMEOUT = 120
 
@@ -58,6 +59,26 @@ class test_chain:
         assert set(redis_messages[4:]) == after_items
         redis_connection.delete('redis-echo')
 
+    @flaky
+    def test_second_order_replace(self, manager):
+        from celery.five import bytes_if_py2
+
+        if not manager.app.conf.result_backend.startswith('redis'):
+            raise pytest.skip('Requires redis result backend.')
+
+        redis_connection = StrictRedis()
+        redis_connection.delete('redis-echo')
+
+        result = second_order_replace1.delay()
+        result.get(timeout=TIMEOUT)
+        redis_messages = list(map(
+            bytes_if_py2,
+            redis_connection.lrange('redis-echo', 0, -1)
+        ))
+
+        expected_messages = [b'In A', b'In B', b'In/Out C', b'Out B', b'Out A']
+        assert redis_messages == expected_messages
+
     @flaky
     def test_parent_ids(self, manager, num=10):
         assert manager.inspect().ping()