Browse Source

Integration test to verify PubSub unsubscriptions (#4468)

* [Redis Backend] Integration test to verify PubSub unsubscriptions

* Import sequence for isort check

* Re-order integration tasks import
George Psarakis 7 years ago
parent
commit
a7915054d0
2 changed files with 31 additions and 2 deletions
  1. 9 0
      t/integration/tasks.py
  2. 22 2
      t/integration/test_canvas.py

+ 9 - 0
t/integration/tasks.py

@@ -15,6 +15,15 @@ def add(x, y):
     return x + y
 
 
+@shared_task
+def delayed_sum(numbers, pause_time=1):
+    """Sum the iterable of numbers."""
+    # Allow the task to be in STARTED state for
+    # a limited period of time.
+    sleep(pause_time)
+    return sum(numbers)
+
+
 @shared_task(bind=True)
 def add_replaced(self, x, y):
     """Add two numbers (via the add task)."""

+ 22 - 2
t/integration/test_canvas.py

@@ -1,5 +1,7 @@
 from __future__ import absolute_import, unicode_literals
 
+from time import sleep
+
 import pytest
 from redis import StrictRedis
 
@@ -8,8 +10,8 @@ from celery.exceptions import TimeoutError
 from celery.result import AsyncResult, GroupResult
 
 from .conftest import flaky
-from .tasks import (add, add_replaced, add_to_all, collect_ids, ids,
-                    redis_echo, second_order_replace1)
+from .tasks import (add, add_replaced, add_to_all, collect_ids, delayed_sum,
+                    ids, redis_echo, second_order_replace1)
 
 TIMEOUT = 120
 
@@ -170,6 +172,24 @@ def assert_ids(r, expected_value, expected_root_id, expected_parent_id):
 
 class test_chord:
 
+    @flaky
+    def test_redis_subscribed_channels_leak(self, manager):
+        if not manager.app.conf.result_backend.startswith('redis'):
+            raise pytest.skip('Requires redis result backend.')
+
+        redis_client = StrictRedis()
+        async_result = chord([add.s(5, 6), add.s(6, 7)])(delayed_sum.s())
+        for _ in range(TIMEOUT):
+            if async_result.state == 'STARTED':
+                break
+            sleep(0.2)
+        channels_before = \
+            len(redis_client.execute_command('PUBSUB CHANNELS'))
+        assert async_result.get(timeout=TIMEOUT) == 24
+        channels_after = \
+            len(redis_client.execute_command('PUBSUB CHANNELS'))
+        assert channels_after < channels_before
+
     @flaky
     def test_group_chain(self, manager):
         if not manager.app.conf.result_backend.startswith('redis'):