| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 | from __future__ import absolute_import, unicode_literalsimport pytestfrom celery import chain, group, uuidfrom cyanide.tasks import add, collect_ids, idsclass test_chain:    def test_simple_chain(self, manager):        c = add.s(4, 4) | add.s(8) | add.s(16)        assert manager.join(c()) == 32    def test_complex_chain(self, manager):        c = (            add.s(2, 2) | (                add.s(4) | add.s(8) | add.s(16)            ) |            group(add.s(i) for i in range(4))        )        res = c()        assert res.get() == [32, 33, 34, 35]    def test_parent_ids(self, manager, num=10):        c = chain(ids.si(i) for i in range(num))        c.freeze()        res = c()        res.get(timeout=5)        self.assert_ids(res, num - 1)    def assert_ids(self, res, size):        i, root = size, res        while root.parent:            root = root.parent        node = res        while node:            root_id, parent_id, value = node.get(timeout=5)            assert value == i            assert root_id == root.id            if node.parent:                assert parent_id == node.parent.id            node = node.parent            i -= 1class test_group:    def test_parent_ids(self):        g = ids.si(1) | ids.si(2) | group(ids.si(i) for i in range(2, 50))        res = g()        expected_root_id = res.parent.parent.id        expected_parent_id = res.parent.id        values = res.get(timeout=5)        for i, r in enumerate(values):            root_id, parent_id, value = r            assert root_id == expected_root_id            assert parent_id == expected_parent_id            assert value == i + 2class xxx_chord:    @pytest.mark.celery(redis_results=1)    def test_parent_ids(self, manager):        self.assert_parentids_chord()        self.assert_parentids_chord(uuid(), uuid())    def assert_parentids_chord(self, base_root=None, base_parent=None):        g = (            ids.si(1) |            ids.si(2) |            group(ids.si(i) for i in range(3, 50)) |            collect_ids.s(i=50) |            ids.si(51)        )        g.freeze(root_id=base_root, parent_id=base_parent)        res = g.apply_async(root_id=base_root, parent_id=base_parent)        expected_root_id = base_root or res.parent.parent.parent.id        root_id, parent_id, value = res.get(timeout=5)        assert value == 51        assert root_id == expected_root_id        assert parent_id == res.parent.id        prev, (root_id, parent_id, value) = res.parent.get(timeout=5)        assert value == 50        assert root_id == expected_root_id        assert parent_id == res.parent.parent.id        for i, p in enumerate(prev):            root_id, parent_id, value = p            assert root_id == expected_root_id            assert parent_id == res.parent.parent.id        root_id, parent_id, value = res.parent.parent.get(timeout=5)        assert value == 2        assert parent_id == res.parent.parent.parent.id        assert root_id == expected_root_id        root_id, parent_id, value = res.parent.parent.parent.get(timeout=5)        assert value == 1        assert root_id == expected_root_id        assert parent_id == base_parent
 |