|
@@ -1,7 +1,8 @@
|
|
|
from __future__ import absolute_import, unicode_literals
|
|
|
import pytest
|
|
|
-from celery import chain, group, uuid
|
|
|
+from celery import chain, chord, group
|
|
|
from celery.exceptions import TimeoutError
|
|
|
+from celery.result import AsyncResult, GroupResult
|
|
|
from .tasks import add, collect_ids, ids
|
|
|
|
|
|
TIMEOUT = 120
|
|
@@ -25,7 +26,7 @@ class test_chain:
|
|
|
|
|
|
def test_parent_ids(self, manager, num=10):
|
|
|
assert manager.inspect().ping()
|
|
|
- c = chain(ids.si(i) for i in range(num))
|
|
|
+ c = chain(ids.si(i=i) for i in range(num))
|
|
|
c.freeze()
|
|
|
res = c()
|
|
|
try:
|
|
@@ -45,9 +46,9 @@ class test_chain:
|
|
|
while node:
|
|
|
root_id, parent_id, value = node.get(timeout=30)
|
|
|
assert value == i
|
|
|
- assert root_id == root.id
|
|
|
if node.parent:
|
|
|
assert parent_id == node.parent.id
|
|
|
+ assert root_id == root.id
|
|
|
node = node.parent
|
|
|
i -= 1
|
|
|
|
|
@@ -56,7 +57,11 @@ class test_group:
|
|
|
|
|
|
def test_parent_ids(self, manager):
|
|
|
assert manager.inspect().ping()
|
|
|
- g = ids.si(1) | ids.si(2) | group(ids.si(i) for i in range(2, 50))
|
|
|
+ g = (
|
|
|
+ ids.si(i=1) |
|
|
|
+ ids.si(i=2) |
|
|
|
+ group(ids.si(i=i) for i in range(2, 50))
|
|
|
+ )
|
|
|
res = g()
|
|
|
expected_root_id = res.parent.parent.id
|
|
|
expected_parent_id = res.parent.id
|
|
@@ -69,48 +74,72 @@ class test_group:
|
|
|
assert value == i + 2
|
|
|
|
|
|
|
|
|
+def assert_ids(r, expected_value, expected_root_id, expected_parent_id):
|
|
|
+ root_id, parent_id, value = r.get(timeout=TIMEOUT)
|
|
|
+ assert expected_value == value
|
|
|
+ assert root_id == expected_root_id
|
|
|
+ assert parent_id == expected_parent_id
|
|
|
+
|
|
|
+
|
|
|
@pytest.mark.celery(result_backend='redis://')
|
|
|
-class xxx_chord:
|
|
|
+class test_chord:
|
|
|
|
|
|
def test_parent_ids(self, manager):
|
|
|
- self.assert_parentids_chord()
|
|
|
-
|
|
|
- def test_parent_ids__already_set(self, manager):
|
|
|
- self.assert_parentids_chord(uuid(), uuid())
|
|
|
+ root = ids.si(i=1)
|
|
|
+ expected_root_id = root.freeze().id
|
|
|
+ g = chain(
|
|
|
+ root, ids.si(i=2),
|
|
|
+ chord(
|
|
|
+ group(ids.si(i=i) for i in range(3, 50)),
|
|
|
+ chain(collect_ids.s(i=50) | ids.si(i=51)),
|
|
|
+ ),
|
|
|
+ )
|
|
|
+ self.assert_parentids_chord(g(), expected_root_id)
|
|
|
|
|
|
- def assert_parentids_chord(self, base_root=None, base_parent=None):
|
|
|
+ def test_parent_ids__OR(self, manager):
|
|
|
+ root = ids.si(i=1)
|
|
|
+ expected_root_id = root.freeze().id
|
|
|
g = (
|
|
|
- ids.si(1) |
|
|
|
- ids.si(2) |
|
|
|
- group(ids.si(i) for i in range(3, 50)) |
|
|
|
+ root |
|
|
|
+ ids.si(i=2) |
|
|
|
+ group(ids.si(i=i) for i in range(3, 50)) |
|
|
|
collect_ids.s(i=50) |
|
|
|
- ids.si(51)
|
|
|
+ ids.si(i=51)
|
|
|
)
|
|
|
- g.freeze(root_id=base_root, parent_id=base_parent)
|
|
|
- res = g.apply_async(root_id=base_root, parent_id=base_parent)
|
|
|
- expected_root_id = base_root or res.parent.parent.parent.id
|
|
|
+ self.assert_parentids_chord(g(), expected_root_id)
|
|
|
|
|
|
- root_id, parent_id, value = res.get(timeout=30)
|
|
|
- assert value == 51
|
|
|
- assert root_id == expected_root_id
|
|
|
- assert parent_id == res.parent.id
|
|
|
+ def assert_parentids_chord(self, res, expected_root_id):
|
|
|
+ assert isinstance(res, AsyncResult)
|
|
|
+ assert isinstance(res.parent, AsyncResult)
|
|
|
+ assert isinstance(res.parent.parent, GroupResult)
|
|
|
+ assert isinstance(res.parent.parent.parent, AsyncResult)
|
|
|
+ assert isinstance(res.parent.parent.parent.parent, AsyncResult)
|
|
|
|
|
|
+ # first we check the last task
|
|
|
+ assert_ids(res, 51, expected_root_id, res.parent.id)
|
|
|
+
|
|
|
+ # then the chord callback
|
|
|
prev, (root_id, parent_id, value) = res.parent.get(timeout=30)
|
|
|
assert value == 50
|
|
|
assert root_id == expected_root_id
|
|
|
- assert parent_id == res.parent.parent.id
|
|
|
+ # started by one of the chord header tasks.
|
|
|
+ assert parent_id in res.parent.parent.results
|
|
|
|
|
|
+ # check what the chord callback recorded
|
|
|
for i, p in enumerate(prev):
|
|
|
root_id, parent_id, value = p
|
|
|
assert root_id == expected_root_id
|
|
|
- assert parent_id == res.parent.parent.id
|
|
|
+ assert parent_id == res.parent.parent.parent.id
|
|
|
|
|
|
- root_id, parent_id, value = res.parent.parent.get(timeout=30)
|
|
|
+ # ids(i=2)
|
|
|
+ root_id, parent_id, value = res.parent.parent.parent.get(timeout=30)
|
|
|
assert value == 2
|
|
|
- assert parent_id == res.parent.parent.parent.id
|
|
|
+ assert parent_id == res.parent.parent.parent.parent.id
|
|
|
assert root_id == expected_root_id
|
|
|
|
|
|
- root_id, parent_id, value = res.parent.parent.parent.get(timeout=30)
|
|
|
+ # ids(i=1)
|
|
|
+ root_id, parent_id, value = res.parent.parent.parent.parent.get(
|
|
|
+ timeout=30)
|
|
|
assert value == 1
|
|
|
assert root_id == expected_root_id
|
|
|
- assert parent_id == base_parent
|
|
|
+ assert parent_id is None
|