from __future__ import absolute_import, unicode_literals import pytest from celery import group from .conftest import flaky, get_active_redis_channels from .tasks import add, add_ignore_result, print_unicode, retry_once, sleeping class test_tasks: @flaky def test_task_accepted(self, manager, sleep=1): r1 = sleeping.delay(sleep) sleeping.delay(sleep) manager.assert_accepted([r1.id]) @flaky def test_task_retried(self): res = retry_once.delay() assert res.get(timeout=10) == 1 # retried once @flaky def test_unicode_task(self, manager): manager.join( group(print_unicode.s() for _ in range(5))(), timeout=10, propagate=True, ) class tests_task_redis_result_backend: def setup(self, manager): if not manager.app.conf.result_backend.startswith('redis'): raise pytest.skip('Requires redis result backend.') def test_ignoring_result_no_subscriptions(self): assert get_active_redis_channels() == [] result = add_ignore_result.delay(1, 2) assert result.ignored is True assert get_active_redis_channels() == [] def test_asyncresult_forget_cancels_subscription(self): result = add.delay(1, 2) assert get_active_redis_channels() == [ "celery-task-meta-{}".format(result.id) ] result.forget() assert get_active_redis_channels() == [] def test_asyncresult_get_cancels_subscription(self): result = add.delay(1, 2) assert get_active_redis_channels() == [ "celery-task-meta-{}".format(result.id) ] assert result.get(timeout=3) == 3 assert get_active_redis_channels() == []