test_canvas.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460
  1. from __future__ import absolute_import, unicode_literals
  2. from datetime import datetime, timedelta
  3. from time import sleep
  4. import pytest
  5. from celery import chain, chord, group
  6. from celery.exceptions import TimeoutError
  7. from celery.result import AsyncResult, GroupResult
  8. from redis import StrictRedis
  9. from .conftest import flaky
  10. from .tasks import (add, add_chord_to_chord, add_replaced, add_to_all,
  11. add_to_all_to_chord, collect_ids, delayed_sum,
  12. delayed_sum_with_soft_guard, identity, ids, print_unicode,
  13. redis_echo, second_order_replace1, tsum)
  14. TIMEOUT = 120
  15. class test_chain:
  16. @flaky
  17. def test_simple_chain(self, manager):
  18. c = add.s(4, 4) | add.s(8) | add.s(16)
  19. assert c().get(timeout=TIMEOUT) == 32
  20. @flaky
  21. def test_complex_chain(self, manager):
  22. c = (
  23. add.s(2, 2) | (
  24. add.s(4) | add_replaced.s(8) | add.s(16) | add.s(32)
  25. ) |
  26. group(add.s(i) for i in range(4))
  27. )
  28. res = c()
  29. assert res.get(timeout=TIMEOUT) == [64, 65, 66, 67]
  30. @flaky
  31. def test_group_results_in_chain(self, manager):
  32. # This adds in an explicit test for the special case added in commit
  33. # 1e3fcaa969de6ad32b52a3ed8e74281e5e5360e6
  34. c = (
  35. group(
  36. add.s(1, 2) | group(
  37. add.s(1), add.s(2)
  38. )
  39. )
  40. )
  41. res = c()
  42. assert res.get(timeout=TIMEOUT) == [4, 5]
  43. @flaky
  44. def test_chain_inside_group_receives_arguments(self, manager):
  45. c = (
  46. add.s(5, 6) |
  47. group((add.s(1) | add.s(2), add.s(3)))
  48. )
  49. res = c()
  50. assert res.get(timeout=TIMEOUT) == [14, 14]
  51. @flaky
  52. def test_group_chord_group_chain(self, manager):
  53. from celery.five import bytes_if_py2
  54. if not manager.app.conf.result_backend.startswith('redis'):
  55. raise pytest.skip('Requires redis result backend.')
  56. redis_connection = StrictRedis()
  57. redis_connection.delete('redis-echo')
  58. before = group(redis_echo.si('before {}'.format(i)) for i in range(3))
  59. connect = redis_echo.si('connect')
  60. after = group(redis_echo.si('after {}'.format(i)) for i in range(2))
  61. result = (before | connect | after).delay()
  62. result.get(timeout=TIMEOUT)
  63. redis_messages = list(map(
  64. bytes_if_py2,
  65. redis_connection.lrange('redis-echo', 0, -1)
  66. ))
  67. before_items = \
  68. set(map(bytes_if_py2, (b'before 0', b'before 1', b'before 2')))
  69. after_items = set(map(bytes_if_py2, (b'after 0', b'after 1')))
  70. assert set(redis_messages[:3]) == before_items
  71. assert redis_messages[3] == b'connect'
  72. assert set(redis_messages[4:]) == after_items
  73. redis_connection.delete('redis-echo')
  74. @flaky
  75. def test_second_order_replace(self, manager):
  76. from celery.five import bytes_if_py2
  77. if not manager.app.conf.result_backend.startswith('redis'):
  78. raise pytest.skip('Requires redis result backend.')
  79. redis_connection = StrictRedis()
  80. redis_connection.delete('redis-echo')
  81. result = second_order_replace1.delay()
  82. result.get(timeout=TIMEOUT)
  83. redis_messages = list(map(
  84. bytes_if_py2,
  85. redis_connection.lrange('redis-echo', 0, -1)
  86. ))
  87. expected_messages = [b'In A', b'In B', b'In/Out C', b'Out B', b'Out A']
  88. assert redis_messages == expected_messages
  89. @flaky
  90. def test_parent_ids(self, manager, num=10):
  91. assert manager.inspect().ping()
  92. c = chain(ids.si(i=i) for i in range(num))
  93. c.freeze()
  94. res = c()
  95. try:
  96. res.get(timeout=TIMEOUT)
  97. except TimeoutError:
  98. print(manager.inspect.active())
  99. print(manager.inspect.reserved())
  100. print(manager.inspect.stats())
  101. raise
  102. self.assert_ids(res, num - 1)
  103. def assert_ids(self, res, size):
  104. i, root = size, res
  105. while root.parent:
  106. root = root.parent
  107. node = res
  108. while node:
  109. root_id, parent_id, value = node.get(timeout=30)
  110. assert value == i
  111. if node.parent:
  112. assert parent_id == node.parent.id
  113. assert root_id == root.id
  114. node = node.parent
  115. i -= 1
  116. def test_chord_soft_timeout_recuperation(self, manager):
  117. """Test that if soft timeout happens in task but is managed by task,
  118. chord still get results normally
  119. """
  120. if not manager.app.conf.result_backend.startswith('redis'):
  121. raise pytest.skip('Requires redis result backend.')
  122. c = chord([
  123. # return 3
  124. add.s(1, 2),
  125. # return 0 after managing soft timeout
  126. delayed_sum_with_soft_guard.s(
  127. [100], pause_time=2
  128. ).set(
  129. soft_time_limit=1
  130. ),
  131. ])
  132. result = c(delayed_sum.s(pause_time=0)).get()
  133. assert result == 3
  134. @pytest.mark.xfail()
  135. def test_chord_error_handler_with_eta(self, manager):
  136. try:
  137. manager.app.backend.ensure_chords_allowed()
  138. except NotImplementedError as e:
  139. raise pytest.skip(e.args[0])
  140. eta = datetime.utcnow() + timedelta(seconds=10)
  141. c = chain(
  142. group(
  143. add.s(1, 2),
  144. add.s(3, 4),
  145. ),
  146. tsum.s()
  147. ).on_error(print_unicode.s()).apply_async(eta=eta)
  148. result = c.get()
  149. assert result == 10
  150. class test_group:
  151. @flaky
  152. def test_empty_group_result(self, manager):
  153. if not manager.app.conf.result_backend.startswith('redis'):
  154. raise pytest.skip('Requires redis result backend.')
  155. task = group([])
  156. result = task.apply_async()
  157. GroupResult.save(result)
  158. task = GroupResult.restore(result.id)
  159. assert task.results == []
  160. @flaky
  161. def test_parent_ids(self, manager):
  162. assert manager.inspect().ping()
  163. g = (
  164. ids.si(i=1) |
  165. ids.si(i=2) |
  166. group(ids.si(i=i) for i in range(2, 50))
  167. )
  168. res = g()
  169. expected_root_id = res.parent.parent.id
  170. expected_parent_id = res.parent.id
  171. values = res.get(timeout=TIMEOUT)
  172. for i, r in enumerate(values):
  173. root_id, parent_id, value = r
  174. assert root_id == expected_root_id
  175. assert parent_id == expected_parent_id
  176. assert value == i + 2
  177. @flaky
  178. def test_nested_group(self, manager):
  179. assert manager.inspect().ping()
  180. c = group(
  181. add.si(1, 10),
  182. group(
  183. add.si(1, 100),
  184. group(
  185. add.si(1, 1000),
  186. add.si(1, 2000),
  187. ),
  188. ),
  189. )
  190. res = c()
  191. assert res.get(timeout=TIMEOUT) == [11, 101, 1001, 2001]
  192. def assert_ids(r, expected_value, expected_root_id, expected_parent_id):
  193. root_id, parent_id, value = r.get(timeout=TIMEOUT)
  194. assert expected_value == value
  195. assert root_id == expected_root_id
  196. assert parent_id == expected_parent_id
  197. class test_chord:
  198. @flaky
  199. def test_redis_subscribed_channels_leak(self, manager):
  200. if not manager.app.conf.result_backend.startswith('redis'):
  201. raise pytest.skip('Requires redis result backend.')
  202. redis_client = StrictRedis()
  203. async_result = chord([add.s(5, 6), add.s(6, 7)])(delayed_sum.s())
  204. for _ in range(TIMEOUT):
  205. if async_result.state == 'STARTED':
  206. break
  207. sleep(0.2)
  208. channels_before = \
  209. len(redis_client.execute_command('PUBSUB CHANNELS'))
  210. assert async_result.get(timeout=TIMEOUT) == 24
  211. channels_after = \
  212. len(redis_client.execute_command('PUBSUB CHANNELS'))
  213. assert channels_after < channels_before
  214. @flaky
  215. def test_replaced_nested_chord(self, manager):
  216. try:
  217. manager.app.backend.ensure_chords_allowed()
  218. except NotImplementedError as e:
  219. raise pytest.skip(e.args[0])
  220. c1 = chord([
  221. chord(
  222. [add.s(1, 2), add_replaced.s(3, 4)],
  223. add_to_all.s(5),
  224. ) | tsum.s(),
  225. chord(
  226. [add_replaced.s(6, 7), add.s(0, 0)],
  227. add_to_all.s(8),
  228. ) | tsum.s(),
  229. ], add_to_all.s(9))
  230. res1 = c1()
  231. assert res1.get(timeout=TIMEOUT) == [29, 38]
  232. @flaky
  233. def test_add_to_chord(self, manager):
  234. if not manager.app.conf.result_backend.startswith('redis'):
  235. raise pytest.skip('Requires redis result backend.')
  236. c = group([add_to_all_to_chord.s([1, 2, 3], 4)]) | identity.s()
  237. res = c()
  238. assert res.get() == [0, 5, 6, 7]
  239. @flaky
  240. def test_add_chord_to_chord(self, manager):
  241. if not manager.app.conf.result_backend.startswith('redis'):
  242. raise pytest.skip('Requires redis result backend.')
  243. c = group([add_chord_to_chord.s([1, 2, 3], 4)]) | identity.s()
  244. res = c()
  245. assert res.get() == [0, 5 + 6 + 7]
  246. @flaky
  247. def test_group_chain(self, manager):
  248. if not manager.app.conf.result_backend.startswith('redis'):
  249. raise pytest.skip('Requires redis result backend.')
  250. c = (
  251. add.s(2, 2) |
  252. group(add.s(i) for i in range(4)) |
  253. add_to_all.s(8)
  254. )
  255. res = c()
  256. assert res.get(timeout=TIMEOUT) == [12, 13, 14, 15]
  257. @flaky
  258. def test_nested_group_chain(self, manager):
  259. try:
  260. manager.app.backend.ensure_chords_allowed()
  261. except NotImplementedError as e:
  262. raise pytest.skip(e.args[0])
  263. if not manager.app.backend.supports_native_join:
  264. raise pytest.skip('Requires native join support.')
  265. c = chain(
  266. add.si(1, 0),
  267. group(
  268. add.si(1, 100),
  269. chain(
  270. add.si(1, 200),
  271. group(
  272. add.si(1, 1000),
  273. add.si(1, 2000),
  274. ),
  275. ),
  276. ),
  277. add.si(1, 10),
  278. )
  279. res = c()
  280. assert res.get(timeout=TIMEOUT) == 11
  281. @flaky
  282. def test_single_task_header(self, manager):
  283. try:
  284. manager.app.backend.ensure_chords_allowed()
  285. except NotImplementedError as e:
  286. raise pytest.skip(e.args[0])
  287. c1 = chord([add.s(2, 5)], body=add_to_all.s(9))
  288. res1 = c1()
  289. assert res1.get(timeout=TIMEOUT) == [16]
  290. c2 = group([add.s(2, 5)]) | add_to_all.s(9)
  291. res2 = c2()
  292. assert res2.get(timeout=TIMEOUT) == [16]
  293. def test_empty_header_chord(self, manager):
  294. try:
  295. manager.app.backend.ensure_chords_allowed()
  296. except NotImplementedError as e:
  297. raise pytest.skip(e.args[0])
  298. c1 = chord([], body=add_to_all.s(9))
  299. res1 = c1()
  300. assert res1.get(timeout=TIMEOUT) == []
  301. c2 = group([]) | add_to_all.s(9)
  302. res2 = c2()
  303. assert res2.get(timeout=TIMEOUT) == []
  304. @flaky
  305. def test_nested_chord(self, manager):
  306. try:
  307. manager.app.backend.ensure_chords_allowed()
  308. except NotImplementedError as e:
  309. raise pytest.skip(e.args[0])
  310. c1 = chord([
  311. chord([add.s(1, 2), add.s(3, 4)], add.s([5])),
  312. chord([add.s(6, 7)], add.s([10]))
  313. ], add_to_all.s(['A']))
  314. res1 = c1()
  315. assert res1.get(timeout=TIMEOUT) == [[3, 7, 5, 'A'], [13, 10, 'A']]
  316. c2 = group([
  317. group([add.s(1, 2), add.s(3, 4)]) | add.s([5]),
  318. group([add.s(6, 7)]) | add.s([10]),
  319. ]) | add_to_all.s(['A'])
  320. res2 = c2()
  321. assert res2.get(timeout=TIMEOUT) == [[3, 7, 5, 'A'], [13, 10, 'A']]
  322. c = group([
  323. group([
  324. group([
  325. group([
  326. add.s(1, 2)
  327. ]) | add.s([3])
  328. ]) | add.s([4])
  329. ]) | add.s([5])
  330. ]) | add.s([6])
  331. res = c()
  332. assert [[[[3, 3], 4], 5], 6] == res.get(timeout=TIMEOUT)
  333. @flaky
  334. def test_parent_ids(self, manager):
  335. if not manager.app.conf.result_backend.startswith('redis'):
  336. raise pytest.skip('Requires redis result backend.')
  337. root = ids.si(i=1)
  338. expected_root_id = root.freeze().id
  339. g = chain(
  340. root, ids.si(i=2),
  341. chord(
  342. group(ids.si(i=i) for i in range(3, 50)),
  343. chain(collect_ids.s(i=50) | ids.si(i=51)),
  344. ),
  345. )
  346. self.assert_parentids_chord(g(), expected_root_id)
  347. @flaky
  348. def test_parent_ids__OR(self, manager):
  349. if not manager.app.conf.result_backend.startswith('redis'):
  350. raise pytest.skip('Requires redis result backend.')
  351. root = ids.si(i=1)
  352. expected_root_id = root.freeze().id
  353. g = (
  354. root |
  355. ids.si(i=2) |
  356. group(ids.si(i=i) for i in range(3, 50)) |
  357. collect_ids.s(i=50) |
  358. ids.si(i=51)
  359. )
  360. self.assert_parentids_chord(g(), expected_root_id)
  361. def assert_parentids_chord(self, res, expected_root_id):
  362. assert isinstance(res, AsyncResult)
  363. assert isinstance(res.parent, AsyncResult)
  364. assert isinstance(res.parent.parent, GroupResult)
  365. assert isinstance(res.parent.parent.parent, AsyncResult)
  366. assert isinstance(res.parent.parent.parent.parent, AsyncResult)
  367. # first we check the last task
  368. assert_ids(res, 51, expected_root_id, res.parent.id)
  369. # then the chord callback
  370. prev, (root_id, parent_id, value) = res.parent.get(timeout=30)
  371. assert value == 50
  372. assert root_id == expected_root_id
  373. # started by one of the chord header tasks.
  374. assert parent_id in res.parent.parent.results
  375. # check what the chord callback recorded
  376. for i, p in enumerate(prev):
  377. root_id, parent_id, value = p
  378. assert root_id == expected_root_id
  379. assert parent_id == res.parent.parent.parent.id
  380. # ids(i=2)
  381. root_id, parent_id, value = res.parent.parent.parent.get(timeout=30)
  382. assert value == 2
  383. assert parent_id == res.parent.parent.parent.parent.id
  384. assert root_id == expected_root_id
  385. # ids(i=1)
  386. root_id, parent_id, value = res.parent.parent.parent.parent.get(
  387. timeout=30)
  388. assert value == 1
  389. assert root_id == expected_root_id
  390. assert parent_id is None