test_canvas.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. from __future__ import absolute_import, unicode_literals
  2. from time import sleep
  3. import pytest
  4. from redis import StrictRedis
  5. from celery import chain, chord, group
  6. from celery.exceptions import TimeoutError
  7. from celery.result import AsyncResult, GroupResult
  8. from .conftest import flaky
  9. from .tasks import (add, add_replaced, add_to_all, collect_ids, delayed_sum,
  10. ids, redis_echo, second_order_replace1)
# Timeout passed as ``timeout=`` to most result ``.get()`` calls in this
# module; also reused as the iteration bound of the state-polling loop in
# ``test_chord.test_redis_subscribed_channels_leak``.
TIMEOUT = 120
  12. class test_chain:
  13. @flaky
  14. def test_simple_chain(self, manager):
  15. c = add.s(4, 4) | add.s(8) | add.s(16)
  16. assert c().get(timeout=TIMEOUT) == 32
  17. @flaky
  18. def test_complex_chain(self, manager):
  19. c = (
  20. add.s(2, 2) | (
  21. add.s(4) | add_replaced.s(8) | add.s(16) | add.s(32)
  22. ) |
  23. group(add.s(i) for i in range(4))
  24. )
  25. res = c()
  26. assert res.get(timeout=TIMEOUT) == [64, 65, 66, 67]
  27. @flaky
  28. def test_group_chord_group_chain(self, manager):
  29. from celery.five import bytes_if_py2
  30. if not manager.app.conf.result_backend.startswith('redis'):
  31. raise pytest.skip('Requires redis result backend.')
  32. redis_connection = StrictRedis()
  33. redis_connection.delete('redis-echo')
  34. before = group(redis_echo.si('before {}'.format(i)) for i in range(3))
  35. connect = redis_echo.si('connect')
  36. after = group(redis_echo.si('after {}'.format(i)) for i in range(2))
  37. result = (before | connect | after).delay()
  38. result.get(timeout=TIMEOUT)
  39. redis_messages = list(map(
  40. bytes_if_py2,
  41. redis_connection.lrange('redis-echo', 0, -1)
  42. ))
  43. before_items = \
  44. set(map(bytes_if_py2, (b'before 0', b'before 1', b'before 2')))
  45. after_items = set(map(bytes_if_py2, (b'after 0', b'after 1')))
  46. assert set(redis_messages[:3]) == before_items
  47. assert redis_messages[3] == b'connect'
  48. assert set(redis_messages[4:]) == after_items
  49. redis_connection.delete('redis-echo')
  50. @flaky
  51. def test_second_order_replace(self, manager):
  52. from celery.five import bytes_if_py2
  53. if not manager.app.conf.result_backend.startswith('redis'):
  54. raise pytest.skip('Requires redis result backend.')
  55. redis_connection = StrictRedis()
  56. redis_connection.delete('redis-echo')
  57. result = second_order_replace1.delay()
  58. result.get(timeout=TIMEOUT)
  59. redis_messages = list(map(
  60. bytes_if_py2,
  61. redis_connection.lrange('redis-echo', 0, -1)
  62. ))
  63. expected_messages = [b'In A', b'In B', b'In/Out C', b'Out B', b'Out A']
  64. assert redis_messages == expected_messages
  65. @flaky
  66. def test_parent_ids(self, manager, num=10):
  67. assert manager.inspect().ping()
  68. c = chain(ids.si(i=i) for i in range(num))
  69. c.freeze()
  70. res = c()
  71. try:
  72. res.get(timeout=TIMEOUT)
  73. except TimeoutError:
  74. print(manager.inspect.active())
  75. print(manager.inspect.reserved())
  76. print(manager.inspect.stats())
  77. raise
  78. self.assert_ids(res, num - 1)
  79. def assert_ids(self, res, size):
  80. i, root = size, res
  81. while root.parent:
  82. root = root.parent
  83. node = res
  84. while node:
  85. root_id, parent_id, value = node.get(timeout=30)
  86. assert value == i
  87. if node.parent:
  88. assert parent_id == node.parent.id
  89. assert root_id == root.id
  90. node = node.parent
  91. i -= 1
  92. class test_group:
  93. @flaky
  94. def test_empty_group_result(self, manager):
  95. if not manager.app.conf.result_backend.startswith('redis'):
  96. raise pytest.skip('Requires redis result backend.')
  97. task = group([])
  98. result = task.apply_async()
  99. GroupResult.save(result)
  100. task = GroupResult.restore(result.id)
  101. assert task.results == []
  102. @flaky
  103. def test_parent_ids(self, manager):
  104. assert manager.inspect().ping()
  105. g = (
  106. ids.si(i=1) |
  107. ids.si(i=2) |
  108. group(ids.si(i=i) for i in range(2, 50))
  109. )
  110. res = g()
  111. expected_root_id = res.parent.parent.id
  112. expected_parent_id = res.parent.id
  113. values = res.get(timeout=TIMEOUT)
  114. for i, r in enumerate(values):
  115. root_id, parent_id, value = r
  116. assert root_id == expected_root_id
  117. assert parent_id == expected_parent_id
  118. assert value == i + 2
  119. @flaky
  120. def test_nested_group(self, manager):
  121. assert manager.inspect().ping()
  122. c = group(
  123. add.si(1, 10),
  124. group(
  125. add.si(1, 100),
  126. group(
  127. add.si(1, 1000),
  128. add.si(1, 2000),
  129. ),
  130. ),
  131. )
  132. res = c()
  133. assert res.get(timeout=TIMEOUT) == [11, 101, 1001, 2001]
  134. def assert_ids(r, expected_value, expected_root_id, expected_parent_id):
  135. root_id, parent_id, value = r.get(timeout=TIMEOUT)
  136. assert expected_value == value
  137. assert root_id == expected_root_id
  138. assert parent_id == expected_parent_id
  139. class test_chord:
  140. @flaky
  141. def test_redis_subscribed_channels_leak(self, manager):
  142. if not manager.app.conf.result_backend.startswith('redis'):
  143. raise pytest.skip('Requires redis result backend.')
  144. redis_client = StrictRedis()
  145. async_result = chord([add.s(5, 6), add.s(6, 7)])(delayed_sum.s())
  146. for _ in range(TIMEOUT):
  147. if async_result.state == 'STARTED':
  148. break
  149. sleep(0.2)
  150. channels_before = \
  151. len(redis_client.execute_command('PUBSUB CHANNELS'))
  152. assert async_result.get(timeout=TIMEOUT) == 24
  153. channels_after = \
  154. len(redis_client.execute_command('PUBSUB CHANNELS'))
  155. assert channels_after < channels_before
  156. @flaky
  157. def test_group_chain(self, manager):
  158. if not manager.app.conf.result_backend.startswith('redis'):
  159. raise pytest.skip('Requires redis result backend.')
  160. c = (
  161. add.s(2, 2) |
  162. group(add.s(i) for i in range(4)) |
  163. add_to_all.s(8)
  164. )
  165. res = c()
  166. assert res.get(timeout=TIMEOUT) == [12, 13, 14, 15]
  167. @flaky
  168. def test_nested_group_chain(self, manager):
  169. try:
  170. manager.app.backend.ensure_chords_allowed()
  171. except NotImplementedError as e:
  172. raise pytest.skip(e.args[0])
  173. if not manager.app.backend.supports_native_join:
  174. raise pytest.skip('Requires native join support.')
  175. c = chain(
  176. add.si(1, 0),
  177. group(
  178. add.si(1, 100),
  179. chain(
  180. add.si(1, 200),
  181. group(
  182. add.si(1, 1000),
  183. add.si(1, 2000),
  184. ),
  185. ),
  186. ),
  187. add.si(1, 10),
  188. )
  189. res = c()
  190. assert res.get(timeout=TIMEOUT) == 11
  191. @flaky
  192. def test_single_task_header(self, manager):
  193. try:
  194. manager.app.backend.ensure_chords_allowed()
  195. except NotImplementedError as e:
  196. raise pytest.skip(e.args[0])
  197. c1 = chord([add.s(2, 5)], body=add_to_all.s(9))
  198. res1 = c1()
  199. assert res1.get(timeout=TIMEOUT) == [16]
  200. c2 = group([add.s(2, 5)]) | add_to_all.s(9)
  201. res2 = c2()
  202. assert res2.get(timeout=TIMEOUT) == [16]
  203. def test_empty_header_chord(self, manager):
  204. try:
  205. manager.app.backend.ensure_chords_allowed()
  206. except NotImplementedError as e:
  207. raise pytest.skip(e.args[0])
  208. c1 = chord([], body=add_to_all.s(9))
  209. res1 = c1()
  210. assert res1.get(timeout=TIMEOUT) == []
  211. c2 = group([]) | add_to_all.s(9)
  212. res2 = c2()
  213. assert res2.get(timeout=TIMEOUT) == []
  214. @flaky
  215. def test_nested_chord(self, manager):
  216. try:
  217. manager.app.backend.ensure_chords_allowed()
  218. except NotImplementedError as e:
  219. raise pytest.skip(e.args[0])
  220. c1 = chord([
  221. chord([add.s(1, 2), add.s(3, 4)], add.s([5])),
  222. chord([add.s(6, 7)], add.s([10]))
  223. ], add_to_all.s(['A']))
  224. res1 = c1()
  225. assert res1.get(timeout=TIMEOUT) == [[3, 7, 5, 'A'], [13, 10, 'A']]
  226. c2 = group([
  227. group([add.s(1, 2), add.s(3, 4)]) | add.s([5]),
  228. group([add.s(6, 7)]) | add.s([10]),
  229. ]) | add_to_all.s(['A'])
  230. res2 = c2()
  231. assert res2.get(timeout=TIMEOUT) == [[3, 7, 5, 'A'], [13, 10, 'A']]
  232. c = group([
  233. group([
  234. group([
  235. group([
  236. add.s(1, 2)
  237. ]) | add.s([3])
  238. ]) | add.s([4])
  239. ]) | add.s([5])
  240. ]) | add.s([6])
  241. res = c()
  242. assert [[[[3, 3], 4], 5], 6] == res.get(timeout=TIMEOUT)
  243. @flaky
  244. def test_parent_ids(self, manager):
  245. if not manager.app.conf.result_backend.startswith('redis'):
  246. raise pytest.skip('Requires redis result backend.')
  247. root = ids.si(i=1)
  248. expected_root_id = root.freeze().id
  249. g = chain(
  250. root, ids.si(i=2),
  251. chord(
  252. group(ids.si(i=i) for i in range(3, 50)),
  253. chain(collect_ids.s(i=50) | ids.si(i=51)),
  254. ),
  255. )
  256. self.assert_parentids_chord(g(), expected_root_id)
  257. @flaky
  258. def test_parent_ids__OR(self, manager):
  259. if not manager.app.conf.result_backend.startswith('redis'):
  260. raise pytest.skip('Requires redis result backend.')
  261. root = ids.si(i=1)
  262. expected_root_id = root.freeze().id
  263. g = (
  264. root |
  265. ids.si(i=2) |
  266. group(ids.si(i=i) for i in range(3, 50)) |
  267. collect_ids.s(i=50) |
  268. ids.si(i=51)
  269. )
  270. self.assert_parentids_chord(g(), expected_root_id)
  271. def assert_parentids_chord(self, res, expected_root_id):
  272. assert isinstance(res, AsyncResult)
  273. assert isinstance(res.parent, AsyncResult)
  274. assert isinstance(res.parent.parent, GroupResult)
  275. assert isinstance(res.parent.parent.parent, AsyncResult)
  276. assert isinstance(res.parent.parent.parent.parent, AsyncResult)
  277. # first we check the last task
  278. assert_ids(res, 51, expected_root_id, res.parent.id)
  279. # then the chord callback
  280. prev, (root_id, parent_id, value) = res.parent.get(timeout=30)
  281. assert value == 50
  282. assert root_id == expected_root_id
  283. # started by one of the chord header tasks.
  284. assert parent_id in res.parent.parent.results
  285. # check what the chord callback recorded
  286. for i, p in enumerate(prev):
  287. root_id, parent_id, value = p
  288. assert root_id == expected_root_id
  289. assert parent_id == res.parent.parent.parent.id
  290. # ids(i=2)
  291. root_id, parent_id, value = res.parent.parent.parent.get(timeout=30)
  292. assert value == 2
  293. assert parent_id == res.parent.parent.parent.parent.id
  294. assert root_id == expected_root_id
  295. # ids(i=1)
  296. root_id, parent_id, value = res.parent.parent.parent.parent.get(
  297. timeout=30)
  298. assert value == 1
  299. assert root_id == expected_root_id
  300. assert parent_id is None