test_canvas.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. from redis import StrictRedis
  4. from celery import chain, chord, group
  5. from celery.exceptions import TimeoutError
  6. from celery.result import AsyncResult, GroupResult
  7. from .conftest import flaky
  8. from .tasks import (add, add_replaced, add_to_all, collect_ids, ids,
  9. redis_echo, second_order_replace1)
# Value passed as ``timeout=`` to most of the result ``.get()`` calls in
# this module (a few helper assertions use a literal 30 instead).
TIMEOUT = 120
  11. class test_chain:
  12. @flaky
  13. def test_simple_chain(self, manager):
  14. c = add.s(4, 4) | add.s(8) | add.s(16)
  15. assert c().get(timeout=TIMEOUT) == 32
  16. @flaky
  17. def test_complex_chain(self, manager):
  18. c = (
  19. add.s(2, 2) | (
  20. add.s(4) | add_replaced.s(8) | add.s(16) | add.s(32)
  21. ) |
  22. group(add.s(i) for i in range(4))
  23. )
  24. res = c()
  25. assert res.get(timeout=TIMEOUT) == [64, 65, 66, 67]
  26. @flaky
  27. def test_group_chord_group_chain(self, manager):
  28. from celery.five import bytes_if_py2
  29. if not manager.app.conf.result_backend.startswith('redis'):
  30. raise pytest.skip('Requires redis result backend.')
  31. redis_connection = StrictRedis()
  32. redis_connection.delete('redis-echo')
  33. before = group(redis_echo.si('before {}'.format(i)) for i in range(3))
  34. connect = redis_echo.si('connect')
  35. after = group(redis_echo.si('after {}'.format(i)) for i in range(2))
  36. result = (before | connect | after).delay()
  37. result.get(timeout=TIMEOUT)
  38. redis_messages = list(map(
  39. bytes_if_py2,
  40. redis_connection.lrange('redis-echo', 0, -1)
  41. ))
  42. before_items = \
  43. set(map(bytes_if_py2, (b'before 0', b'before 1', b'before 2')))
  44. after_items = set(map(bytes_if_py2, (b'after 0', b'after 1')))
  45. assert set(redis_messages[:3]) == before_items
  46. assert redis_messages[3] == b'connect'
  47. assert set(redis_messages[4:]) == after_items
  48. redis_connection.delete('redis-echo')
  49. @flaky
  50. def test_second_order_replace(self, manager):
  51. from celery.five import bytes_if_py2
  52. if not manager.app.conf.result_backend.startswith('redis'):
  53. raise pytest.skip('Requires redis result backend.')
  54. redis_connection = StrictRedis()
  55. redis_connection.delete('redis-echo')
  56. result = second_order_replace1.delay()
  57. result.get(timeout=TIMEOUT)
  58. redis_messages = list(map(
  59. bytes_if_py2,
  60. redis_connection.lrange('redis-echo', 0, -1)
  61. ))
  62. expected_messages = [b'In A', b'In B', b'In/Out C', b'Out B', b'Out A']
  63. assert redis_messages == expected_messages
  64. @flaky
  65. def test_parent_ids(self, manager, num=10):
  66. assert manager.inspect().ping()
  67. c = chain(ids.si(i=i) for i in range(num))
  68. c.freeze()
  69. res = c()
  70. try:
  71. res.get(timeout=TIMEOUT)
  72. except TimeoutError:
  73. print(manager.inspect.active())
  74. print(manager.inspect.reserved())
  75. print(manager.inspect.stats())
  76. raise
  77. self.assert_ids(res, num - 1)
  78. def assert_ids(self, res, size):
  79. i, root = size, res
  80. while root.parent:
  81. root = root.parent
  82. node = res
  83. while node:
  84. root_id, parent_id, value = node.get(timeout=30)
  85. assert value == i
  86. if node.parent:
  87. assert parent_id == node.parent.id
  88. assert root_id == root.id
  89. node = node.parent
  90. i -= 1
  91. class test_group:
  92. @flaky
  93. def test_empty_group_result(self, manager):
  94. if not manager.app.conf.result_backend.startswith('redis'):
  95. raise pytest.skip('Requires redis result backend.')
  96. task = group([])
  97. result = task.apply_async()
  98. GroupResult.save(result)
  99. task = GroupResult.restore(result.id)
  100. assert task.results == []
  101. @flaky
  102. def test_parent_ids(self, manager):
  103. assert manager.inspect().ping()
  104. g = (
  105. ids.si(i=1) |
  106. ids.si(i=2) |
  107. group(ids.si(i=i) for i in range(2, 50))
  108. )
  109. res = g()
  110. expected_root_id = res.parent.parent.id
  111. expected_parent_id = res.parent.id
  112. values = res.get(timeout=TIMEOUT)
  113. for i, r in enumerate(values):
  114. root_id, parent_id, value = r
  115. assert root_id == expected_root_id
  116. assert parent_id == expected_parent_id
  117. assert value == i + 2
  118. def assert_ids(r, expected_value, expected_root_id, expected_parent_id):
  119. root_id, parent_id, value = r.get(timeout=TIMEOUT)
  120. assert expected_value == value
  121. assert root_id == expected_root_id
  122. assert parent_id == expected_parent_id
  123. class test_chord:
  124. @flaky
  125. def test_group_chain(self, manager):
  126. if not manager.app.conf.result_backend.startswith('redis'):
  127. raise pytest.skip('Requires redis result backend.')
  128. c = (
  129. add.s(2, 2) |
  130. group(add.s(i) for i in range(4)) |
  131. add_to_all.s(8)
  132. )
  133. res = c()
  134. assert res.get(timeout=TIMEOUT) == [12, 13, 14, 15]
  135. @flaky
  136. def test_parent_ids(self, manager):
  137. if not manager.app.conf.result_backend.startswith('redis'):
  138. raise pytest.skip('Requires redis result backend.')
  139. root = ids.si(i=1)
  140. expected_root_id = root.freeze().id
  141. g = chain(
  142. root, ids.si(i=2),
  143. chord(
  144. group(ids.si(i=i) for i in range(3, 50)),
  145. chain(collect_ids.s(i=50) | ids.si(i=51)),
  146. ),
  147. )
  148. self.assert_parentids_chord(g(), expected_root_id)
  149. @flaky
  150. def test_parent_ids__OR(self, manager):
  151. if not manager.app.conf.result_backend.startswith('redis'):
  152. raise pytest.skip('Requires redis result backend.')
  153. root = ids.si(i=1)
  154. expected_root_id = root.freeze().id
  155. g = (
  156. root |
  157. ids.si(i=2) |
  158. group(ids.si(i=i) for i in range(3, 50)) |
  159. collect_ids.s(i=50) |
  160. ids.si(i=51)
  161. )
  162. self.assert_parentids_chord(g(), expected_root_id)
  163. def assert_parentids_chord(self, res, expected_root_id):
  164. assert isinstance(res, AsyncResult)
  165. assert isinstance(res.parent, AsyncResult)
  166. assert isinstance(res.parent.parent, GroupResult)
  167. assert isinstance(res.parent.parent.parent, AsyncResult)
  168. assert isinstance(res.parent.parent.parent.parent, AsyncResult)
  169. # first we check the last task
  170. assert_ids(res, 51, expected_root_id, res.parent.id)
  171. # then the chord callback
  172. prev, (root_id, parent_id, value) = res.parent.get(timeout=30)
  173. assert value == 50
  174. assert root_id == expected_root_id
  175. # started by one of the chord header tasks.
  176. assert parent_id in res.parent.parent.results
  177. # check what the chord callback recorded
  178. for i, p in enumerate(prev):
  179. root_id, parent_id, value = p
  180. assert root_id == expected_root_id
  181. assert parent_id == res.parent.parent.parent.id
  182. # ids(i=2)
  183. root_id, parent_id, value = res.parent.parent.parent.get(timeout=30)
  184. assert value == 2
  185. assert parent_id == res.parent.parent.parent.parent.id
  186. assert root_id == expected_root_id
  187. # ids(i=1)
  188. root_id, parent_id, value = res.parent.parent.parent.parent.get(
  189. timeout=30)
  190. assert value == 1
  191. assert root_id == expected_root_id
  192. assert parent_id is None