# test_canvas.py
  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. from case import ContextMock, MagicMock, Mock
  4. from celery._state import _task_stack
  5. from celery.canvas import (
  6. Signature,
  7. chain,
  8. group,
  9. chord,
  10. signature,
  11. xmap,
  12. xstarmap,
  13. chunks,
  14. _maybe_group,
  15. maybe_signature,
  16. maybe_unroll_group,
  17. _seq_concat_seq,
  18. )
  19. from celery.result import EagerResult
  20. SIG = Signature({
  21. 'task': 'TASK',
  22. 'args': ('A1',),
  23. 'kwargs': {'K1': 'V1'},
  24. 'options': {'task_id': 'TASK_ID'},
  25. 'subtask_type': ''},
  26. )
  27. class test_maybe_unroll_group:
  28. def test_when_no_len_and_no_length_hint(self):
  29. g = MagicMock(name='group')
  30. g.tasks.__len__.side_effect = TypeError()
  31. g.tasks.__length_hint__ = Mock()
  32. g.tasks.__length_hint__.return_value = 0
  33. assert maybe_unroll_group(g) is g
  34. g.tasks.__length_hint__.side_effect = AttributeError()
  35. assert maybe_unroll_group(g) is g
  36. @pytest.mark.parametrize('a,b,expected', [
  37. ((1, 2, 3), [4, 5], (1, 2, 3, 4, 5)),
  38. ((1, 2), [3, 4, 5], [1, 2, 3, 4, 5]),
  39. ([1, 2, 3], (4, 5), [1, 2, 3, 4, 5]),
  40. ([1, 2], (3, 4, 5), (1, 2, 3, 4, 5)),
  41. ])
  42. def test_seq_concat_seq(a, b, expected):
  43. res = _seq_concat_seq(a, b)
  44. assert type(res) is type(expected) # noqa
  45. assert res == expected
  46. class CanvasCase:
  47. def setup(self):
  48. @self.app.task(shared=False)
  49. def add(x, y):
  50. return x + y
  51. self.add = add
  52. @self.app.task(shared=False)
  53. def mul(x, y):
  54. return x * y
  55. self.mul = mul
  56. @self.app.task(shared=False)
  57. def div(x, y):
  58. return x / y
  59. self.div = div
  60. class test_Signature(CanvasCase):
  61. def test_getitem_property_class(self):
  62. assert Signature.task
  63. assert Signature.args
  64. assert Signature.kwargs
  65. assert Signature.options
  66. assert Signature.subtask_type
  67. def test_getitem_property(self):
  68. assert SIG.task == 'TASK'
  69. assert SIG.args == ('A1',)
  70. assert SIG.kwargs == {'K1': 'V1'}
  71. assert SIG.options == {'task_id': 'TASK_ID'}
  72. assert SIG.subtask_type == ''
  73. def test_call(self):
  74. x = Signature('foo', (1, 2), {'arg1': 33}, app=self.app)
  75. x.type = Mock(name='type')
  76. x(3, 4, arg2=66)
  77. x.type.assert_called_with(3, 4, 1, 2, arg1=33, arg2=66)
  78. def test_link_on_scalar(self):
  79. x = Signature('TASK', link=Signature('B'))
  80. assert x.options['link']
  81. x.link(Signature('C'))
  82. assert isinstance(x.options['link'], list)
  83. assert Signature('B') in x.options['link']
  84. assert Signature('C') in x.options['link']
  85. def test_json(self):
  86. x = Signature('TASK', link=Signature('B', app=self.app), app=self.app)
  87. assert x.__json__() == dict(x)
  88. @pytest.mark.usefixtures('depends_on_current_app')
  89. def test_reduce(self):
  90. x = Signature('TASK', (2, 4), app=self.app)
  91. fun, args = x.__reduce__()
  92. assert fun(*args) == x
  93. def test_replace(self):
  94. x = Signature('TASK', ('A'), {})
  95. assert x.replace(args=('B',)).args == ('B',)
  96. assert x.replace(kwargs={'FOO': 'BAR'}).kwargs == {
  97. 'FOO': 'BAR',
  98. }
  99. assert x.replace(options={'task_id': '123'}).options == {
  100. 'task_id': '123',
  101. }
  102. def test_set(self):
  103. assert Signature('TASK', x=1).set(task_id='2').options == {
  104. 'x': 1, 'task_id': '2',
  105. }
  106. def test_link(self):
  107. x = signature(SIG)
  108. x.link(SIG)
  109. x.link(SIG)
  110. assert SIG in x.options['link']
  111. assert len(x.options['link']) == 1
  112. def test_link_error(self):
  113. x = signature(SIG)
  114. x.link_error(SIG)
  115. x.link_error(SIG)
  116. assert SIG in x.options['link_error']
  117. assert len(x.options['link_error']) == 1
  118. def test_flatten_links(self):
  119. tasks = [self.add.s(2, 2), self.mul.s(4), self.div.s(2)]
  120. tasks[0].link(tasks[1])
  121. tasks[1].link(tasks[2])
  122. assert tasks[0].flatten_links() == tasks
  123. def test_OR(self):
  124. x = self.add.s(2, 2) | self.mul.s(4)
  125. assert isinstance(x, chain)
  126. y = self.add.s(4, 4) | self.div.s(2)
  127. z = x | y
  128. assert isinstance(y, chain)
  129. assert isinstance(z, chain)
  130. assert len(z.tasks) == 4
  131. with pytest.raises(TypeError):
  132. x | 10
  133. ax = self.add.s(2, 2) | (self.add.s(4) | self.add.s(8))
  134. assert isinstance(ax, chain)
  135. assert len(ax.tasks), 3 == 'consolidates chain to chain'
  136. def test_INVERT(self):
  137. x = self.add.s(2, 2)
  138. x.apply_async = Mock()
  139. x.apply_async.return_value = Mock()
  140. x.apply_async.return_value.get = Mock()
  141. x.apply_async.return_value.get.return_value = 4
  142. assert ~x == 4
  143. x.apply_async.assert_called()
  144. def test_merge_immutable(self):
  145. x = self.add.si(2, 2, foo=1)
  146. args, kwargs, options = x._merge((4,), {'bar': 2}, {'task_id': 3})
  147. assert args == (2, 2)
  148. assert kwargs == {'foo': 1}
  149. assert options == {'task_id': 3}
  150. def test_set_immutable(self):
  151. x = self.add.s(2, 2)
  152. assert not x.immutable
  153. x.set(immutable=True)
  154. assert x.immutable
  155. x.set(immutable=False)
  156. assert not x.immutable
  157. def test_election(self):
  158. x = self.add.s(2, 2)
  159. x.freeze('foo')
  160. x.type.app.control = Mock()
  161. r = x.election()
  162. x.type.app.control.election.assert_called()
  163. assert r.id == 'foo'
  164. def test_AsyncResult_when_not_registered(self):
  165. s = signature('xxx.not.registered', app=self.app)
  166. assert s.AsyncResult
  167. def test_apply_async_when_not_registered(self):
  168. s = signature('xxx.not.registered', app=self.app)
  169. assert s._apply_async
  170. class test_xmap_xstarmap(CanvasCase):
  171. def test_apply(self):
  172. for type, attr in [(xmap, 'map'), (xstarmap, 'starmap')]:
  173. args = [(i, i) for i in range(10)]
  174. s = getattr(self.add, attr)(args)
  175. s.type = Mock()
  176. s.apply_async(foo=1)
  177. s.type.apply_async.assert_called_with(
  178. (), {'task': self.add.s(), 'it': args}, foo=1,
  179. route_name=self.add.name,
  180. )
  181. assert type.from_dict(dict(s)) == s
  182. assert repr(s)
  183. class test_chunks(CanvasCase):
  184. def test_chunks(self):
  185. x = self.add.chunks(range(100), 10)
  186. assert dict(chunks.from_dict(dict(x), app=self.app)) == dict(x)
  187. assert x.group()
  188. assert len(x.group().tasks) == 10
  189. x.group = Mock()
  190. gr = x.group.return_value = Mock()
  191. x.apply_async()
  192. gr.apply_async.assert_called_with((), {}, route_name=self.add.name)
  193. gr.apply_async.reset_mock()
  194. x()
  195. gr.apply_async.assert_called_with((), {}, route_name=self.add.name)
  196. self.app.conf.task_always_eager = True
  197. chunks.apply_chunks(app=self.app, **x['kwargs'])
  198. class test_chain(CanvasCase):
  199. def test_clone_preserves_state(self):
  200. x = chain(self.add.s(i, i) for i in range(10))
  201. assert x.clone().tasks == x.tasks
  202. assert x.clone().kwargs == x.kwargs
  203. assert x.clone().args == x.args
  204. def test_repr(self):
  205. x = self.add.s(2, 2) | self.add.s(2)
  206. assert repr(x) == '%s(2, 2) | %s(2)' % (self.add.name, self.add.name)
  207. def test_apply_async(self):
  208. c = self.add.s(2, 2) | self.add.s(4) | self.add.s(8)
  209. result = c.apply_async()
  210. assert result.parent
  211. assert result.parent.parent
  212. assert result.parent.parent.parent is None
  213. def test_group_to_chord__freeze_parent_id(self):
  214. def using_freeze(c):
  215. c.freeze(parent_id='foo', root_id='root')
  216. return c._frozen[0]
  217. self.assert_group_to_chord_parent_ids(using_freeze)
  218. def assert_group_to_chord_parent_ids(self, freezefun):
  219. c = (
  220. self.add.s(5, 5) |
  221. group([self.add.s(i, i) for i in range(5)], app=self.app) |
  222. self.add.si(10, 10) |
  223. self.add.si(20, 20) |
  224. self.add.si(30, 30)
  225. )
  226. tasks = freezefun(c)
  227. assert tasks[-1].parent_id == 'foo'
  228. assert tasks[-1].root_id == 'root'
  229. assert tasks[-2].parent_id == tasks[-1].id
  230. assert tasks[-2].root_id == 'root'
  231. assert tasks[-2].body.parent_id == tasks[-2].tasks.id
  232. assert tasks[-2].body.parent_id == tasks[-2].id
  233. assert tasks[-2].body.root_id == 'root'
  234. assert tasks[-2].tasks.tasks[0].parent_id == tasks[-1].id
  235. assert tasks[-2].tasks.tasks[0].root_id == 'root'
  236. assert tasks[-2].tasks.tasks[1].parent_id == tasks[-1].id
  237. assert tasks[-2].tasks.tasks[1].root_id == 'root'
  238. assert tasks[-2].tasks.tasks[2].parent_id == tasks[-1].id
  239. assert tasks[-2].tasks.tasks[2].root_id == 'root'
  240. assert tasks[-2].tasks.tasks[3].parent_id == tasks[-1].id
  241. assert tasks[-2].tasks.tasks[3].root_id == 'root'
  242. assert tasks[-2].tasks.tasks[4].parent_id == tasks[-1].id
  243. assert tasks[-2].tasks.tasks[4].root_id == 'root'
  244. assert tasks[-3].parent_id == tasks[-2].body.id
  245. assert tasks[-3].root_id == 'root'
  246. assert tasks[-4].parent_id == tasks[-3].id
  247. assert tasks[-4].root_id == 'root'
  248. def test_splices_chains(self):
  249. c = chain(
  250. self.add.s(5, 5),
  251. chain(self.add.s(6), self.add.s(7), self.add.s(8), app=self.app),
  252. app=self.app,
  253. )
  254. c.freeze()
  255. tasks, _ = c._frozen
  256. assert len(tasks) == 4
  257. def test_from_dict_no_tasks(self):
  258. assert chain.from_dict(dict(chain(app=self.app)), app=self.app)
  259. @pytest.mark.usefixtures('depends_on_current_app')
  260. def test_app_falls_back_to_default(self):
  261. from celery._state import current_app
  262. assert chain().app is current_app
  263. def test_handles_dicts(self):
  264. c = chain(
  265. self.add.s(5, 5), dict(self.add.s(8)), app=self.app,
  266. )
  267. c.freeze()
  268. tasks, _ = c._frozen
  269. for task in tasks:
  270. assert isinstance(task, Signature)
  271. assert task.app is self.app
  272. def test_group_to_chord(self):
  273. c = (
  274. self.add.s(5) |
  275. group([self.add.s(i, i) for i in range(5)], app=self.app) |
  276. self.add.s(10) |
  277. self.add.s(20) |
  278. self.add.s(30)
  279. )
  280. c._use_link = True
  281. tasks, results = c.prepare_steps((), c.tasks)
  282. assert tasks[-1].args[0] == 5
  283. assert isinstance(tasks[-2], chord)
  284. assert len(tasks[-2].tasks) == 5
  285. assert tasks[-2].parent_id == tasks[-1].id
  286. assert tasks[-2].root_id == tasks[-1].id
  287. assert tasks[-2].body.args[0] == 10
  288. assert tasks[-2].body.parent_id == tasks[-2].id
  289. assert tasks[-3].args[0] == 20
  290. assert tasks[-3].root_id == tasks[-1].id
  291. assert tasks[-3].parent_id == tasks[-2].body.id
  292. assert tasks[-4].args[0] == 30
  293. assert tasks[-4].parent_id == tasks[-3].id
  294. assert tasks[-4].root_id == tasks[-1].id
  295. assert tasks[-2].body.options['link']
  296. assert tasks[-2].body.options['link'][0].options['link']
  297. c2 = self.add.s(2, 2) | group(self.add.s(i, i) for i in range(10))
  298. c2._use_link = True
  299. tasks2, _ = c2.prepare_steps((), c2.tasks)
  300. assert isinstance(tasks2[0], group)
  301. def test_group_to_chord__protocol_2__or(self):
  302. c = (
  303. group([self.add.s(i, i) for i in range(5)], app=self.app) |
  304. self.add.s(10) |
  305. self.add.s(20) |
  306. self.add.s(30)
  307. )
  308. assert isinstance(c, chord)
  309. def test_group_to_chord__protocol_2(self):
  310. c = chain(
  311. group([self.add.s(i, i) for i in range(5)], app=self.app),
  312. self.add.s(10),
  313. self.add.s(20),
  314. self.add.s(30)
  315. )
  316. c._use_link = False
  317. tasks, _ = c.prepare_steps((), c.tasks)
  318. assert isinstance(tasks[-1], chord)
  319. c2 = self.add.s(2, 2) | group(self.add.s(i, i) for i in range(10))
  320. c2._use_link = False
  321. tasks2, _ = c2.prepare_steps((), c2.tasks)
  322. assert isinstance(tasks2[0], group)
  323. def test_apply_options(self):
  324. class static(Signature):
  325. def clone(self, *args, **kwargs):
  326. return self
  327. def s(*args, **kwargs):
  328. return static(self.add, args, kwargs, type=self.add, app=self.app)
  329. c = s(2, 2) | s(4) | s(8)
  330. r1 = c.apply_async(task_id='some_id')
  331. assert r1.id == 'some_id'
  332. c.apply_async(group_id='some_group_id')
  333. assert c.tasks[-1].options['group_id'] == 'some_group_id'
  334. c.apply_async(chord='some_chord_id')
  335. assert c.tasks[-1].options['chord'] == 'some_chord_id'
  336. c.apply_async(link=[s(32)])
  337. assert c.tasks[-1].options['link'] == [s(32)]
  338. c.apply_async(link_error=[s('error')])
  339. for task in c.tasks:
  340. assert task.options['link_error'] == [s('error')]
  341. def test_reverse(self):
  342. x = self.add.s(2, 2) | self.add.s(2)
  343. assert isinstance(signature(x), chain)
  344. assert isinstance(signature(dict(x)), chain)
  345. def test_always_eager(self):
  346. self.app.conf.task_always_eager = True
  347. assert ~(self.add.s(4, 4) | self.add.s(8)) == 16
  348. def test_apply(self):
  349. x = chain(self.add.s(4, 4), self.add.s(8), self.add.s(10))
  350. res = x.apply()
  351. assert isinstance(res, EagerResult)
  352. assert res.get() == 26
  353. assert res.parent.get() == 16
  354. assert res.parent.parent.get() == 8
  355. assert res.parent.parent.parent is None
  356. def test_empty_chain_returns_none(self):
  357. assert chain(app=self.app)() is None
  358. assert chain(app=self.app).apply_async() is None
  359. def test_root_id_parent_id(self):
  360. self.app.conf.task_protocol = 2
  361. c = chain(self.add.si(i, i) for i in range(4))
  362. c.freeze()
  363. tasks, _ = c._frozen
  364. for i, task in enumerate(tasks):
  365. assert task.root_id == tasks[-1].id
  366. try:
  367. assert task.parent_id == tasks[i + 1].id
  368. except IndexError:
  369. assert i == len(tasks) - 1
  370. else:
  371. valid_parents = i
  372. assert valid_parents == len(tasks) - 2
  373. self.assert_sent_with_ids(tasks[-1], tasks[-1].id, 'foo',
  374. parent_id='foo')
  375. assert tasks[-2].options['parent_id']
  376. self.assert_sent_with_ids(tasks[-2], tasks[-1].id, tasks[-1].id)
  377. self.assert_sent_with_ids(tasks[-3], tasks[-1].id, tasks[-2].id)
  378. self.assert_sent_with_ids(tasks[-4], tasks[-1].id, tasks[-3].id)
  379. def assert_sent_with_ids(self, task, rid, pid, **options):
  380. self.app.amqp.send_task_message = Mock(name='send_task_message')
  381. self.app.backend = Mock()
  382. self.app.producer_or_acquire = ContextMock()
  383. task.apply_async(**options)
  384. self.app.amqp.send_task_message.assert_called()
  385. message = self.app.amqp.send_task_message.call_args[0][2]
  386. assert message.headers['parent_id'] == pid
  387. assert message.headers['root_id'] == rid
  388. def test_call_no_tasks(self):
  389. x = chain()
  390. assert not x()
  391. def test_call_with_tasks(self):
  392. x = self.add.s(2, 2) | self.add.s(4)
  393. x.apply_async = Mock()
  394. x(2, 2, foo=1)
  395. x.apply_async.assert_called_with((2, 2), {'foo': 1})
  396. def test_from_dict_no_args__with_args(self):
  397. x = dict(self.add.s(2, 2) | self.add.s(4))
  398. x['args'] = None
  399. assert isinstance(chain.from_dict(x), chain)
  400. x['args'] = (2,)
  401. assert isinstance(chain.from_dict(x), chain)
  402. def test_accepts_generator_argument(self):
  403. x = chain(self.add.s(i) for i in range(10))
  404. assert x.tasks[0].type, self.add
  405. assert x.type
  406. class test_group(CanvasCase):
  407. def test_repr(self):
  408. x = group([self.add.s(2, 2), self.add.s(4, 4)])
  409. assert repr(x)
  410. def test_reverse(self):
  411. x = group([self.add.s(2, 2), self.add.s(4, 4)])
  412. assert isinstance(signature(x), group)
  413. assert isinstance(signature(dict(x)), group)
  414. def test_cannot_link_on_group(self):
  415. x = group([self.add.s(2, 2), self.add.s(4, 4)])
  416. with pytest.raises(TypeError):
  417. x.apply_async(link=self.add.s(2, 2))
  418. def test_cannot_link_error_on_group(self):
  419. x = group([self.add.s(2, 2), self.add.s(4, 4)])
  420. with pytest.raises(TypeError):
  421. x.apply_async(link_error=self.add.s(2, 2))
  422. def test_group_with_group_argument(self):
  423. g1 = group(self.add.s(2, 2), self.add.s(4, 4), app=self.app)
  424. g2 = group(g1, app=self.app)
  425. assert g2.tasks is g1.tasks
  426. def test_maybe_group_sig(self):
  427. assert _maybe_group(self.add.s(2, 2), self.app) == [self.add.s(2, 2)]
  428. def test_apply(self):
  429. x = group([self.add.s(4, 4), self.add.s(8, 8)])
  430. res = x.apply()
  431. assert res.get(), [8 == 16]
  432. def test_apply_async(self):
  433. x = group([self.add.s(4, 4), self.add.s(8, 8)])
  434. x.apply_async()
  435. def test_prepare_with_dict(self):
  436. x = group([self.add.s(4, 4), dict(self.add.s(8, 8))], app=self.app)
  437. x.apply_async()
  438. def test_group_in_group(self):
  439. g1 = group(self.add.s(2, 2), self.add.s(4, 4), app=self.app)
  440. g2 = group(self.add.s(8, 8), g1, self.add.s(16, 16), app=self.app)
  441. g2.apply_async()
  442. def test_set_immutable(self):
  443. g1 = group(Mock(name='t1'), Mock(name='t2'), app=self.app)
  444. g1.set_immutable(True)
  445. for task in g1.tasks:
  446. task.set_immutable.assert_called_with(True)
  447. def test_link(self):
  448. g1 = group(Mock(name='t1'), Mock(name='t2'), app=self.app)
  449. sig = Mock(name='sig')
  450. g1.link(sig)
  451. g1.tasks[0].link.assert_called_with(sig.clone().set(immutable=True))
  452. def test_link_error(self):
  453. g1 = group(Mock(name='t1'), Mock(name='t2'), app=self.app)
  454. sig = Mock(name='sig')
  455. g1.link_error(sig)
  456. g1.tasks[0].link_error.assert_called_with(
  457. sig.clone().set(immutable=True),
  458. )
  459. def test_apply_empty(self):
  460. x = group(app=self.app)
  461. x.apply()
  462. res = x.apply_async()
  463. assert res
  464. assert not res.results
  465. def test_apply_async_with_parent(self):
  466. _task_stack.push(self.add)
  467. try:
  468. self.add.push_request(called_directly=False)
  469. try:
  470. assert not self.add.request.children
  471. x = group([self.add.s(4, 4), self.add.s(8, 8)])
  472. res = x()
  473. assert self.add.request.children
  474. assert res in self.add.request.children
  475. assert len(self.add.request.children) == 1
  476. finally:
  477. self.add.pop_request()
  478. finally:
  479. _task_stack.pop()
  480. def test_from_dict(self):
  481. x = group([self.add.s(2, 2), self.add.s(4, 4)])
  482. x['args'] = (2, 2)
  483. assert group.from_dict(dict(x))
  484. x['args'] = None
  485. assert group.from_dict(dict(x))
  486. def test_call_empty_group(self):
  487. x = group(app=self.app)
  488. assert not len(x())
  489. x.delay()
  490. x.apply_async()
  491. x()
  492. def test_skew(self):
  493. g = group([self.add.s(i, i) for i in range(10)])
  494. g.skew(start=1, stop=10, step=1)
  495. for i, task in enumerate(g.tasks):
  496. assert task.options['countdown'] == i + 1
  497. def test_iter(self):
  498. g = group([self.add.s(i, i) for i in range(10)])
  499. assert list(iter(g)) == g.tasks
  500. @staticmethod
  501. def helper_test_get_delay(result):
  502. import time
  503. t0 = time.time()
  504. while not result.ready():
  505. time.sleep(0.01)
  506. if time.time() - t0 > 1:
  507. return None
  508. return result.get()
  509. def test_kwargs_direct(self):
  510. res = [self.add(x=1, y=1), self.add(x=1, y=1)]
  511. assert res == [2, 2]
  512. def test_kwargs_apply(self):
  513. x = group([self.add.s(), self.add.s()])
  514. res = x.apply(kwargs=dict(x=1, y=1)).get()
  515. assert res == [2, 2]
  516. def test_kwargs_apply_async(self):
  517. self.app.conf.task_always_eager = True
  518. x = group([self.add.s(), self.add.s()])
  519. res = self.helper_test_get_delay(x.apply_async(kwargs=dict(x=1, y=1)))
  520. assert res == [2, 2]
  521. def test_kwargs_delay(self):
  522. self.app.conf.task_always_eager = True
  523. x = group([self.add.s(), self.add.s()])
  524. res = self.helper_test_get_delay(x.delay(x=1, y=1))
  525. assert res == [2, 2]
  526. def test_kwargs_delay_partial(self):
  527. self.app.conf.task_always_eager = True
  528. x = group([self.add.s(1), self.add.s(x=1)])
  529. res = self.helper_test_get_delay(x.delay(y=1))
  530. assert res == [2, 2]
  531. class test_chord(CanvasCase):
  532. def test_reverse(self):
  533. x = chord([self.add.s(2, 2), self.add.s(4, 4)], body=self.mul.s(4))
  534. assert isinstance(signature(x), chord)
  535. assert isinstance(signature(dict(x)), chord)
  536. def test_clone_clones_body(self):
  537. x = chord([self.add.s(2, 2), self.add.s(4, 4)], body=self.mul.s(4))
  538. y = x.clone()
  539. assert x.kwargs['body'] is not y.kwargs['body']
  540. y.kwargs.pop('body')
  541. z = y.clone()
  542. assert z.kwargs.get('body') is None
  543. def test_argument_is_group(self):
  544. x = chord(group(self.add.s(2, 2), self.add.s(4, 4), app=self.app))
  545. assert x.tasks
  546. def test_set_parent_id(self):
  547. x = chord(group(self.add.s(2, 2)))
  548. x.tasks = [self.add.s(2, 2)]
  549. x.set_parent_id('pid')
  550. def test_app_when_app(self):
  551. app = Mock(name='app')
  552. x = chord([self.add.s(4, 4)], app=app)
  553. assert x.app is app
  554. def test_app_when_app_in_task(self):
  555. t1 = Mock(name='t1')
  556. t2 = Mock(name='t2')
  557. x = chord([t1, self.add.s(4, 4)])
  558. assert x.app is x.tasks[0].app
  559. t1.app = None
  560. x = chord([t1], body=t2)
  561. assert x.app is t2._app
  562. @pytest.mark.usefixtures('depends_on_current_app')
  563. def test_app_fallback_to_current(self):
  564. from celery._state import current_app
  565. t1 = Mock(name='t1')
  566. t1.app = t1._app = None
  567. x = chord([t1], body=t1)
  568. assert x.app is current_app
  569. def test_set_immutable(self):
  570. x = chord([Mock(name='t1'), Mock(name='t2')], app=self.app)
  571. x.set_immutable(True)
  572. def test_links_to_body(self):
  573. x = chord([self.add.s(2, 2), self.add.s(4, 4)], body=self.mul.s(4))
  574. x.link(self.div.s(2))
  575. assert not x.options.get('link')
  576. assert x.kwargs['body'].options['link']
  577. x.link_error(self.div.s(2))
  578. assert not x.options.get('link_error')
  579. assert x.kwargs['body'].options['link_error']
  580. assert x.tasks
  581. assert x.body
  582. def test_repr(self):
  583. x = chord([self.add.s(2, 2), self.add.s(4, 4)], body=self.mul.s(4))
  584. assert repr(x)
  585. x.kwargs['body'] = None
  586. assert 'without body' in repr(x)
  587. def test_freeze_tasks_is_not_group(self):
  588. x = chord([self.add.s(2, 2)], body=self.add.s(), app=self.app)
  589. x.freeze()
  590. x.tasks = [self.add.s(2, 2)]
  591. x.freeze()
  592. class test_maybe_signature(CanvasCase):
  593. def test_is_None(self):
  594. assert maybe_signature(None, app=self.app) is None
  595. def test_is_dict(self):
  596. assert isinstance(maybe_signature(dict(self.add.s()), app=self.app),
  597. Signature)
  598. def test_when_sig(self):
  599. s = self.add.s()
  600. assert maybe_signature(s, app=self.app) is s