"""Tests for the canvas primitives (chain, group, chord).

Every test receives the ``manager`` fixture and waits on real task results,
using the tasks imported from ``.tasks``.
"""
from __future__ import absolute_import, unicode_literals

import pytest
from redis import StrictRedis

from celery import chain, chord, group
from celery.exceptions import TimeoutError
from celery.result import AsyncResult, GroupResult

from .conftest import flaky
from .tasks import add, add_replaced, add_to_all, collect_ids, ids, redis_echo

# Default number of seconds to wait for a canvas result.
TIMEOUT = 120


class test_chain:
    """Tests for signatures linked together into a chain."""

    @flaky
    def test_simple_chain(self, manager):
        """Three chained ``add`` tasks: (4 + 4) + 8 + 16 == 32."""
        c = add.s(4, 4) | add.s(8) | add.s(16)
        assert c().get(timeout=TIMEOUT) == 32

    @flaky
    def test_complex_chain(self, manager):
        """A chain holding a nested chain and ending in a group.

        The running sum reaches 64 before the final group adds 0..3 to it.
        """
        c = (
            add.s(2, 2) | (
                add.s(4) | add_replaced.s(8) | add.s(16) | add.s(32)
            ) |
            group(add.s(i) for i in range(4))
        )
        res = c()
        assert res.get(timeout=TIMEOUT) == [64, 65, 66, 67]

    @flaky
    def test_group_chord_group_chain(self, manager):
        """Check ordering of ``group | task | group``.

        Reads the ``redis-echo`` list back from redis and expects the three
        "before" messages first (in any order), then "connect", then the two
        "after" messages (in any order).  Skipped unless the result backend
        is redis.
        """
        from celery.five import bytes_if_py2

        if not manager.app.conf.result_backend.startswith('redis'):
            raise pytest.skip('Requires redis result backend.')
        redis_connection = StrictRedis()
        # Start from an empty list so earlier runs cannot leak messages in.
        redis_connection.delete('redis-echo')
        before = group(redis_echo.si('before {}'.format(i)) for i in range(3))
        connect = redis_echo.si('connect')
        after = group(redis_echo.si('after {}'.format(i)) for i in range(2))

        result = (before | connect | after).delay()
        result.get(timeout=TIMEOUT)
        redis_messages = list(map(
            bytes_if_py2,
            redis_connection.lrange('redis-echo', 0, -1)
        ))
        before_items = \
            set(map(bytes_if_py2, (b'before 0', b'before 1', b'before 2')))
        after_items = set(map(bytes_if_py2, (b'after 0', b'after 1')))

        # Members of a group may finish in any order, hence the sets.
        assert set(redis_messages[:3]) == before_items
        assert redis_messages[3] == b'connect'
        assert set(redis_messages[4:]) == after_items
        redis_connection.delete('redis-echo')

    @flaky
    def test_parent_ids(self, manager, num=10):
        """Check the root/parent ids recorded by a chain of ``num`` tasks.

        On timeout the inspect output (active, reserved, stats) is printed
        before the ``TimeoutError`` is re-raised.
        """
        assert manager.inspect().ping()
        c = chain(ids.si(i=i) for i in range(num))
        c.freeze()
        res = c()
        try:
            res.get(timeout=TIMEOUT)
        except TimeoutError:
            # ``manager.inspect`` has to be called (as for ping() above);
            # reading attributes off the bound method would raise
            # AttributeError and mask the timeout being reported.
            print(manager.inspect().active())
            print(manager.inspect().reserved())
            print(manager.inspect().stats())
            raise
        self.assert_ids(res, num - 1)

    def assert_ids(self, res, size):
        """Walk ``res`` back through ``.parent`` and verify every node.

        Each node's result is unpacked as ``(root_id, parent_id, value)``.
        ``value`` must count down from ``size``, ``parent_id`` must match
        the id of the node's parent (when it has one) and ``root_id`` must
        match the id of the first result in the chain.
        """
        i, root = size, res
        # Find the head of the chain: the only node without a parent.
        while root.parent:
            root = root.parent
        node = res
        while node:
            root_id, parent_id, value = node.get(timeout=30)
            assert value == i
            if node.parent:
                assert parent_id == node.parent.id
            assert root_id == root.id
            node = node.parent
            i -= 1


class test_group:
    """Tests for groups placed at the end of a chain."""

    @flaky
    def test_parent_ids(self, manager):
        """Every group member must report the same root and parent ids.

        The root is the first task of the chain and the parent is the task
        directly before the group.
        """
        assert manager.inspect().ping()
        g = (
            ids.si(i=1) |
            ids.si(i=2) |
            group(ids.si(i=i) for i in range(2, 50))
        )
        res = g()
        expected_root_id = res.parent.parent.id
        expected_parent_id = res.parent.id
        values = res.get(timeout=TIMEOUT)

        for i, r in enumerate(values):
            root_id, parent_id, value = r
            assert root_id == expected_root_id
            assert parent_id == expected_parent_id
            # The group members were created with i starting at 2.
            assert value == i + 2


def assert_ids(r, expected_value, expected_root_id, expected_parent_id):
    """Assert that result ``r`` yields the expected value, root and parent.

    ``r.get()`` must return a ``(root_id, parent_id, value)`` triple.
    """
    root_id, parent_id, value = r.get(timeout=TIMEOUT)
    assert expected_value == value
    assert root_id == expected_root_id
    assert parent_id == expected_parent_id


class test_chord:
    """Tests for chords; each test is skipped unless the backend is redis."""

    @flaky
    def test_group_chain(self, manager):
        """``task | group | task``: 2 + 2, plus 0..3, plus 8 to each."""
        if not manager.app.conf.result_backend.startswith('redis'):
            raise pytest.skip('Requires redis result backend.')
        c = (
            add.s(2, 2) |
            group(add.s(i) for i in range(4)) |
            add_to_all.s(8)
        )
        res = c()
        assert res.get(timeout=TIMEOUT) == [12, 13, 14, 15]

    @flaky
    def test_parent_ids(self, manager):
        """Check recorded ids for a chord built with ``chain``/``chord``."""
        if not manager.app.conf.result_backend.startswith('redis'):
            raise pytest.skip('Requires redis result backend.')
        root = ids.si(i=1)
        # Freeze the first task so its id is known before the canvas runs.
        expected_root_id = root.freeze().id
        g = chain(
            root,
            ids.si(i=2),
            chord(
                group(ids.si(i=i) for i in range(3, 50)),
                chain(collect_ids.s(i=50) | ids.si(i=51)),
            ),
        )
        self.assert_parentids_chord(g(), expected_root_id)

    @flaky
    def test_parent_ids__OR(self, manager):
        """Same canvas as ``test_parent_ids``, built with ``|`` only."""
        if not manager.app.conf.result_backend.startswith('redis'):
            raise pytest.skip('Requires redis result backend.')
        root = ids.si(i=1)
        # Freeze the first task so its id is known before the canvas runs.
        expected_root_id = root.freeze().id
        g = (
            root |
            ids.si(i=2) |
            group(ids.si(i=i) for i in range(3, 50)) |
            collect_ids.s(i=50) |
            ids.si(i=51)
        )
        self.assert_parentids_chord(g(), expected_root_id)

    def assert_parentids_chord(self, res, expected_root_id):
        """Verify result types and recorded ids along the ``.parent`` links.

        Walking back from ``res`` the expected nodes are: ids(i=51), the
        chord callback (value 50), the header group, ids(i=2) and ids(i=1).
        """
        assert isinstance(res, AsyncResult)
        assert isinstance(res.parent, AsyncResult)
        assert isinstance(res.parent.parent, GroupResult)
        assert isinstance(res.parent.parent.parent, AsyncResult)
        assert isinstance(res.parent.parent.parent.parent, AsyncResult)

        # first we check the last task
        assert_ids(res, 51, expected_root_id, res.parent.id)

        # then the chord callback
        prev, (root_id, parent_id, value) = res.parent.get(timeout=30)
        assert value == 50
        assert root_id == expected_root_id
        # started by one of the chord header tasks.
        assert parent_id in res.parent.parent.results

        # check what the chord callback recorded
        for root_id, parent_id, _ in prev:
            assert root_id == expected_root_id
            assert parent_id == res.parent.parent.parent.id

        # ids(i=2)
        root_id, parent_id, value = res.parent.parent.parent.get(timeout=30)
        assert value == 2
        assert parent_id == res.parent.parent.parent.parent.id
        assert root_id == expected_root_id

        # ids(i=1)
        root_id, parent_id, value = res.parent.parent.parent.parent.get(
            timeout=30)
        assert value == 1
        assert root_id == expected_root_id
        assert parent_id is None