# test_canvas.py (6.4 KB)

  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. from redis import StrictRedis
  4. from celery import chain, chord, group
  5. from celery.exceptions import TimeoutError
  6. from celery.result import AsyncResult, GroupResult
  7. from .conftest import flaky
  8. from .tasks import add, add_replaced, add_to_all, collect_ids, ids, redis_echo
# Default timeout handed to result ``.get()`` calls in this module
# (presumably seconds -- confirm against celery's ``AsyncResult.get``).
TIMEOUT = 120
  10. class test_chain:
  11. @flaky
  12. def test_simple_chain(self, manager):
  13. c = add.s(4, 4) | add.s(8) | add.s(16)
  14. assert c().get(timeout=TIMEOUT) == 32
  15. @flaky
  16. def test_complex_chain(self, manager):
  17. c = (
  18. add.s(2, 2) | (
  19. add.s(4) | add_replaced.s(8) | add.s(16) | add.s(32)
  20. ) |
  21. group(add.s(i) for i in range(4))
  22. )
  23. res = c()
  24. assert res.get(timeout=TIMEOUT) == [64, 65, 66, 67]
  25. @flaky
  26. def test_group_chord_group_chain(self, manager):
  27. from celery.five import bytes_if_py2
  28. if not manager.app.conf.result_backend.startswith('redis'):
  29. raise pytest.skip('Requires redis result backend.')
  30. redis_connection = StrictRedis()
  31. redis_connection.delete('redis-echo')
  32. before = group(redis_echo.si('before {}'.format(i)) for i in range(3))
  33. connect = redis_echo.si('connect')
  34. after = group(redis_echo.si('after {}'.format(i)) for i in range(2))
  35. result = (before | connect | after).delay()
  36. result.get(timeout=TIMEOUT)
  37. redis_messages = list(map(
  38. bytes_if_py2,
  39. redis_connection.lrange('redis-echo', 0, -1)
  40. ))
  41. before_items = \
  42. set(map(bytes_if_py2, (b'before 0', b'before 1', b'before 2')))
  43. after_items = set(map(bytes_if_py2, (b'after 0', b'after 1')))
  44. assert set(redis_messages[:3]) == before_items
  45. assert redis_messages[3] == b'connect'
  46. assert set(redis_messages[4:]) == after_items
  47. redis_connection.delete('redis-echo')
  48. @flaky
  49. def test_parent_ids(self, manager, num=10):
  50. assert manager.inspect().ping()
  51. c = chain(ids.si(i=i) for i in range(num))
  52. c.freeze()
  53. res = c()
  54. try:
  55. res.get(timeout=TIMEOUT)
  56. except TimeoutError:
  57. print(manager.inspect.active())
  58. print(manager.inspect.reserved())
  59. print(manager.inspect.stats())
  60. raise
  61. self.assert_ids(res, num - 1)
  62. def assert_ids(self, res, size):
  63. i, root = size, res
  64. while root.parent:
  65. root = root.parent
  66. node = res
  67. while node:
  68. root_id, parent_id, value = node.get(timeout=30)
  69. assert value == i
  70. if node.parent:
  71. assert parent_id == node.parent.id
  72. assert root_id == root.id
  73. node = node.parent
  74. i -= 1
  75. class test_group:
  76. @flaky
  77. def test_parent_ids(self, manager):
  78. assert manager.inspect().ping()
  79. g = (
  80. ids.si(i=1) |
  81. ids.si(i=2) |
  82. group(ids.si(i=i) for i in range(2, 50))
  83. )
  84. res = g()
  85. expected_root_id = res.parent.parent.id
  86. expected_parent_id = res.parent.id
  87. values = res.get(timeout=TIMEOUT)
  88. for i, r in enumerate(values):
  89. root_id, parent_id, value = r
  90. assert root_id == expected_root_id
  91. assert parent_id == expected_parent_id
  92. assert value == i + 2
  93. def assert_ids(r, expected_value, expected_root_id, expected_parent_id):
  94. root_id, parent_id, value = r.get(timeout=TIMEOUT)
  95. assert expected_value == value
  96. assert root_id == expected_root_id
  97. assert parent_id == expected_parent_id
  98. class test_chord:
  99. @flaky
  100. def test_group_chain(self, manager):
  101. if not manager.app.conf.result_backend.startswith('redis'):
  102. raise pytest.skip('Requires redis result backend.')
  103. c = (
  104. add.s(2, 2) |
  105. group(add.s(i) for i in range(4)) |
  106. add_to_all.s(8)
  107. )
  108. res = c()
  109. assert res.get(timeout=TIMEOUT) == [12, 13, 14, 15]
  110. @flaky
  111. def test_parent_ids(self, manager):
  112. if not manager.app.conf.result_backend.startswith('redis'):
  113. raise pytest.skip('Requires redis result backend.')
  114. root = ids.si(i=1)
  115. expected_root_id = root.freeze().id
  116. g = chain(
  117. root, ids.si(i=2),
  118. chord(
  119. group(ids.si(i=i) for i in range(3, 50)),
  120. chain(collect_ids.s(i=50) | ids.si(i=51)),
  121. ),
  122. )
  123. self.assert_parentids_chord(g(), expected_root_id)
  124. @flaky
  125. def test_parent_ids__OR(self, manager):
  126. if not manager.app.conf.result_backend.startswith('redis'):
  127. raise pytest.skip('Requires redis result backend.')
  128. root = ids.si(i=1)
  129. expected_root_id = root.freeze().id
  130. g = (
  131. root |
  132. ids.si(i=2) |
  133. group(ids.si(i=i) for i in range(3, 50)) |
  134. collect_ids.s(i=50) |
  135. ids.si(i=51)
  136. )
  137. self.assert_parentids_chord(g(), expected_root_id)
  138. def assert_parentids_chord(self, res, expected_root_id):
  139. assert isinstance(res, AsyncResult)
  140. assert isinstance(res.parent, AsyncResult)
  141. assert isinstance(res.parent.parent, GroupResult)
  142. assert isinstance(res.parent.parent.parent, AsyncResult)
  143. assert isinstance(res.parent.parent.parent.parent, AsyncResult)
  144. # first we check the last task
  145. assert_ids(res, 51, expected_root_id, res.parent.id)
  146. # then the chord callback
  147. prev, (root_id, parent_id, value) = res.parent.get(timeout=30)
  148. assert value == 50
  149. assert root_id == expected_root_id
  150. # started by one of the chord header tasks.
  151. assert parent_id in res.parent.parent.results
  152. # check what the chord callback recorded
  153. for i, p in enumerate(prev):
  154. root_id, parent_id, value = p
  155. assert root_id == expected_root_id
  156. assert parent_id == res.parent.parent.parent.id
  157. # ids(i=2)
  158. root_id, parent_id, value = res.parent.parent.parent.get(timeout=30)
  159. assert value == 2
  160. assert parent_id == res.parent.parent.parent.parent.id
  161. assert root_id == expected_root_id
  162. # ids(i=1)
  163. root_id, parent_id, value = res.parent.parent.parent.parent.get(
  164. timeout=30)
  165. assert value == 1
  166. assert root_id == expected_root_id
  167. assert parent_id is None