123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198 |
- from __future__ import absolute_import, unicode_literals
- import pytest
- from redis import StrictRedis
- from celery import chain, chord, group
- from celery.exceptions import TimeoutError
- from celery.result import AsyncResult, GroupResult
- from .conftest import flaky
- from .tasks import add, add_replaced, add_to_all, collect_ids, ids, redis_echo
# Timeout passed to ``.get(timeout=TIMEOUT)`` when the tests in this module
# wait for a workflow's result.
TIMEOUT = 120
class test_chain:
    """Integration tests for chained signatures, run through ``manager``."""

    @flaky
    def test_simple_chain(self, manager):
        """A three-task ``add`` chain is expected to resolve to 32."""
        c = add.s(4, 4) | add.s(8) | add.s(16)
        assert c().get(timeout=TIMEOUT) == 32

    @flaky
    def test_complex_chain(self, manager):
        """A chain holding a nested chain and ending in a group.

        The nested chain includes ``add_replaced``; the trailing group of
        four ``add`` tasks is expected to yield ``[64, 65, 66, 67]``.
        """
        c = (
            add.s(2, 2) | (
                add.s(4) | add_replaced.s(8) | add.s(16) | add.s(32)
            ) |
            group(add.s(i) for i in range(4))
        )
        res = c()
        assert res.get(timeout=TIMEOUT) == [64, 65, 66, 67]

    @flaky
    def test_group_chord_group_chain(self, manager):
        """Check ordering of ``group | task | group`` via a redis list.

        Skipped unless the configured result backend starts with ``redis``.
        The ``redis-echo`` list is expected to hold the three "before"
        messages first (in any order), then ``connect``, then the two
        "after" messages (in any order).
        """
        from celery.five import bytes_if_py2

        if not manager.app.conf.result_backend.startswith('redis'):
            raise pytest.skip('Requires redis result backend.')
        redis_connection = StrictRedis()
        # Start from a clean list so earlier runs cannot pollute the check.
        redis_connection.delete('redis-echo')
        before = group(redis_echo.si('before {}'.format(i)) for i in range(3))
        connect = redis_echo.si('connect')
        after = group(redis_echo.si('after {}'.format(i)) for i in range(2))

        result = (before | connect | after).delay()
        result.get(timeout=TIMEOUT)
        redis_messages = list(map(
            bytes_if_py2,
            redis_connection.lrange('redis-echo', 0, -1)
        ))
        before_items = \
            set(map(bytes_if_py2, (b'before 0', b'before 1', b'before 2')))
        after_items = set(map(bytes_if_py2, (b'after 0', b'after 1')))

        # Members of a group may finish in any order, so compare as sets.
        assert set(redis_messages[:3]) == before_items
        assert redis_messages[3] == b'connect'
        assert set(redis_messages[4:]) == after_items
        redis_connection.delete('redis-echo')

    @flaky
    def test_parent_ids(self, manager, num=10):
        """Run a chain of ``num`` ``ids`` tasks and verify root/parent ids.

        On timeout, worker state is printed before the error is re-raised
        to help diagnose why the chain did not finish.
        """
        assert manager.inspect().ping()
        c = chain(ids.si(i=i) for i in range(num))
        c.freeze()
        res = c()
        try:
            res.get(timeout=TIMEOUT)
        except TimeoutError:
            # ``inspect`` must be *called* (as for ``ping()`` above); going
            # through the bound method itself raises AttributeError and
            # would hide the TimeoutError being reported here.
            inspector = manager.inspect()
            print(inspector.active())
            print(inspector.reserved())
            print(inspector.stats())
            raise
        self.assert_ids(res, num - 1)

    def assert_ids(self, res, size):
        """Walk ``res`` up its ``.parent`` links, checking every node.

        Each node's result is unpacked as ``(root_id, parent_id, value)``:
        ``value`` must count down from ``size``, ``root_id`` must equal the
        id of the top-most ancestor, and ``parent_id`` must equal the id of
        the node's parent when it has one.
        """
        i, root = size, res
        # Find the top-most ancestor; its id is the expected root id.
        while root.parent:
            root = root.parent
        node = res
        while node:
            root_id, parent_id, value = node.get(timeout=30)
            assert value == i
            if node.parent:
                assert parent_id == node.parent.id
            assert root_id == root.id
            node = node.parent
            i -= 1
class test_group:
    """Integration tests for a group placed at the end of a chain."""

    @flaky
    def test_parent_ids(self, manager):
        """Check the ids reported by each member of a trailing group.

        Every member's result is unpacked as ``(root_id, parent_id, value)``
        and compared with the ids of the two preceding chain results; the
        values are expected to run upwards from 2 in result order.
        """
        assert manager.inspect().ping()

        members = group(ids.si(i=n) for n in range(2, 50))
        workflow = ids.si(i=1) | ids.si(i=2) | members
        outcome = workflow()

        # Capture the expected ids before blocking on the result.
        parent_result = outcome.parent
        expected_root_id = parent_result.parent.id
        expected_parent_id = parent_result.id

        reported = outcome.get(timeout=TIMEOUT)
        for expected_value, entry in enumerate(reported, 2):
            root_id, parent_id, value = entry
            assert root_id == expected_root_id
            assert parent_id == expected_parent_id
            assert value == expected_value
def assert_ids(r, expected_value, expected_root_id, expected_parent_id):
    """Fetch ``r`` and compare its ``(root_id, parent_id, value)`` triple.

    Waits up to ``TIMEOUT`` seconds for the result, then asserts the value,
    the root id and the parent id, in that order.
    """
    payload = r.get(timeout=TIMEOUT)
    actual_root_id, actual_parent_id, actual_value = payload
    assert expected_value == actual_value
    assert actual_root_id == expected_root_id
    assert actual_parent_id == expected_parent_id
class test_chord:
    """Integration tests for chords.

    Every test here skips itself unless the configured result backend
    starts with ``redis``.
    """

    @flaky
    def test_group_chain(self, manager):
        """A group followed by a task in a chain yields the expected list."""
        backend = manager.app.conf.result_backend
        if not backend.startswith('redis'):
            raise pytest.skip('Requires redis result backend.')

        header = group(add.s(n) for n in range(4))
        workflow = add.s(2, 2) | header | add_to_all.s(8)
        outcome = workflow()
        assert outcome.get(timeout=TIMEOUT) == [12, 13, 14, 15]

    @flaky
    def test_parent_ids(self, manager):
        """Verify root/parent ids for a chord built with ``chain``/``chord``."""
        backend = manager.app.conf.result_backend
        if not backend.startswith('redis'):
            raise pytest.skip('Requires redis result backend.')

        root = ids.si(i=1)
        # Freeze first so the root task id is known before the workflow runs.
        expected_root_id = root.freeze().id
        header = group(ids.si(i=n) for n in range(3, 50))
        body = chain(collect_ids.s(i=50) | ids.si(i=51))
        workflow = chain(root, ids.si(i=2), chord(header, body))
        self.assert_parentids_chord(workflow(), expected_root_id)

    @flaky
    def test_parent_ids__OR(self, manager):
        """Same check as ``test_parent_ids``, built with the ``|`` operator."""
        backend = manager.app.conf.result_backend
        if not backend.startswith('redis'):
            raise pytest.skip('Requires redis result backend.')

        root = ids.si(i=1)
        expected_root_id = root.freeze().id
        header = group(ids.si(i=n) for n in range(3, 50))
        workflow = (
            root | ids.si(i=2) | header | collect_ids.s(i=50) | ids.si(i=51)
        )
        self.assert_parentids_chord(workflow(), expected_root_id)

    def assert_parentids_chord(self, res, expected_root_id):
        """Check the result chain of the chord workflows built above.

        Walks ``res`` up through four ``.parent`` links, asserting the type
        of each result and the ``(root_id, parent_id, value)`` data it
        reports.
        """
        # Type checks are interleaved with the parent lookups so a missing
        # link fails on the matching isinstance assertion.
        assert isinstance(res, AsyncResult)
        callback_res = res.parent
        assert isinstance(callback_res, AsyncResult)
        header_res = callback_res.parent
        assert isinstance(header_res, GroupResult)
        second_res = header_res.parent
        assert isinstance(second_res, AsyncResult)
        first_res = second_res.parent
        assert isinstance(first_res, AsyncResult)

        # first we check the last task
        assert_ids(res, 51, expected_root_id, callback_res.id)

        # then the chord callback
        prev, (root_id, parent_id, value) = callback_res.get(timeout=30)
        assert value == 50
        assert root_id == expected_root_id
        # started by one of the chord header tasks.
        assert parent_id in header_res.results

        # check what the chord callback recorded
        for root_id, parent_id, value in prev:
            assert root_id == expected_root_id
            assert parent_id == second_res.id

        # ids(i=2)
        root_id, parent_id, value = second_res.get(timeout=30)
        assert value == 2
        assert parent_id == first_res.id
        assert root_id == expected_root_id

        # ids(i=1)
        root_id, parent_id, value = first_res.get(timeout=30)
        assert value == 1
        assert root_id == expected_root_id
        assert parent_id is None
|