"""Tests for the celery canvas primitives: chain, group and chord."""
import pytest
from celery import chain, chord, group
from celery.exceptions import TimeoutError
from celery.result import AsyncResult, GroupResult

from .conftest import flaky
from .tasks import add, collect_ids, ids

# Default number of seconds to wait for a result before giving up.
TIMEOUT = 120


class test_chain:
    """Checks results and root/parent id bookkeeping for chains."""

    @flaky
    def test_simple_chain(self, manager):
        """A three-step chain of ``add`` yields the accumulated sum."""
        c = add.s(4, 4) | add.s(8) | add.s(16)
        assert c().get(timeout=TIMEOUT) == 32

    @flaky
    def test_complex_chain(self, manager):
        """A chain with a nested chain that ends in a group of ``add``."""
        c = (
            add.s(2, 2) | (
                add.s(4) | add.s(8) | add.s(16)
            ) |
            group(add.s(i) for i in range(4))
        )
        res = c()
        assert res.get(timeout=TIMEOUT) == [32, 33, 34, 35]

    @flaky
    def test_parent_ids(self, manager, num=10):
        """Every task in a chain of ``num`` tasks reports the right ids.

        On timeout the inspect output is printed before re-raising, so the
        failure log carries that state alongside the ``TimeoutError``.
        """
        assert manager.inspect().ping()
        c = chain(ids.si(i=i) for i in range(num))
        c.freeze()
        res = c()
        try:
            res.get(timeout=TIMEOUT)
        except TimeoutError:
            # ``inspect`` must be called to obtain the inspector (as done
            # for ``ping()`` above); accessing ``.active`` on the uncalled
            # attribute would raise AttributeError and mask the timeout.
            print(manager.inspect().active())
            print(manager.inspect().reserved())
            print(manager.inspect().stats())
            raise
        self.assert_ids(res, num - 1)

    def assert_ids(self, res, size):
        """Walk the result chain backwards from ``res`` and verify ids.

        ``res`` is the result of the last task, whose recorded value is
        expected to equal ``size``; each parent is expected to have
        recorded one less than its child.
        """
        i, root = size, res
        # Find the first task of the chain: the one without a parent.
        while root.parent:
            root = root.parent
        node = res
        while node:
            root_id, parent_id, value = node.get(timeout=30)
            assert value == i
            # Only nodes with a parent have their parent id checked here.
            if node.parent:
                assert parent_id == node.parent.id
            assert root_id == root.id
            node = node.parent
            i -= 1


class test_group:
    """Checks root/parent id bookkeeping for a group at the end of a chain."""

    @flaky
    def test_parent_ids(self, manager):
        """All group members share the same root id and parent id."""
        assert manager.inspect().ping()
        g = (
            ids.si(i=1) |
            ids.si(i=2) |
            group(ids.si(i=i) for i in range(2, 50))
        )
        res = g()
        expected_root_id = res.parent.parent.id
        expected_parent_id = res.parent.id
        values = res.get(timeout=TIMEOUT)

        for i, r in enumerate(values):
            root_id, parent_id, value = r
            assert root_id == expected_root_id
            assert parent_id == expected_parent_id
            # The group members were created with i in range(2, 50).
            assert value == i + 2


def assert_ids(r, expected_value, expected_root_id, expected_parent_id):
    """Assert that result ``r`` holds the expected (root, parent, value).

    ``r.get()`` must return a ``(root_id, parent_id, value)`` triple.
    """
    root_id, parent_id, value = r.get(timeout=TIMEOUT)
    assert expected_value == value
    assert root_id == expected_root_id
    assert parent_id == expected_parent_id


class test_chord:
    """Checks root/parent id bookkeeping for chords inside a chain."""

    @flaky
    def test_parent_ids(self, manager):
        """Chord built explicitly with ``chord(group, callback)``."""
        if not manager.app.conf.result_backend.startswith('redis'):
            # ``pytest.skip`` raises on its own; no ``raise`` is needed.
            pytest.skip('Requires redis result backend.')
        root = ids.si(i=1)
        expected_root_id = root.freeze().id
        g = chain(
            root, ids.si(i=2),
            chord(
                group(ids.si(i=i) for i in range(3, 50)),
                chain(collect_ids.s(i=50) | ids.si(i=51)),
            ),
        )
        self.assert_parentids_chord(g(), expected_root_id)

    @flaky
    def test_parent_ids__OR(self, manager):
        """Same workflow, built with the ``|`` operator (group | task)."""
        if not manager.app.conf.result_backend.startswith('redis'):
            # ``pytest.skip`` raises on its own; no ``raise`` is needed.
            pytest.skip('Requires redis result backend.')
        root = ids.si(i=1)
        expected_root_id = root.freeze().id
        g = (
            root |
            ids.si(i=2) |
            group(ids.si(i=i) for i in range(3, 50)) |
            collect_ids.s(i=50) |
            ids.si(i=51)
        )
        self.assert_parentids_chord(g(), expected_root_id)

    def assert_parentids_chord(self, res, expected_root_id):
        """Verify the result tree and recorded ids of the chord workflow.

        ``res`` is the result of the final ``ids(i=51)`` task; its parents
        are, in order: the chord callback, the chord header group,
        ``ids(i=2)`` and ``ids(i=1)``.
        """
        assert isinstance(res, AsyncResult)
        assert isinstance(res.parent, AsyncResult)
        assert isinstance(res.parent.parent, GroupResult)
        assert isinstance(res.parent.parent.parent, AsyncResult)
        assert isinstance(res.parent.parent.parent.parent, AsyncResult)

        # first we check the last task
        assert_ids(res, 51, expected_root_id, res.parent.id)

        # then the chord callback
        prev, (root_id, parent_id, value) = res.parent.get(timeout=30)
        assert value == 50
        assert root_id == expected_root_id
        # started by one of the chord header tasks.
        assert parent_id in res.parent.parent.results

        # check what the chord callback recorded
        for root_id, parent_id, value in prev:
            assert root_id == expected_root_id
            assert parent_id == res.parent.parent.parent.id

        # ids(i=2)
        root_id, parent_id, value = res.parent.parent.parent.get(timeout=30)
        assert value == 2
        assert parent_id == res.parent.parent.parent.parent.id
        assert root_id == expected_root_id

        # ids(i=1)
        root_id, parent_id, value = res.parent.parent.parent.parent.get(
            timeout=30)
        assert value == 1
        assert root_id == expected_root_id
        assert parent_id is None