test_canvas.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754
  1. import pytest
  2. from case import ContextMock, MagicMock, Mock
  3. from celery._state import _task_stack
  4. from celery.canvas import (
  5. Signature,
  6. chain,
  7. group,
  8. chord,
  9. signature,
  10. xmap,
  11. xstarmap,
  12. chunks,
  13. _maybe_group,
  14. maybe_signature,
  15. maybe_unroll_group,
  16. _seq_concat_seq,
  17. )
  18. from celery.result import AsyncResult, GroupResult, EagerResult
  19. SIG = Signature({
  20. 'task': 'TASK',
  21. 'args': ('A1',),
  22. 'kwargs': {'K1': 'V1'},
  23. 'options': {'task_id': 'TASK_ID'},
  24. 'subtask_type': ''},
  25. )
  26. class test_maybe_unroll_group:
  27. def test_when_no_len_and_no_length_hint(self):
  28. g = MagicMock(name='group')
  29. g.tasks.__len__.side_effect = TypeError()
  30. g.tasks.__length_hint__ = Mock()
  31. g.tasks.__length_hint__.return_value = 0
  32. assert maybe_unroll_group(g) is g
  33. g.tasks.__length_hint__.side_effect = AttributeError()
  34. assert maybe_unroll_group(g) is g
  35. @pytest.mark.parametrize('a,b,expected', [
  36. ((1, 2, 3), [4, 5], (1, 2, 3, 4, 5)),
  37. ((1, 2), [3, 4, 5], [1, 2, 3, 4, 5]),
  38. ([1, 2, 3], (4, 5), [1, 2, 3, 4, 5]),
  39. ([1, 2], (3, 4, 5), (1, 2, 3, 4, 5)),
  40. ])
  41. def test_seq_concat_seq(a, b, expected):
  42. res = _seq_concat_seq(a, b)
  43. assert type(res) is type(expected) # noqa
  44. assert res == expected
  45. class CanvasCase:
  46. def setup(self):
  47. @self.app.task(shared=False)
  48. def add(x, y):
  49. return x + y
  50. self.add = add
  51. @self.app.task(shared=False)
  52. def mul(x, y):
  53. return x * y
  54. self.mul = mul
  55. @self.app.task(shared=False)
  56. def div(x, y):
  57. return x / y
  58. self.div = div
  59. class test_Signature(CanvasCase):
  60. def test_getitem_property_class(self):
  61. assert Signature.task
  62. assert Signature.args
  63. assert Signature.kwargs
  64. assert Signature.options
  65. assert Signature.subtask_type
  66. def test_getitem_property(self):
  67. assert SIG.task == 'TASK'
  68. assert SIG.args == ('A1',)
  69. assert SIG.kwargs == {'K1': 'V1'}
  70. assert SIG.options == {'task_id': 'TASK_ID'}
  71. assert SIG.subtask_type == ''
  72. def test_call(self):
  73. x = Signature('foo', (1, 2), {'arg1': 33}, app=self.app)
  74. x.type = Mock(name='type')
  75. x(3, 4, arg2=66)
  76. x.type.assert_called_with(3, 4, 1, 2, arg1=33, arg2=66)
  77. def test_link_on_scalar(self):
  78. x = Signature('TASK', link=Signature('B'))
  79. assert x.options['link']
  80. x.link(Signature('C'))
  81. assert isinstance(x.options['link'], list)
  82. assert Signature('B') in x.options['link']
  83. assert Signature('C') in x.options['link']
  84. def test_json(self):
  85. x = Signature('TASK', link=Signature('B', app=self.app), app=self.app)
  86. assert x.__json__() == dict(x)
  87. @pytest.mark.usefixtures('depends_on_current_app')
  88. def test_reduce(self):
  89. x = Signature('TASK', (2, 4), app=self.app)
  90. fun, args = x.__reduce__()
  91. assert fun(*args) == x
  92. def test_replace(self):
  93. x = Signature('TASK', ('A'), {})
  94. assert x.replace(args=('B',)).args == ('B',)
  95. assert x.replace(kwargs={'FOO': 'BAR'}).kwargs == {
  96. 'FOO': 'BAR',
  97. }
  98. assert x.replace(options={'task_id': '123'}).options == {
  99. 'task_id': '123',
  100. }
  101. def test_set(self):
  102. assert Signature('TASK', x=1).set(task_id='2').options == {
  103. 'x': 1, 'task_id': '2',
  104. }
  105. def test_link(self):
  106. x = signature(SIG)
  107. x.link(SIG)
  108. x.link(SIG)
  109. assert SIG in x.options['link']
  110. assert len(x.options['link']) == 1
  111. def test_link_error(self):
  112. x = signature(SIG)
  113. x.link_error(SIG)
  114. x.link_error(SIG)
  115. assert SIG in x.options['link_error']
  116. assert len(x.options['link_error']) == 1
  117. def test_flatten_links(self):
  118. tasks = [self.add.s(2, 2), self.mul.s(4), self.div.s(2)]
  119. tasks[0].link(tasks[1])
  120. tasks[1].link(tasks[2])
  121. assert tasks[0].flatten_links() == tasks
  122. def test_OR(self):
  123. x = self.add.s(2, 2) | self.mul.s(4)
  124. assert isinstance(x, chain)
  125. y = self.add.s(4, 4) | self.div.s(2)
  126. z = x | y
  127. assert isinstance(y, chain)
  128. assert isinstance(z, chain)
  129. assert len(z.tasks) == 4
  130. with pytest.raises(TypeError):
  131. x | 10
  132. ax = self.add.s(2, 2) | (self.add.s(4) | self.add.s(8))
  133. assert isinstance(ax, chain)
  134. assert len(ax.tasks), 3 == 'consolidates chain to chain'
  135. def test_INVERT(self):
  136. x = self.add.s(2, 2)
  137. x.apply_async = Mock()
  138. x.apply_async.return_value = Mock()
  139. x.apply_async.return_value.get = Mock()
  140. x.apply_async.return_value.get.return_value = 4
  141. assert ~x == 4
  142. x.apply_async.assert_called()
  143. def test_merge_immutable(self):
  144. x = self.add.si(2, 2, foo=1)
  145. args, kwargs, options = x._merge((4,), {'bar': 2}, {'task_id': 3})
  146. assert args == (2, 2)
  147. assert kwargs == {'foo': 1}
  148. assert options == {'task_id': 3}
  149. def test_set_immutable(self):
  150. x = self.add.s(2, 2)
  151. assert not x.immutable
  152. x.set(immutable=True)
  153. assert x.immutable
  154. x.set(immutable=False)
  155. assert not x.immutable
  156. def test_election(self):
  157. x = self.add.s(2, 2)
  158. x.freeze('foo')
  159. x.type.app.control = Mock()
  160. r = x.election()
  161. x.type.app.control.election.assert_called()
  162. assert r.id == 'foo'
  163. def test_AsyncResult_when_not_registered(self):
  164. s = signature('xxx.not.registered', app=self.app)
  165. assert s.AsyncResult
  166. def test_apply_async_when_not_registered(self):
  167. s = signature('xxx.not.registered', app=self.app)
  168. assert s._apply_async
  169. class test_xmap_xstarmap(CanvasCase):
  170. def test_apply(self):
  171. for type, attr in [(xmap, 'map'), (xstarmap, 'starmap')]:
  172. args = [(i, i) for i in range(10)]
  173. s = getattr(self.add, attr)(args)
  174. s.type = Mock()
  175. s.apply_async(foo=1)
  176. s.type.apply_async.assert_called_with(
  177. (), {'task': self.add.s(), 'it': args}, foo=1,
  178. route_name=self.add.name,
  179. )
  180. assert type.from_dict(dict(s)) == s
  181. assert repr(s)
  182. class test_chunks(CanvasCase):
  183. def test_chunks(self):
  184. x = self.add.chunks(range(100), 10)
  185. assert dict(chunks.from_dict(dict(x), app=self.app)) == dict(x)
  186. assert x.group()
  187. assert len(x.group().tasks) == 10
  188. x.group = Mock()
  189. gr = x.group.return_value = Mock()
  190. x.apply_async()
  191. gr.apply_async.assert_called_with((), {}, route_name=self.add.name)
  192. gr.apply_async.reset_mock()
  193. x()
  194. gr.apply_async.assert_called_with((), {}, route_name=self.add.name)
  195. self.app.conf.task_always_eager = True
  196. chunks.apply_chunks(app=self.app, **x['kwargs'])
  197. class test_chain(CanvasCase):
  198. def test_clone_preserves_state(self):
  199. x = chain(self.add.s(i, i) for i in range(10))
  200. assert x.clone().tasks == x.tasks
  201. assert x.clone().kwargs == x.kwargs
  202. assert x.clone().args == x.args
  203. def test_repr(self):
  204. x = self.add.s(2, 2) | self.add.s(2)
  205. assert repr(x) == '%s(2, 2) | %s(2)' % (self.add.name, self.add.name)
  206. def test_apply_async(self):
  207. c = self.add.s(2, 2) | self.add.s(4) | self.add.s(8)
  208. result = c.apply_async()
  209. assert result.parent
  210. assert result.parent.parent
  211. assert result.parent.parent.parent is None
  212. def test_group_to_chord__freeze_parent_id(self):
  213. def using_freeze(c):
  214. c.freeze(parent_id='foo', root_id='root')
  215. return c._frozen[0]
  216. self.assert_group_to_chord_parent_ids(using_freeze)
  217. def assert_group_to_chord_parent_ids(self, freezefun):
  218. c = (
  219. self.add.s(5, 5) |
  220. group([self.add.s(i, i) for i in range(5)], app=self.app) |
  221. self.add.si(10, 10) |
  222. self.add.si(20, 20) |
  223. self.add.si(30, 30)
  224. )
  225. tasks = freezefun(c)
  226. assert tasks[-1].parent_id == 'foo'
  227. assert tasks[-1].root_id == 'root'
  228. assert tasks[-2].parent_id == tasks[-1].id
  229. assert tasks[-2].root_id == 'root'
  230. assert tasks[-2].body.parent_id == tasks[-2].tasks.id
  231. assert tasks[-2].body.parent_id == tasks[-2].id
  232. assert tasks[-2].body.root_id == 'root'
  233. assert tasks[-2].tasks.tasks[0].parent_id == tasks[-1].id
  234. assert tasks[-2].tasks.tasks[0].root_id == 'root'
  235. assert tasks[-2].tasks.tasks[1].parent_id == tasks[-1].id
  236. assert tasks[-2].tasks.tasks[1].root_id == 'root'
  237. assert tasks[-2].tasks.tasks[2].parent_id == tasks[-1].id
  238. assert tasks[-2].tasks.tasks[2].root_id == 'root'
  239. assert tasks[-2].tasks.tasks[3].parent_id == tasks[-1].id
  240. assert tasks[-2].tasks.tasks[3].root_id == 'root'
  241. assert tasks[-2].tasks.tasks[4].parent_id == tasks[-1].id
  242. assert tasks[-2].tasks.tasks[4].root_id == 'root'
  243. assert tasks[-3].parent_id == tasks[-2].body.id
  244. assert tasks[-3].root_id == 'root'
  245. assert tasks[-4].parent_id == tasks[-3].id
  246. assert tasks[-4].root_id == 'root'
  247. def test_splices_chains(self):
  248. c = chain(
  249. self.add.s(5, 5),
  250. chain(self.add.s(6), self.add.s(7), self.add.s(8), app=self.app),
  251. app=self.app,
  252. )
  253. c.freeze()
  254. tasks, _ = c._frozen
  255. assert len(tasks) == 4
  256. def test_from_dict_no_tasks(self):
  257. assert chain.from_dict(dict(chain(app=self.app)), app=self.app)
  258. @pytest.mark.usefixtures('depends_on_current_app')
  259. def test_app_falls_back_to_default(self):
  260. from celery._state import current_app
  261. assert chain().app is current_app
  262. def test_handles_dicts(self):
  263. c = chain(
  264. self.add.s(5, 5), dict(self.add.s(8)), app=self.app,
  265. )
  266. c.freeze()
  267. tasks, _ = c._frozen
  268. for task in tasks:
  269. assert isinstance(task, Signature)
  270. assert task.app is self.app
  271. def test_group_to_chord(self):
  272. c = (
  273. self.add.s(5) |
  274. group([self.add.s(i, i) for i in range(5)], app=self.app) |
  275. self.add.s(10) |
  276. self.add.s(20) |
  277. self.add.s(30)
  278. )
  279. c._use_link = True
  280. tasks, results = c.prepare_steps((), c.tasks)
  281. assert tasks[-1].args[0] == 5
  282. assert isinstance(tasks[-2], chord)
  283. assert len(tasks[-2].tasks) == 5
  284. assert tasks[-2].parent_id == tasks[-1].id
  285. assert tasks[-2].root_id == tasks[-1].id
  286. assert tasks[-2].body.args[0] == 10
  287. assert tasks[-2].body.parent_id == tasks[-2].id
  288. assert tasks[-3].args[0] == 20
  289. assert tasks[-3].root_id == tasks[-1].id
  290. assert tasks[-3].parent_id == tasks[-2].body.id
  291. assert tasks[-4].args[0] == 30
  292. assert tasks[-4].parent_id == tasks[-3].id
  293. assert tasks[-4].root_id == tasks[-1].id
  294. assert tasks[-2].body.options['link']
  295. assert tasks[-2].body.options['link'][0].options['link']
  296. c2 = self.add.s(2, 2) | group(self.add.s(i, i) for i in range(10))
  297. c2._use_link = True
  298. tasks2, _ = c2.prepare_steps((), c2.tasks)
  299. assert isinstance(tasks2[0], group)
  300. def test_group_to_chord__protocol_2__or(self):
  301. c = (
  302. group([self.add.s(i, i) for i in range(5)], app=self.app) |
  303. self.add.s(10) |
  304. self.add.s(20) |
  305. self.add.s(30)
  306. )
  307. assert isinstance(c, chord)
  308. def test_group_to_chord__protocol_2(self):
  309. c = chain(
  310. group([self.add.s(i, i) for i in range(5)], app=self.app),
  311. self.add.s(10),
  312. self.add.s(20),
  313. self.add.s(30)
  314. )
  315. c._use_link = False
  316. tasks, _ = c.prepare_steps((), c.tasks)
  317. assert isinstance(tasks[-1], chord)
  318. c2 = self.add.s(2, 2) | group(self.add.s(i, i) for i in range(10))
  319. c2._use_link = False
  320. tasks2, _ = c2.prepare_steps((), c2.tasks)
  321. assert isinstance(tasks2[0], group)
  322. def test_apply_options(self):
  323. class static(Signature):
  324. def clone(self, *args, **kwargs):
  325. return self
  326. def s(*args, **kwargs):
  327. return static(self.add, args, kwargs, type=self.add, app=self.app)
  328. c = s(2, 2) | s(4) | s(8)
  329. r1 = c.apply_async(task_id='some_id')
  330. assert r1.id == 'some_id'
  331. c.apply_async(group_id='some_group_id')
  332. assert c.tasks[-1].options['group_id'] == 'some_group_id'
  333. c.apply_async(chord='some_chord_id')
  334. assert c.tasks[-1].options['chord'] == 'some_chord_id'
  335. c.apply_async(link=[s(32)])
  336. assert c.tasks[-1].options['link'] == [s(32)]
  337. c.apply_async(link_error=[s('error')])
  338. for task in c.tasks:
  339. assert task.options['link_error'] == [s('error')]
  340. def test_reverse(self):
  341. x = self.add.s(2, 2) | self.add.s(2)
  342. assert isinstance(signature(x), chain)
  343. assert isinstance(signature(dict(x)), chain)
  344. def test_always_eager(self):
  345. self.app.conf.task_always_eager = True
  346. assert ~(self.add.s(4, 4) | self.add.s(8)) == 16
  347. def test_apply(self):
  348. x = chain(self.add.s(4, 4), self.add.s(8), self.add.s(10))
  349. res = x.apply()
  350. assert isinstance(res, EagerResult)
  351. assert res.get() == 26
  352. assert res.parent.get() == 16
  353. assert res.parent.parent.get() == 8
  354. assert res.parent.parent.parent is None
  355. def test_empty_chain_returns_none(self):
  356. assert chain(app=self.app)() is None
  357. assert chain(app=self.app).apply_async() is None
  358. def test_root_id_parent_id(self):
  359. self.app.conf.task_protocol = 2
  360. c = chain(self.add.si(i, i) for i in range(4))
  361. c.freeze()
  362. tasks, _ = c._frozen
  363. for i, task in enumerate(tasks):
  364. assert task.root_id == tasks[-1].id
  365. try:
  366. assert task.parent_id == tasks[i + 1].id
  367. except IndexError:
  368. assert i == len(tasks) - 1
  369. else:
  370. valid_parents = i
  371. assert valid_parents == len(tasks) - 2
  372. self.assert_sent_with_ids(tasks[-1], tasks[-1].id, 'foo',
  373. parent_id='foo')
  374. assert tasks[-2].options['parent_id']
  375. self.assert_sent_with_ids(tasks[-2], tasks[-1].id, tasks[-1].id)
  376. self.assert_sent_with_ids(tasks[-3], tasks[-1].id, tasks[-2].id)
  377. self.assert_sent_with_ids(tasks[-4], tasks[-1].id, tasks[-3].id)
  378. def assert_sent_with_ids(self, task, rid, pid, **options):
  379. self.app.amqp.send_task_message = Mock(name='send_task_message')
  380. self.app.backend = Mock()
  381. self.app.producer_or_acquire = ContextMock()
  382. task.apply_async(**options)
  383. self.app.amqp.send_task_message.assert_called()
  384. message = self.app.amqp.send_task_message.call_args[0][2]
  385. assert message.headers['parent_id'] == pid
  386. assert message.headers['root_id'] == rid
  387. def test_call_no_tasks(self):
  388. x = chain()
  389. assert not x()
  390. def test_call_with_tasks(self):
  391. x = self.add.s(2, 2) | self.add.s(4)
  392. x.apply_async = Mock()
  393. x(2, 2, foo=1)
  394. x.apply_async.assert_called_with((2, 2), {'foo': 1})
  395. def test_from_dict_no_args__with_args(self):
  396. x = dict(self.add.s(2, 2) | self.add.s(4))
  397. x['args'] = None
  398. assert isinstance(chain.from_dict(x), chain)
  399. x['args'] = (2,)
  400. assert isinstance(chain.from_dict(x), chain)
  401. def test_accepts_generator_argument(self):
  402. x = chain(self.add.s(i) for i in range(10))
  403. assert x.tasks[0].type, self.add
  404. assert x.type
  405. def test_chord_sets_result_parent(self):
  406. g = (self.add.s(0, 0) |
  407. group(self.add.s(i, i) for i in range(1, 10)) |
  408. self.add.s(2, 2) |
  409. self.add.s(4, 4))
  410. res = g.freeze()
  411. assert isinstance(res, AsyncResult)
  412. assert not isinstance(res, GroupResult)
  413. assert isinstance(res.parent, AsyncResult)
  414. assert not isinstance(res.parent, GroupResult)
  415. assert isinstance(res.parent.parent, GroupResult)
  416. assert isinstance(res.parent.parent.parent, AsyncResult)
  417. assert not isinstance(res.parent.parent.parent, GroupResult)
  418. assert res.parent.parent.parent.parent is None
  419. seen = set()
  420. node = res
  421. while node:
  422. assert node.id not in seen
  423. seen.add(node.id)
  424. node = node.parent
  425. class test_group(CanvasCase):
  426. def test_repr(self):
  427. x = group([self.add.s(2, 2), self.add.s(4, 4)])
  428. assert repr(x)
  429. def test_reverse(self):
  430. x = group([self.add.s(2, 2), self.add.s(4, 4)])
  431. assert isinstance(signature(x), group)
  432. assert isinstance(signature(dict(x)), group)
  433. def test_cannot_link_on_group(self):
  434. x = group([self.add.s(2, 2), self.add.s(4, 4)])
  435. with pytest.raises(TypeError):
  436. x.apply_async(link=self.add.s(2, 2))
  437. def test_cannot_link_error_on_group(self):
  438. x = group([self.add.s(2, 2), self.add.s(4, 4)])
  439. with pytest.raises(TypeError):
  440. x.apply_async(link_error=self.add.s(2, 2))
  441. def test_group_with_group_argument(self):
  442. g1 = group(self.add.s(2, 2), self.add.s(4, 4), app=self.app)
  443. g2 = group(g1, app=self.app)
  444. assert g2.tasks is g1.tasks
  445. def test_maybe_group_sig(self):
  446. assert _maybe_group(self.add.s(2, 2), self.app) == [self.add.s(2, 2)]
  447. def test_apply(self):
  448. x = group([self.add.s(4, 4), self.add.s(8, 8)])
  449. res = x.apply()
  450. assert res.get(), [8 == 16]
  451. def test_apply_async(self):
  452. x = group([self.add.s(4, 4), self.add.s(8, 8)])
  453. x.apply_async()
  454. def test_prepare_with_dict(self):
  455. x = group([self.add.s(4, 4), dict(self.add.s(8, 8))], app=self.app)
  456. x.apply_async()
  457. def test_group_in_group(self):
  458. g1 = group(self.add.s(2, 2), self.add.s(4, 4), app=self.app)
  459. g2 = group(self.add.s(8, 8), g1, self.add.s(16, 16), app=self.app)
  460. g2.apply_async()
  461. def test_set_immutable(self):
  462. g1 = group(Mock(name='t1'), Mock(name='t2'), app=self.app)
  463. g1.set_immutable(True)
  464. for task in g1.tasks:
  465. task.set_immutable.assert_called_with(True)
  466. def test_link(self):
  467. g1 = group(Mock(name='t1'), Mock(name='t2'), app=self.app)
  468. sig = Mock(name='sig')
  469. g1.link(sig)
  470. g1.tasks[0].link.assert_called_with(sig.clone().set(immutable=True))
  471. def test_link_error(self):
  472. g1 = group(Mock(name='t1'), Mock(name='t2'), app=self.app)
  473. sig = Mock(name='sig')
  474. g1.link_error(sig)
  475. g1.tasks[0].link_error.assert_called_with(
  476. sig.clone().set(immutable=True),
  477. )
  478. def test_apply_empty(self):
  479. x = group(app=self.app)
  480. x.apply()
  481. res = x.apply_async()
  482. assert res
  483. assert not res.results
  484. def test_apply_async_with_parent(self):
  485. _task_stack.push(self.add)
  486. try:
  487. self.add.push_request(called_directly=False)
  488. try:
  489. assert not self.add.request.children
  490. x = group([self.add.s(4, 4), self.add.s(8, 8)])
  491. res = x()
  492. assert self.add.request.children
  493. assert res in self.add.request.children
  494. assert len(self.add.request.children) == 1
  495. finally:
  496. self.add.pop_request()
  497. finally:
  498. _task_stack.pop()
  499. def test_from_dict(self):
  500. x = group([self.add.s(2, 2), self.add.s(4, 4)])
  501. x['args'] = (2, 2)
  502. assert group.from_dict(dict(x))
  503. x['args'] = None
  504. assert group.from_dict(dict(x))
  505. def test_call_empty_group(self):
  506. x = group(app=self.app)
  507. assert not len(x())
  508. x.delay()
  509. x.apply_async()
  510. x()
  511. def test_skew(self):
  512. g = group([self.add.s(i, i) for i in range(10)])
  513. g.skew(start=1, stop=10, step=1)
  514. for i, task in enumerate(g.tasks):
  515. assert task.options['countdown'] == i + 1
  516. def test_iter(self):
  517. g = group([self.add.s(i, i) for i in range(10)])
  518. assert list(iter(g)) == g.tasks
  519. @staticmethod
  520. def helper_test_get_delay(result):
  521. import time
  522. t0 = time.time()
  523. while not result.ready():
  524. time.sleep(0.01)
  525. if time.time() - t0 > 1:
  526. return None
  527. return result.get()
  528. def test_kwargs_direct(self):
  529. res = [self.add(x=1, y=1), self.add(x=1, y=1)]
  530. assert res == [2, 2]
  531. def test_kwargs_apply(self):
  532. x = group([self.add.s(), self.add.s()])
  533. res = x.apply(kwargs=dict(x=1, y=1)).get()
  534. assert res == [2, 2]
  535. def test_kwargs_apply_async(self):
  536. self.app.conf.task_always_eager = True
  537. x = group([self.add.s(), self.add.s()])
  538. res = self.helper_test_get_delay(x.apply_async(kwargs=dict(x=1, y=1)))
  539. assert res == [2, 2]
  540. def test_kwargs_delay(self):
  541. self.app.conf.task_always_eager = True
  542. x = group([self.add.s(), self.add.s()])
  543. res = self.helper_test_get_delay(x.delay(x=1, y=1))
  544. assert res == [2, 2]
  545. def test_kwargs_delay_partial(self):
  546. self.app.conf.task_always_eager = True
  547. x = group([self.add.s(1), self.add.s(x=1)])
  548. res = self.helper_test_get_delay(x.delay(y=1))
  549. assert res == [2, 2]
  550. class test_chord(CanvasCase):
  551. def test_reverse(self):
  552. x = chord([self.add.s(2, 2), self.add.s(4, 4)], body=self.mul.s(4))
  553. assert isinstance(signature(x), chord)
  554. assert isinstance(signature(dict(x)), chord)
  555. def test_clone_clones_body(self):
  556. x = chord([self.add.s(2, 2), self.add.s(4, 4)], body=self.mul.s(4))
  557. y = x.clone()
  558. assert x.kwargs['body'] is not y.kwargs['body']
  559. y.kwargs.pop('body')
  560. z = y.clone()
  561. assert z.kwargs.get('body') is None
  562. def test_argument_is_group(self):
  563. x = chord(group(self.add.s(2, 2), self.add.s(4, 4), app=self.app))
  564. assert x.tasks
  565. def test_set_parent_id(self):
  566. x = chord(group(self.add.s(2, 2)))
  567. x.tasks = [self.add.s(2, 2)]
  568. x.set_parent_id('pid')
  569. def test_app_when_app(self):
  570. app = Mock(name='app')
  571. x = chord([self.add.s(4, 4)], app=app)
  572. assert x.app is app
  573. def test_app_when_app_in_task(self):
  574. t1 = Mock(name='t1')
  575. t2 = Mock(name='t2')
  576. x = chord([t1, self.add.s(4, 4)])
  577. assert x.app is x.tasks[0].app
  578. t1.app = None
  579. x = chord([t1], body=t2)
  580. assert x.app is t2._app
  581. @pytest.mark.usefixtures('depends_on_current_app')
  582. def test_app_fallback_to_current(self):
  583. from celery._state import current_app
  584. t1 = Mock(name='t1')
  585. t1.app = t1._app = None
  586. x = chord([t1], body=t1)
  587. assert x.app is current_app
  588. def test_set_immutable(self):
  589. x = chord([Mock(name='t1'), Mock(name='t2')], app=self.app)
  590. x.set_immutable(True)
  591. def test_links_to_body(self):
  592. x = chord([self.add.s(2, 2), self.add.s(4, 4)], body=self.mul.s(4))
  593. x.link(self.div.s(2))
  594. assert not x.options.get('link')
  595. assert x.kwargs['body'].options['link']
  596. x.link_error(self.div.s(2))
  597. assert not x.options.get('link_error')
  598. assert x.kwargs['body'].options['link_error']
  599. assert x.tasks
  600. assert x.body
  601. def test_repr(self):
  602. x = chord([self.add.s(2, 2), self.add.s(4, 4)], body=self.mul.s(4))
  603. assert repr(x)
  604. x.kwargs['body'] = None
  605. assert 'without body' in repr(x)
  606. def test_freeze_tasks_is_not_group(self):
  607. x = chord([self.add.s(2, 2)], body=self.add.s(), app=self.app)
  608. x.freeze()
  609. x.tasks = [self.add.s(2, 2)]
  610. x.freeze()
  611. class test_maybe_signature(CanvasCase):
  612. def test_is_None(self):
  613. assert maybe_signature(None, app=self.app) is None
  614. def test_is_dict(self):
  615. assert isinstance(maybe_signature(dict(self.add.s()), app=self.app),
  616. Signature)
  617. def test_when_sig(self):
  618. s = self.add.s()
  619. assert maybe_signature(s, app=self.app) is s