| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 | 
							- from __future__ import absolute_import, unicode_literals
 
- import pytest
 
- from celery import chain, chord, group
 
- from celery.exceptions import TimeoutError
 
- from celery.result import AsyncResult, GroupResult
 
- from .conftest import flaky
 
- from .tasks import add, collect_ids, ids
 
# Timeout passed to ``.get()`` when waiting for a whole workflow to finish
# (presumably seconds, per Celery's ``AsyncResult.get`` -- confirm).
TIMEOUT = 120
 
class test_chain:
    """Integration tests for ``chain`` workflows.

    Every test takes the ``manager`` fixture, which is defined outside this
    file; the tests here only rely on ``manager.inspect()``.
    """

    @flaky
    def test_simple_chain(self, manager):
        """A three-link chain of ``add`` signatures resolves to 32."""
        c = add.s(4, 4) | add.s(8) | add.s(16)
        assert c().get(timeout=TIMEOUT) == 32

    @flaky
    def test_complex_chain(self, manager):
        """A chain holding a nested chain and terminated by a group.

        The workflow result is expected to be the list of the four group
        members' results.
        """
        c = (
            add.s(2, 2) | (
                add.s(4) | add.s(8) | add.s(16)
            ) |
            group(add.s(i) for i in range(4))
        )
        res = c()
        assert res.get(timeout=TIMEOUT) == [32, 33, 34, 35]

    @flaky
    def test_parent_ids(self, manager, num=10):
        """Run a chain of ``num`` ``ids`` tasks and verify its lineage.

        If waiting for the final result times out, the output of the
        inspector's ``active``, ``reserved`` and ``stats`` calls is printed
        before the ``TimeoutError`` is re-raised.
        """
        assert manager.inspect().ping()
        c = chain(ids.si(i=i) for i in range(num))
        c.freeze()
        res = c()
        try:
            res.get(timeout=TIMEOUT)
        except TimeoutError:
            # ``manager.inspect`` has to be *called* (as in the ping check
            # above) to obtain an inspector.  Accessing ``.active`` on the
            # uncalled attribute would fail here and hide the timeout.
            print(manager.inspect().active())
            print(manager.inspect().reserved())
            print(manager.inspect().stats())
            raise
        self.assert_ids(res, num - 1)

    def assert_ids(self, res, size):
        """Check every result from *res* back to the first one in the chain.

        Each result is unpacked as ``(root_id, parent_id, value)``.  Walking
        backwards through ``.parent``, ``value`` must count down from *size*,
        ``parent_id`` must match the preceding result's id (when there is
        one) and ``root_id`` must match the id of the first result.
        """
        i, root = size, res
        # Follow the parent links to the very first result of the chain.
        while root.parent:
            root = root.parent
        node = res
        while node:
            root_id, parent_id, value = node.get(timeout=30)
            assert value == i
            if node.parent:
                assert parent_id == node.parent.id
            assert root_id == root.id
            node = node.parent
            i -= 1
 
class test_group:
    """Integration tests for ``group`` used as the tail of a chain."""

    @flaky
    def test_parent_ids(self, manager):
        """Check the root/parent ids reported by every group member.

        Two ``ids`` tasks are chained in front of a group of ``ids`` tasks.
        Each member's ``(root_id, parent_id, value)`` triple must name the
        first task as root and the second task as parent.
        """
        assert manager.inspect().ping()

        workflow = (
            ids.si(i=1) |
            ids.si(i=2) |
            group(ids.si(i=i) for i in range(2, 50))
        )
        res = workflow()

        expected_root_id = res.parent.parent.id
        expected_parent_id = res.parent.id

        # Group members were created with i = 2, 3, ... so the expected
        # value is enumerated from 2.
        triples = res.get(timeout=TIMEOUT)
        for expected_value, triple in enumerate(triples, start=2):
            root_id, parent_id, value = triple
            assert root_id == expected_root_id
            assert parent_id == expected_parent_id
            assert value == expected_value
 
def assert_ids(r, expected_value, expected_root_id, expected_parent_id):
    """Assert the ``(root_id, parent_id, value)`` triple held by result *r*.

    :param r: result object; ``r.get(timeout=TIMEOUT)`` must yield a
        three-item sequence.
    :param expected_value: value the third item must equal.
    :param expected_root_id: id the first item must equal.
    :param expected_parent_id: id the second item must equal.
    :raises AssertionError: on the first mismatch (value is checked first).
    """
    actual_root_id, actual_parent_id, actual_value = r.get(timeout=TIMEOUT)
    assert expected_value == actual_value
    assert actual_root_id == expected_root_id
    assert actual_parent_id == expected_parent_id
 
class test_chord:
    """Integration tests for chords embedded in a chain.

    Both tests are skipped unless the configured result backend string
    starts with ``'redis'``.
    """

    @flaky
    def test_parent_ids(self, manager):
        """Build the workflow with explicit ``chain``/``chord`` calls."""
        backend = manager.app.conf.result_backend
        if not backend.startswith('redis'):
            raise pytest.skip('Requires redis result backend.')

        root = ids.si(i=1)
        expected_root_id = root.freeze().id

        header = group(ids.si(i=i) for i in range(3, 50))
        body = chain(collect_ids.s(i=50) | ids.si(i=51))
        workflow = chain(
            root, ids.si(i=2),
            chord(header, body),
        )
        self.assert_parentids_chord(workflow(), expected_root_id)

    @flaky
    def test_parent_ids__OR(self, manager):
        """Build the workflow with the ``|`` operator only."""
        backend = manager.app.conf.result_backend
        if not backend.startswith('redis'):
            raise pytest.skip('Requires redis result backend.')

        root = ids.si(i=1)
        expected_root_id = root.freeze().id

        workflow = (
            root |
            ids.si(i=2) |
            group(ids.si(i=i) for i in range(3, 50)) |
            collect_ids.s(i=50) |
            ids.si(i=51)
        )
        self.assert_parentids_chord(workflow(), expected_root_id)

    def assert_parentids_chord(self, res, expected_root_id):
        """Verify result types and recorded ids along the parent chain.

        Walking back from *res* through ``.parent`` the expected nodes are:
        the last task, the chord callback, the header ``GroupResult``,
        ``ids(i=2)`` and ``ids(i=1)``.  Every node must report
        *expected_root_id* as its root.
        """
        # Each level is type-checked before the next ``.parent`` is read.
        assert isinstance(res, AsyncResult)
        callback_res = res.parent
        assert isinstance(callback_res, AsyncResult)
        header_res = callback_res.parent
        assert isinstance(header_res, GroupResult)
        second_res = header_res.parent
        assert isinstance(second_res, AsyncResult)
        first_res = second_res.parent
        assert isinstance(first_res, AsyncResult)

        # first we check the last task
        assert_ids(res, 51, expected_root_id, callback_res.id)

        # then the chord callback
        prev, (root_id, parent_id, value) = callback_res.get(timeout=30)
        assert value == 50
        assert root_id == expected_root_id
        # started by one of the chord header tasks.
        assert parent_id in header_res.results

        # check what the chord callback recorded
        for root_id, parent_id, value in prev:
            assert root_id == expected_root_id
            assert parent_id == second_res.id

        # ids(i=2)
        root_id, parent_id, value = second_res.get(timeout=30)
        assert value == 2
        assert parent_id == first_res.id
        assert root_id == expected_root_id

        # ids(i=1)
        root_id, parent_id, value = first_res.get(timeout=30)
        assert value == 1
        assert root_id == expected_root_id
        assert parent_id is None
 
 
  |