test_canvas.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. from __future__ import absolute_import
  2. from celery.canvas import (
  3. Signature,
  4. chain,
  5. group,
  6. chord,
  7. signature,
  8. xmap,
  9. xstarmap,
  10. chunks,
  11. _maybe_group,
  12. maybe_signature,
  13. )
  14. from celery.result import EagerResult
  15. from celery.tests.case import AppCase, Mock
  # Reference signature shared by the tests below; it is built from a plain
  # dict carrying the task, args, kwargs, options and subtask_type fields.
  SIG = Signature(dict(
      task='TASK',
      args=('A1', ),
      kwargs={'K1': 'V1'},
      options={'task_id': 'TASK_ID'},
      subtask_type='',
  ))
  class CanvasCase(AppCase):
      """Base case that registers ``add``, ``mul`` and ``div`` on ``self.app``."""

      def setup(self):
          # Plain functions first; each one is then registered as a
          # non-shared task and stored on the test case.
          def add(x, y):
              return x + y

          def mul(x, y):
              return x * y

          def div(x, y):
              return x / y

          self.add = self.app.task(shared=False)(add)
          self.mul = self.app.task(shared=False)(mul)
          self.div = self.app.task(shared=False)(div)
  class test_Signature(CanvasCase):
      """Tests for ``Signature``: field accessors, links, operators, options."""

      def test_getitem_property_class(self):
          # Looked up on the class itself, every field accessor is truthy.
          self.assertTrue(Signature.task)
          self.assertTrue(Signature.args)
          self.assertTrue(Signature.kwargs)
          self.assertTrue(Signature.options)
          self.assertTrue(Signature.subtask_type)

      def test_getitem_property(self):
          # On an instance, each accessor returns the matching entry of the
          # dict that SIG was created from.
          self.assertEqual(SIG.task, 'TASK')
          self.assertEqual(SIG.args, ('A1', ))
          self.assertEqual(SIG.kwargs, {'K1': 'V1'})
          self.assertEqual(SIG.options, {'task_id': 'TASK_ID'})
          self.assertEqual(SIG.subtask_type, '')

      def test_link_on_scalar(self):
          # A single signature given as ``link`` must end up in a list once
          # a second callback is linked, with both callbacks present.
          x = Signature('TASK', link=Signature('B'))
          self.assertTrue(x.options['link'])
          x.link(Signature('C'))
          self.assertIsInstance(x.options['link'], list)
          self.assertIn(Signature('B'), x.options['link'])
          self.assertIn(Signature('C'), x.options['link'])

      def test_replace(self):
          # ``('A', )`` is a real one-element tuple; the previous ``('A')``
          # was only a parenthesised string.
          x = Signature('TASK', ('A', ), {})
          self.assertTupleEqual(x.replace(args=('B', )).args, ('B', ))
          self.assertDictEqual(
              x.replace(kwargs={'FOO': 'BAR'}).kwargs,
              {'FOO': 'BAR'},
          )
          self.assertDictEqual(
              x.replace(options={'task_id': '123'}).options,
              {'task_id': '123'},
          )

      def test_set(self):
          # Options given at construction and through ``set`` are merged.
          self.assertDictEqual(
              Signature('TASK', x=1).set(task_id='2').options,
              {'x': 1, 'task_id': '2'},
          )

      def test_link(self):
          # Linking the same callback twice must store it only once.
          x = signature(SIG)
          x.link(SIG)
          x.link(SIG)
          self.assertIn(SIG, x.options['link'])
          self.assertEqual(len(x.options['link']), 1)

      def test_link_error(self):
          # Linking the same errback twice must store it only once.
          x = signature(SIG)
          x.link_error(SIG)
          x.link_error(SIG)
          self.assertIn(SIG, x.options['link_error'])
          self.assertEqual(len(x.options['link_error']), 1)

      def test_flatten_links(self):
          # add -> mul -> div linked in sequence flattens to that same list.
          tasks = [self.add.s(2, 2), self.mul.s(4), self.div.s(2)]
          tasks[0].link(tasks[1])
          tasks[1].link(tasks[2])
          self.assertEqual(tasks[0].flatten_links(), tasks)

      def test_OR(self):
          # ``|`` between signatures and/or chains always yields a chain.
          x = self.add.s(2, 2) | self.mul.s(4)
          self.assertIsInstance(x, chain)
          y = self.add.s(4, 4) | self.div.s(2)
          z = x | y
          self.assertIsInstance(y, chain)
          self.assertIsInstance(z, chain)
          self.assertEqual(len(z.tasks), 4)
          # A non-signature right-hand operand is rejected.
          with self.assertRaises(TypeError):
              x | 10
          # A chain on the right-hand side is merged, not nested.
          ax = self.add.s(2, 2) | (self.add.s(4) | self.add.s(8))
          self.assertIsInstance(ax, chain)
          self.assertEqual(len(ax.tasks), 3, 'consolidates chain to chain')

      def test_INVERT(self):
          # ``~sig`` is expected to call apply_async() and return .get().
          x = self.add.s(2, 2)
          x.apply_async = Mock()
          x.apply_async.return_value = Mock()
          x.apply_async.return_value.get = Mock()
          x.apply_async.return_value.get.return_value = 4
          self.assertEqual(~x, 4)
          self.assertTrue(x.apply_async.called)

      def test_merge_immutable(self):
          # For an immutable signature (``si``) the extra args and kwargs
          # are dropped by ``_merge``; only the passed options come back.
          x = self.add.si(2, 2, foo=1)
          args, kwargs, options = x._merge((4, ), {'bar': 2}, {'task_id': 3})
          self.assertTupleEqual(args, (2, 2))
          self.assertDictEqual(kwargs, {'foo': 1})
          self.assertDictEqual(options, {'task_id': 3})

      def test_set_immutable(self):
          # ``set(immutable=...)`` toggles the ``immutable`` flag both ways.
          x = self.add.s(2, 2)
          self.assertFalse(x.immutable)
          x.set(immutable=True)
          self.assertTrue(x.immutable)
          x.set(immutable=False)
          self.assertFalse(x.immutable)

      def test_election(self):
          # With the control interface mocked, ``election`` must call it and
          # return a result carrying the frozen task id.
          x = self.add.s(2, 2)
          x.freeze('foo')
          x.type.app.control = Mock()
          r = x.election()
          self.assertTrue(x.type.app.control.election.called)
          self.assertEqual(r.id, 'foo')

      def test_AsyncResult_when_not_registered(self):
          # The attribute must be available even for an unknown task name.
          s = signature('xxx.not.registered', app=self.app)
          self.assertTrue(s.AsyncResult)

      def test_apply_async_when_not_registered(self):
          # The attribute must be available even for an unknown task name.
          s = signature('xxx.not.registered', app=self.app)
          self.assertTrue(s._apply_async)
  class test_xmap_xstarmap(CanvasCase):
      """Tests for the ``xmap`` and ``xstarmap`` signature types."""

      def test_apply(self):
          cases = ((xmap, 'map'), (xstarmap, 'starmap'))
          for sig_cls, method_name in cases:
              pairs = [(n, n) for n in range(10)]
              sig = getattr(self.add, method_name)(pairs)
              # Swap in a mock task so the delegated call can be inspected.
              sig.type = Mock()

              sig.apply_async(foo=1)
              sig.type.apply_async.assert_called_with(
                  (), {'task': self.add.s(), 'it': pairs},
                  foo=1, route_name=self.add.name,
              )

              # A dict round trip through the signature class is lossless.
              self.assertEqual(sig_cls.from_dict(dict(sig)), sig)
              self.assertTrue(repr(sig))
  class test_chunks(CanvasCase):
      """Tests for ``chunks`` signatures created via a task's ``chunks``."""

      def test_chunks(self):
          sig = self.add.chunks(range(100), 10)

          # Round-tripping through a plain dict must preserve the payload.
          restored = chunks.from_dict(dict(sig), app=self.app)
          self.assertEqual(dict(restored), dict(sig))

          # 100 items in chunks of 10 must expand to a group of 10 tasks.
          self.assertTrue(sig.group())
          self.assertEqual(len(sig.group().tasks), 10)

          # Replace group() with a mock so the dispatch can be observed for
          # both apply_async() and a direct call.
          sig.group = Mock()
          fake_group = sig.group.return_value = Mock()
          sig.apply_async()
          fake_group.apply_async.assert_called_with(
              (), {}, route_name=self.add.name,
          )
          fake_group.apply_async.reset_mock()
          sig()
          fake_group.apply_async.assert_called_with(
              (), {}, route_name=self.add.name,
          )

          # Finally run the chunks with always-eager enabled on the app.
          self.app.conf.CELERY_ALWAYS_EAGER = True
          chunks.apply_chunks(app=self.app, **sig['kwargs'])
  class test_chain(CanvasCase):
      """Tests for ``chain``: construction, repr, calling and eager apply."""

      def test_repr(self):
          # The repr joins the repr of every task with `` | ``.
          x = self.add.s(2, 2) | self.add.s(2)
          self.assertEqual(
              repr(x), '%s(2, 2) | %s(2)' % (self.add.name, self.add.name),
          )

      def test_reverse(self):
          # ``signature()`` gives back a chain for a chain or its dict form.
          x = self.add.s(2, 2) | self.add.s(2)
          self.assertIsInstance(signature(x), chain)
          self.assertIsInstance(signature(dict(x)), chain)

      def test_always_eager(self):
          # (4 + 4) + 8 == 16 when the chain is executed eagerly.
          self.app.conf.CELERY_ALWAYS_EAGER = True
          self.assertEqual(~(self.add.s(4, 4) | self.add.s(8)), 16)

      def test_apply(self):
          # Each step's result is reachable through ``parent``:
          # 4 + 4 = 8, 8 + 8 = 16, 16 + 10 = 26.
          x = chain(self.add.s(4, 4), self.add.s(8), self.add.s(10))
          res = x.apply()
          self.assertIsInstance(res, EagerResult)
          self.assertEqual(res.get(), 26)
          self.assertEqual(res.parent.get(), 16)
          self.assertEqual(res.parent.parent.get(), 8)
          self.assertIsNone(res.parent.parent.parent)

      def test_empty_chain_returns_none(self):
          self.assertIsNone(chain(app=self.app)())
          self.assertIsNone(chain(app=self.app).apply_async())

      def test_call_no_tasks(self):
          x = chain()
          self.assertFalse(x())

      def test_call_with_tasks(self):
          # Calling the chain forwards args and kwargs to apply_async().
          x = self.add.s(2, 2) | self.add.s(4)
          x.apply_async = Mock()
          x(2, 2, foo=1)
          x.apply_async.assert_called_with((2, 2), {'foo': 1})

      def test_from_dict_no_args__with_args(self):
          # from_dict() must cope with ``args`` being None or a tuple.
          x = dict(self.add.s(2, 2) | self.add.s(4))
          x['args'] = None
          self.assertIsInstance(chain.from_dict(x), chain)
          x['args'] = (2, )
          self.assertIsInstance(chain.from_dict(x), chain)

      def test_accepts_generator_argument(self):
          x = chain(self.add.s(i) for i in range(10))
          # assertTrue(a, b) would use ``b`` as the failure message and only
          # check that ``a`` is truthy, so compare the task type explicitly.
          self.assertEqual(x.tasks[0].type, self.add)
          self.assertTrue(x.type)
  class test_group(CanvasCase):
      """Tests for ``group``: repr, dict round trips, calling and skew."""

      def _make_group(self):
          # Two ``add`` signatures wrapped in a group, shared by the tests.
          return group([self.add.s(2, 2), self.add.s(4, 4)])

      def test_repr(self):
          grp = self._make_group()
          self.assertEqual(repr(grp), repr(grp.tasks))

      def test_reverse(self):
          # ``signature()`` gives back a group for a group or its dict form.
          grp = self._make_group()
          self.assertIsInstance(signature(grp), group)
          self.assertIsInstance(signature(dict(grp)), group)

      def test_maybe_group_sig(self):
          # A lone signature is wrapped into a one-element list.
          wrapped = _maybe_group(self.add.s(2, 2))
          self.assertListEqual(wrapped, [self.add.s(2, 2)])

      def test_from_dict(self):
          # from_dict() must cope with ``args`` being a tuple or None.
          grp = self._make_group()
          grp['args'] = (2, 2)
          self.assertTrue(group.from_dict(dict(grp)))
          grp['args'] = None
          self.assertTrue(group.from_dict(dict(grp)))

      def test_call_empty_group(self):
          empty = group(app=self.app)
          self.assertFalse(len(empty()))
          # None of the entry points may raise for an empty group.
          empty.delay()
          empty.apply_async()
          empty()

      def test_skew(self):
          # With start=1 and step=1 the n-th task gets countdown n (1-based).
          grp = group([self.add.s(n, n) for n in range(10)])
          grp.skew(start=1, stop=10, step=1)
          for expected, task in enumerate(grp.tasks, 1):
              self.assertEqual(task.options['countdown'], expected)

      def test_iter(self):
          # Iterating the group yields its tasks in order.
          grp = group([self.add.s(n, n) for n in range(10)])
          self.assertListEqual([task for task in grp], grp.tasks)
  class test_chord(CanvasCase):
      """Tests for ``chord``: dict round trips, cloning, linking and repr."""

      def _make_chord(self):
          # Header of two ``add`` signatures with a ``mul`` body.
          header = [self.add.s(2, 2), self.add.s(4, 4)]
          return chord(header, body=self.mul.s(4))

      def test_reverse(self):
          # ``signature()`` gives back a chord for a chord or its dict form.
          sig = self._make_chord()
          self.assertIsInstance(signature(sig), chord)
          self.assertIsInstance(signature(dict(sig)), chord)

      def test_clone_clones_body(self):
          source = self._make_chord()
          copy = source.clone()
          # The body of the clone must be a distinct object.
          self.assertIsNot(source.kwargs['body'], copy.kwargs['body'])
          # Cloning a chord whose body was removed leaves the body unset.
          copy.kwargs.pop('body')
          bodiless = copy.clone()
          self.assertIsNone(bodiless.kwargs.get('body'))

      def test_links_to_body(self):
          # Callbacks and errbacks linked on the chord land on its body,
          # not in the chord's own options.
          sig = self._make_chord()
          sig.link(self.div.s(2))
          self.assertFalse(sig.options.get('link'))
          self.assertTrue(sig.kwargs['body'].options['link'])

          sig.link_error(self.div.s(2))
          self.assertFalse(sig.options.get('link_error'))
          self.assertTrue(sig.kwargs['body'].options['link_error'])

          self.assertTrue(sig.tasks)
          self.assertTrue(sig.body)

      def test_repr(self):
          sig = self._make_chord()
          self.assertTrue(repr(sig))
          # Without a body the repr must say so.
          sig.kwargs['body'] = None
          self.assertIn('without body', repr(sig))
  class test_maybe_signature(CanvasCase):
      """Tests for ``maybe_signature`` with None, dict and signature input."""

      def test_is_None(self):
          # None is passed through as None.
          result = maybe_signature(None, app=self.app)
          self.assertIsNone(result)

      def test_is_dict(self):
          # A plain dict is promoted to a Signature instance.
          payload = dict(self.add.s())
          result = maybe_signature(payload, app=self.app)
          self.assertIsInstance(result, Signature)

      def test_when_sig(self):
          # An existing signature is returned as the very same object.
          sig = self.add.s()
          self.assertIs(maybe_signature(sig, app=self.app), sig)