"""Canvas tests: chain, group and chord workflows run via ``manager``."""
from __future__ import absolute_import, unicode_literals

import pytest
from redis import StrictRedis

from celery import chain, chord, group
from celery.exceptions import TimeoutError
from celery.result import AsyncResult, GroupResult

from .conftest import flaky
from .tasks import add, add_replaced, add_to_all, collect_ids, ids, redis_echo

# Default number of seconds to wait for a workflow result.
TIMEOUT = 120


def _skip_unless_redis_backend(manager):
    """Skip the calling test unless ``result_backend`` starts with 'redis'."""
    if not manager.app.conf.result_backend.startswith('redis'):
        # ``pytest.skip`` raises by itself; no explicit ``raise`` is needed.
        pytest.skip('Requires redis result backend.')


class test_chain:
    """Workflows built by chaining signatures together."""

    @flaky
    def test_simple_chain(self, manager):
        """A three-step ``add`` chain yields the accumulated sum."""
        c = add.s(4, 4) | add.s(8) | add.s(16)
        assert c().get(timeout=TIMEOUT) == 32

    @flaky
    def test_complex_chain(self, manager):
        """A chain with a nested chain that ends in a group."""
        c = (
            add.s(2, 2) | (
                add.s(4) | add_replaced.s(8) | add.s(16) | add.s(32)
            ) |
            group(add.s(i) for i in range(4))
        )
        res = c()
        assert res.get(timeout=TIMEOUT) == [64, 65, 66, 67]

    @flaky
    def test_group_chord_group_chain(self, manager):
        """``group | task | group`` echoes its messages in stage order.

        Messages are read back from the 'redis-echo' list: the three
        'before' messages (any order), then 'connect', then the two
        'after' messages (any order).
        """
        from celery.five import bytes_if_py2

        _skip_unless_redis_backend(manager)
        redis_connection = StrictRedis()
        # Start from an empty list so earlier runs cannot leak messages.
        redis_connection.delete('redis-echo')
        try:
            before = group(
                redis_echo.si('before {}'.format(i)) for i in range(3)
            )
            connect = redis_echo.si('connect')
            after = group(
                redis_echo.si('after {}'.format(i)) for i in range(2)
            )

            result = (before | connect | after).delay()
            result.get(timeout=TIMEOUT)

            redis_messages = list(map(
                bytes_if_py2,
                redis_connection.lrange('redis-echo', 0, -1)
            ))
            before_items = \
                set(map(bytes_if_py2, (b'before 0', b'before 1', b'before 2')))
            after_items = set(map(bytes_if_py2, (b'after 0', b'after 1')))

            assert set(redis_messages[:3]) == before_items
            assert redis_messages[3] == b'connect'
            assert set(redis_messages[4:]) == after_items
        finally:
            # Remove the key even when an assertion above fails.
            redis_connection.delete('redis-echo')

    @flaky
    def test_parent_ids(self, manager, num=10):
        """Each task in a ``num``-long chain reports its root and parent id."""
        assert manager.inspect().ping()
        c = chain(ids.si(i=i) for i in range(num))
        c.freeze()
        res = c()
        try:
            res.get(timeout=TIMEOUT)
        except TimeoutError:
            # ``inspect`` must be called (as for ``ping()`` above) to get
            # the inspector; print its state to help diagnose the timeout.
            inspector = manager.inspect()
            print(inspector.active())
            print(inspector.reserved())
            print(inspector.stats())
            raise
        self.assert_ids(res, num - 1)

    def assert_ids(self, res, size):
        """Walk ``res`` back through its parents checking recorded ids.

        ``res`` is the result of the last task in the chain and ``size``
        the value that task is expected to report; the expected value
        decreases by one for every parent visited.
        """
        i, root = size, res
        # The root of the workflow is the result that has no parent.
        while root.parent:
            root = root.parent
        node = res
        while node:
            root_id, parent_id, value = node.get(timeout=30)
            assert value == i
            if node.parent:
                assert parent_id == node.parent.id
            assert root_id == root.id
            node = node.parent
            i -= 1


class test_group:
    """Workflows that end in a group."""

    @flaky
    def test_parent_ids(self, manager):
        """Group members report the chain head as root and its tail as parent."""
        assert manager.inspect().ping()
        g = (
            ids.si(i=1) |
            ids.si(i=2) |
            group(ids.si(i=i) for i in range(2, 50))
        )
        res = g()
        expected_root_id = res.parent.parent.id
        expected_parent_id = res.parent.id
        values = res.get(timeout=TIMEOUT)

        # The group members were created with i = 2, 3, ... in order.
        for expected_value, r in enumerate(values, start=2):
            root_id, parent_id, value = r
            assert root_id == expected_root_id
            assert parent_id == expected_parent_id
            assert value == expected_value


def assert_ids(r, expected_value, expected_root_id, expected_parent_id):
    """Assert result ``r`` resolves to the expected (root, parent, value)."""
    root_id, parent_id, value = r.get(timeout=TIMEOUT)
    assert expected_value == value
    assert root_id == expected_root_id
    assert parent_id == expected_parent_id


class test_chord:
    """Workflows containing a chord (a group followed by a callback)."""

    @flaky
    def test_group_chain(self, manager):
        """A group followed by a task receives all the group's results."""
        _skip_unless_redis_backend(manager)
        c = (
            add.s(2, 2) |
            group(add.s(i) for i in range(4)) |
            add_to_all.s(8)
        )
        res = c()
        assert res.get(timeout=TIMEOUT) == [12, 13, 14, 15]

    @flaky
    def test_parent_ids(self, manager):
        """Root/parent ids through a workflow built with explicit ``chord``."""
        _skip_unless_redis_backend(manager)
        root = ids.si(i=1)
        expected_root_id = root.freeze().id

        g = chain(
            root, ids.si(i=2),
            chord(
                group(ids.si(i=i) for i in range(3, 50)),
                chain(collect_ids.s(i=50) | ids.si(i=51)),
            ),
        )
        self.assert_parentids_chord(g(), expected_root_id)

    @flaky
    def test_parent_ids__OR(self, manager):
        """Root/parent ids through the same workflow built with ``|``."""
        _skip_unless_redis_backend(manager)
        root = ids.si(i=1)
        expected_root_id = root.freeze().id

        g = (
            root |
            ids.si(i=2) |
            group(ids.si(i=i) for i in range(3, 50)) |
            collect_ids.s(i=50) |
            ids.si(i=51)
        )
        self.assert_parentids_chord(g(), expected_root_id)

    def assert_parentids_chord(self, res, expected_root_id):
        """Check the result tree and recorded ids of the chord workflow.

        ``res`` is the result of the final ``ids(i=51)`` task; its parents
        are, in order, the chord callback, the chord header group,
        ``ids(i=2)`` and ``ids(i=1)``.
        """
        assert isinstance(res, AsyncResult)
        assert isinstance(res.parent, AsyncResult)
        assert isinstance(res.parent.parent, GroupResult)
        assert isinstance(res.parent.parent.parent, AsyncResult)
        assert isinstance(res.parent.parent.parent.parent, AsyncResult)

        # first we check the last task
        assert_ids(res, 51, expected_root_id, res.parent.id)

        # then the chord callback
        prev, (root_id, parent_id, value) = res.parent.get(timeout=30)
        assert value == 50
        assert root_id == expected_root_id
        # started by one of the chord header tasks.
        assert parent_id in res.parent.parent.results

        # check what the chord callback recorded
        for root_id, parent_id, _value in prev:
            assert root_id == expected_root_id
            assert parent_id == res.parent.parent.parent.id

        # ids(i=2)
        root_id, parent_id, value = res.parent.parent.parent.get(timeout=30)
        assert value == 2
        assert parent_id == res.parent.parent.parent.parent.id
        assert root_id == expected_root_id

        # ids(i=1)
        root_id, parent_id, value = res.parent.parent.parent.parent.get(
            timeout=30)
        assert value == 1
        assert root_id == expected_root_id
        assert parent_id is None