test_canvas.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418
  1. from __future__ import absolute_import, unicode_literals
  2. from time import sleep
  3. import pytest
  4. from redis import StrictRedis
  5. from celery import chain, chord, group
  6. from celery.exceptions import TimeoutError
  7. from celery.result import AsyncResult, GroupResult
  8. from .conftest import flaky
  9. from .tasks import (add, add_chord_to_chord, add_replaced, add_to_all,
  10. add_to_all_to_chord, collect_ids, delayed_sum,
  11. delayed_sum_with_soft_guard, identity, ids, redis_echo,
  12. second_order_replace1, tsum)
  13. TIMEOUT = 120
  14. class test_chain:
  15. @flaky
  16. def test_simple_chain(self, manager):
  17. c = add.s(4, 4) | add.s(8) | add.s(16)
  18. assert c().get(timeout=TIMEOUT) == 32
  19. @flaky
  20. def test_complex_chain(self, manager):
  21. c = (
  22. add.s(2, 2) | (
  23. add.s(4) | add_replaced.s(8) | add.s(16) | add.s(32)
  24. ) |
  25. group(add.s(i) for i in range(4))
  26. )
  27. res = c()
  28. assert res.get(timeout=TIMEOUT) == [64, 65, 66, 67]
  29. @flaky
  30. def test_group_chord_group_chain(self, manager):
  31. from celery.five import bytes_if_py2
  32. if not manager.app.conf.result_backend.startswith('redis'):
  33. raise pytest.skip('Requires redis result backend.')
  34. redis_connection = StrictRedis()
  35. redis_connection.delete('redis-echo')
  36. before = group(redis_echo.si('before {}'.format(i)) for i in range(3))
  37. connect = redis_echo.si('connect')
  38. after = group(redis_echo.si('after {}'.format(i)) for i in range(2))
  39. result = (before | connect | after).delay()
  40. result.get(timeout=TIMEOUT)
  41. redis_messages = list(map(
  42. bytes_if_py2,
  43. redis_connection.lrange('redis-echo', 0, -1)
  44. ))
  45. before_items = \
  46. set(map(bytes_if_py2, (b'before 0', b'before 1', b'before 2')))
  47. after_items = set(map(bytes_if_py2, (b'after 0', b'after 1')))
  48. assert set(redis_messages[:3]) == before_items
  49. assert redis_messages[3] == b'connect'
  50. assert set(redis_messages[4:]) == after_items
  51. redis_connection.delete('redis-echo')
  52. @flaky
  53. def test_second_order_replace(self, manager):
  54. from celery.five import bytes_if_py2
  55. if not manager.app.conf.result_backend.startswith('redis'):
  56. raise pytest.skip('Requires redis result backend.')
  57. redis_connection = StrictRedis()
  58. redis_connection.delete('redis-echo')
  59. result = second_order_replace1.delay()
  60. result.get(timeout=TIMEOUT)
  61. redis_messages = list(map(
  62. bytes_if_py2,
  63. redis_connection.lrange('redis-echo', 0, -1)
  64. ))
  65. expected_messages = [b'In A', b'In B', b'In/Out C', b'Out B', b'Out A']
  66. assert redis_messages == expected_messages
  67. @flaky
  68. def test_parent_ids(self, manager, num=10):
  69. assert manager.inspect().ping()
  70. c = chain(ids.si(i=i) for i in range(num))
  71. c.freeze()
  72. res = c()
  73. try:
  74. res.get(timeout=TIMEOUT)
  75. except TimeoutError:
  76. print(manager.inspect.active())
  77. print(manager.inspect.reserved())
  78. print(manager.inspect.stats())
  79. raise
  80. self.assert_ids(res, num - 1)
  81. def assert_ids(self, res, size):
  82. i, root = size, res
  83. while root.parent:
  84. root = root.parent
  85. node = res
  86. while node:
  87. root_id, parent_id, value = node.get(timeout=30)
  88. assert value == i
  89. if node.parent:
  90. assert parent_id == node.parent.id
  91. assert root_id == root.id
  92. node = node.parent
  93. i -= 1
  94. def test_chord_soft_timeout_recuperation(self, manager):
  95. """Test that if soft timeout happens in task but is managed by task,
  96. chord still get results normally
  97. """
  98. if not manager.app.conf.result_backend.startswith('redis'):
  99. raise pytest.skip('Requires redis result backend.')
  100. c = chord([
  101. # return 3
  102. add.s(1, 2),
  103. # return 0 after managing soft timeout
  104. delayed_sum_with_soft_guard.s(
  105. [100], pause_time=2
  106. ).set(
  107. soft_time_limit=1
  108. ),
  109. ])
  110. result = c(delayed_sum.s(pause_time=0)).get()
  111. assert result == 3
  112. class test_group:
  113. @flaky
  114. def test_empty_group_result(self, manager):
  115. if not manager.app.conf.result_backend.startswith('redis'):
  116. raise pytest.skip('Requires redis result backend.')
  117. task = group([])
  118. result = task.apply_async()
  119. GroupResult.save(result)
  120. task = GroupResult.restore(result.id)
  121. assert task.results == []
  122. @flaky
  123. def test_parent_ids(self, manager):
  124. assert manager.inspect().ping()
  125. g = (
  126. ids.si(i=1) |
  127. ids.si(i=2) |
  128. group(ids.si(i=i) for i in range(2, 50))
  129. )
  130. res = g()
  131. expected_root_id = res.parent.parent.id
  132. expected_parent_id = res.parent.id
  133. values = res.get(timeout=TIMEOUT)
  134. for i, r in enumerate(values):
  135. root_id, parent_id, value = r
  136. assert root_id == expected_root_id
  137. assert parent_id == expected_parent_id
  138. assert value == i + 2
  139. @flaky
  140. def test_nested_group(self, manager):
  141. assert manager.inspect().ping()
  142. c = group(
  143. add.si(1, 10),
  144. group(
  145. add.si(1, 100),
  146. group(
  147. add.si(1, 1000),
  148. add.si(1, 2000),
  149. ),
  150. ),
  151. )
  152. res = c()
  153. assert res.get(timeout=TIMEOUT) == [11, 101, 1001, 2001]
  154. def assert_ids(r, expected_value, expected_root_id, expected_parent_id):
  155. root_id, parent_id, value = r.get(timeout=TIMEOUT)
  156. assert expected_value == value
  157. assert root_id == expected_root_id
  158. assert parent_id == expected_parent_id
  159. class test_chord:
  160. @flaky
  161. def test_redis_subscribed_channels_leak(self, manager):
  162. if not manager.app.conf.result_backend.startswith('redis'):
  163. raise pytest.skip('Requires redis result backend.')
  164. redis_client = StrictRedis()
  165. async_result = chord([add.s(5, 6), add.s(6, 7)])(delayed_sum.s())
  166. for _ in range(TIMEOUT):
  167. if async_result.state == 'STARTED':
  168. break
  169. sleep(0.2)
  170. channels_before = \
  171. len(redis_client.execute_command('PUBSUB CHANNELS'))
  172. assert async_result.get(timeout=TIMEOUT) == 24
  173. channels_after = \
  174. len(redis_client.execute_command('PUBSUB CHANNELS'))
  175. assert channels_after < channels_before
  176. @flaky
  177. def test_replaced_nested_chord(self, manager):
  178. try:
  179. manager.app.backend.ensure_chords_allowed()
  180. except NotImplementedError as e:
  181. raise pytest.skip(e.args[0])
  182. c1 = chord([
  183. chord(
  184. [add.s(1, 2), add_replaced.s(3, 4)],
  185. add_to_all.s(5),
  186. ) | tsum.s(),
  187. chord(
  188. [add_replaced.s(6, 7), add.s(0, 0)],
  189. add_to_all.s(8),
  190. ) | tsum.s(),
  191. ], add_to_all.s(9))
  192. res1 = c1()
  193. assert res1.get(timeout=TIMEOUT) == [29, 38]
  194. @flaky
  195. def test_add_to_chord(self, manager):
  196. if not manager.app.conf.result_backend.startswith('redis'):
  197. raise pytest.skip('Requires redis result backend.')
  198. c = group([add_to_all_to_chord.s([1, 2, 3], 4)]) | identity.s()
  199. res = c()
  200. assert res.get() == [0, 5, 6, 7]
  201. @flaky
  202. def test_add_chord_to_chord(self, manager):
  203. if not manager.app.conf.result_backend.startswith('redis'):
  204. raise pytest.skip('Requires redis result backend.')
  205. c = group([add_chord_to_chord.s([1, 2, 3], 4)]) | identity.s()
  206. res = c()
  207. assert res.get() == [0, 5 + 6 + 7]
  208. @flaky
  209. def test_group_chain(self, manager):
  210. if not manager.app.conf.result_backend.startswith('redis'):
  211. raise pytest.skip('Requires redis result backend.')
  212. c = (
  213. add.s(2, 2) |
  214. group(add.s(i) for i in range(4)) |
  215. add_to_all.s(8)
  216. )
  217. res = c()
  218. assert res.get(timeout=TIMEOUT) == [12, 13, 14, 15]
  219. @flaky
  220. def test_nested_group_chain(self, manager):
  221. try:
  222. manager.app.backend.ensure_chords_allowed()
  223. except NotImplementedError as e:
  224. raise pytest.skip(e.args[0])
  225. if not manager.app.backend.supports_native_join:
  226. raise pytest.skip('Requires native join support.')
  227. c = chain(
  228. add.si(1, 0),
  229. group(
  230. add.si(1, 100),
  231. chain(
  232. add.si(1, 200),
  233. group(
  234. add.si(1, 1000),
  235. add.si(1, 2000),
  236. ),
  237. ),
  238. ),
  239. add.si(1, 10),
  240. )
  241. res = c()
  242. assert res.get(timeout=TIMEOUT) == 11
  243. @flaky
  244. def test_single_task_header(self, manager):
  245. try:
  246. manager.app.backend.ensure_chords_allowed()
  247. except NotImplementedError as e:
  248. raise pytest.skip(e.args[0])
  249. c1 = chord([add.s(2, 5)], body=add_to_all.s(9))
  250. res1 = c1()
  251. assert res1.get(timeout=TIMEOUT) == [16]
  252. c2 = group([add.s(2, 5)]) | add_to_all.s(9)
  253. res2 = c2()
  254. assert res2.get(timeout=TIMEOUT) == [16]
  255. def test_empty_header_chord(self, manager):
  256. try:
  257. manager.app.backend.ensure_chords_allowed()
  258. except NotImplementedError as e:
  259. raise pytest.skip(e.args[0])
  260. c1 = chord([], body=add_to_all.s(9))
  261. res1 = c1()
  262. assert res1.get(timeout=TIMEOUT) == []
  263. c2 = group([]) | add_to_all.s(9)
  264. res2 = c2()
  265. assert res2.get(timeout=TIMEOUT) == []
  266. @flaky
  267. def test_nested_chord(self, manager):
  268. try:
  269. manager.app.backend.ensure_chords_allowed()
  270. except NotImplementedError as e:
  271. raise pytest.skip(e.args[0])
  272. c1 = chord([
  273. chord([add.s(1, 2), add.s(3, 4)], add.s([5])),
  274. chord([add.s(6, 7)], add.s([10]))
  275. ], add_to_all.s(['A']))
  276. res1 = c1()
  277. assert res1.get(timeout=TIMEOUT) == [[3, 7, 5, 'A'], [13, 10, 'A']]
  278. c2 = group([
  279. group([add.s(1, 2), add.s(3, 4)]) | add.s([5]),
  280. group([add.s(6, 7)]) | add.s([10]),
  281. ]) | add_to_all.s(['A'])
  282. res2 = c2()
  283. assert res2.get(timeout=TIMEOUT) == [[3, 7, 5, 'A'], [13, 10, 'A']]
  284. c = group([
  285. group([
  286. group([
  287. group([
  288. add.s(1, 2)
  289. ]) | add.s([3])
  290. ]) | add.s([4])
  291. ]) | add.s([5])
  292. ]) | add.s([6])
  293. res = c()
  294. assert [[[[3, 3], 4], 5], 6] == res.get(timeout=TIMEOUT)
  295. @flaky
  296. def test_parent_ids(self, manager):
  297. if not manager.app.conf.result_backend.startswith('redis'):
  298. raise pytest.skip('Requires redis result backend.')
  299. root = ids.si(i=1)
  300. expected_root_id = root.freeze().id
  301. g = chain(
  302. root, ids.si(i=2),
  303. chord(
  304. group(ids.si(i=i) for i in range(3, 50)),
  305. chain(collect_ids.s(i=50) | ids.si(i=51)),
  306. ),
  307. )
  308. self.assert_parentids_chord(g(), expected_root_id)
  309. @flaky
  310. def test_parent_ids__OR(self, manager):
  311. if not manager.app.conf.result_backend.startswith('redis'):
  312. raise pytest.skip('Requires redis result backend.')
  313. root = ids.si(i=1)
  314. expected_root_id = root.freeze().id
  315. g = (
  316. root |
  317. ids.si(i=2) |
  318. group(ids.si(i=i) for i in range(3, 50)) |
  319. collect_ids.s(i=50) |
  320. ids.si(i=51)
  321. )
  322. self.assert_parentids_chord(g(), expected_root_id)
  323. def assert_parentids_chord(self, res, expected_root_id):
  324. assert isinstance(res, AsyncResult)
  325. assert isinstance(res.parent, AsyncResult)
  326. assert isinstance(res.parent.parent, GroupResult)
  327. assert isinstance(res.parent.parent.parent, AsyncResult)
  328. assert isinstance(res.parent.parent.parent.parent, AsyncResult)
  329. # first we check the last task
  330. assert_ids(res, 51, expected_root_id, res.parent.id)
  331. # then the chord callback
  332. prev, (root_id, parent_id, value) = res.parent.get(timeout=30)
  333. assert value == 50
  334. assert root_id == expected_root_id
  335. # started by one of the chord header tasks.
  336. assert parent_id in res.parent.parent.results
  337. # check what the chord callback recorded
  338. for i, p in enumerate(prev):
  339. root_id, parent_id, value = p
  340. assert root_id == expected_root_id
  341. assert parent_id == res.parent.parent.parent.id
  342. # ids(i=2)
  343. root_id, parent_id, value = res.parent.parent.parent.get(timeout=30)
  344. assert value == 2
  345. assert parent_id == res.parent.parent.parent.parent.id
  346. assert root_id == expected_root_id
  347. # ids(i=1)
  348. root_id, parent_id, value = res.parent.parent.parent.parent.get(
  349. timeout=30)
  350. assert value == 1
  351. assert root_id == expected_root_id
  352. assert parent_id is None