test_canvas.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. from __future__ import absolute_import
  2. from mock import Mock
  3. from celery.canvas import (
  4. Signature,
  5. chain,
  6. group,
  7. chord,
  8. signature,
  9. xmap,
  10. xstarmap,
  11. chunks,
  12. _maybe_group,
  13. maybe_signature,
  14. )
  15. from celery.result import EagerResult
  16. from celery.tests.case import AppCase
  17. SIG = Signature({'task': 'TASK',
  18. 'args': ('A1', ),
  19. 'kwargs': {'K1': 'V1'},
  20. 'options': {'task_id': 'TASK_ID'},
  21. 'subtask_type': ''})
  22. class CanvasCase(AppCase):
  23. def setup(self):
  24. @self.app.task(shared=False)
  25. def add(x, y):
  26. return x + y
  27. self.add = add
  28. @self.app.task(shared=False)
  29. def mul(x, y):
  30. return x * y
  31. self.mul = mul
  32. @self.app.task(shared=False)
  33. def div(x, y):
  34. return x / y
  35. self.div = div
  36. class test_Signature(CanvasCase):
  37. def test_getitem_property_class(self):
  38. self.assertTrue(Signature.task)
  39. self.assertTrue(Signature.args)
  40. self.assertTrue(Signature.kwargs)
  41. self.assertTrue(Signature.options)
  42. self.assertTrue(Signature.subtask_type)
  43. def test_getitem_property(self):
  44. self.assertEqual(SIG.task, 'TASK')
  45. self.assertEqual(SIG.args, ('A1', ))
  46. self.assertEqual(SIG.kwargs, {'K1': 'V1'})
  47. self.assertEqual(SIG.options, {'task_id': 'TASK_ID'})
  48. self.assertEqual(SIG.subtask_type, '')
  49. def test_replace(self):
  50. x = Signature('TASK', ('A'), {})
  51. self.assertTupleEqual(x.replace(args=('B', )).args, ('B', ))
  52. self.assertDictEqual(
  53. x.replace(kwargs={'FOO': 'BAR'}).kwargs,
  54. {'FOO': 'BAR'},
  55. )
  56. self.assertDictEqual(
  57. x.replace(options={'task_id': '123'}).options,
  58. {'task_id': '123'},
  59. )
  60. def test_set(self):
  61. self.assertDictEqual(
  62. Signature('TASK', x=1).set(task_id='2').options,
  63. {'x': 1, 'task_id': '2'},
  64. )
  65. def test_link(self):
  66. x = signature(SIG)
  67. x.link(SIG)
  68. x.link(SIG)
  69. self.assertIn(SIG, x.options['link'])
  70. self.assertEqual(len(x.options['link']), 1)
  71. def test_link_error(self):
  72. x = signature(SIG)
  73. x.link_error(SIG)
  74. x.link_error(SIG)
  75. self.assertIn(SIG, x.options['link_error'])
  76. self.assertEqual(len(x.options['link_error']), 1)
  77. def test_flatten_links(self):
  78. tasks = [self.add.s(2, 2), self.mul.s(4), self.div.s(2)]
  79. tasks[0].link(tasks[1])
  80. tasks[1].link(tasks[2])
  81. self.assertEqual(tasks[0].flatten_links(), tasks)
  82. def test_OR(self):
  83. x = self.add.s(2, 2) | self.mul.s(4)
  84. self.assertIsInstance(x, chain)
  85. y = self.add.s(4, 4) | self.div.s(2)
  86. z = x | y
  87. self.assertIsInstance(y, chain)
  88. self.assertIsInstance(z, chain)
  89. self.assertEqual(len(z.tasks), 4)
  90. with self.assertRaises(TypeError):
  91. x | 10
  92. ax = self.add.s(2, 2) | (self.add.s(4) | self.add.s(8))
  93. self.assertIsInstance(ax, chain)
  94. self.assertEqual(len(ax.tasks), 3, 'consolidates chain to chain')
  95. def test_INVERT(self):
  96. x = self.add.s(2, 2)
  97. x.apply_async = Mock()
  98. x.apply_async.return_value = Mock()
  99. x.apply_async.return_value.get = Mock()
  100. x.apply_async.return_value.get.return_value = 4
  101. self.assertEqual(~x, 4)
  102. self.assertTrue(x.apply_async.called)
  103. def test_merge_immutable(self):
  104. x = self.add.si(2, 2, foo=1)
  105. args, kwargs, options = x._merge((4, ), {'bar': 2}, {'task_id': 3})
  106. self.assertTupleEqual(args, (2, 2))
  107. self.assertDictEqual(kwargs, {'foo': 1})
  108. self.assertDictEqual(options, {'task_id': 3})
  109. def test_set_immutable(self):
  110. x = self.add.s(2, 2)
  111. self.assertFalse(x.immutable)
  112. x.set(immutable=True)
  113. self.assertTrue(x.immutable)
  114. x.set(immutable=False)
  115. self.assertFalse(x.immutable)
  116. def test_election(self):
  117. x = self.add.s(2, 2)
  118. x.freeze('foo')
  119. x.type.app.control = Mock()
  120. r = x.election()
  121. self.assertTrue(x.type.app.control.election.called)
  122. self.assertEqual(r.id, 'foo')
  123. def test_AsyncResult_when_not_registered(self):
  124. s = signature('xxx.not.registered', app=self.app)
  125. self.assertTrue(s.AsyncResult)
  126. def test_apply_async_when_not_registered(self):
  127. s = signature('xxx.not.registered', app=self.app)
  128. self.assertTrue(s._apply_async)
  129. class test_xmap_xstarmap(CanvasCase):
  130. def test_apply(self):
  131. for type, attr in [(xmap, 'map'), (xstarmap, 'starmap')]:
  132. args = [(i, i) for i in range(10)]
  133. s = getattr(self.add, attr)(args)
  134. s.type = Mock()
  135. s.apply_async(foo=1)
  136. s.type.apply_async.assert_called_with(
  137. (), {'task': self.add.s(), 'it': args}, foo=1,
  138. )
  139. self.assertEqual(type.from_dict(dict(s)), s)
  140. self.assertTrue(repr(s))
  141. class test_chunks(CanvasCase):
  142. def test_chunks(self):
  143. x = self.add.chunks(range(100), 10)
  144. self.assertEqual(
  145. dict(chunks.from_dict(dict(x), app=self.app)), dict(x),
  146. )
  147. self.assertTrue(x.group())
  148. self.assertEqual(len(x.group().tasks), 10)
  149. x.group = Mock()
  150. gr = x.group.return_value = Mock()
  151. x.apply_async()
  152. gr.apply_async.assert_called_with((), {})
  153. x()
  154. gr.assert_called_with()
  155. self.app.conf.CELERY_ALWAYS_EAGER = True
  156. chunks.apply_chunks(app=self.app, **x['kwargs'])
  157. class test_chain(CanvasCase):
  158. def test_repr(self):
  159. x = self.add.s(2, 2) | self.add.s(2)
  160. self.assertEqual(
  161. repr(x), '%s(2, 2) | %s(2)' % (self.add.name, self.add.name),
  162. )
  163. def test_reverse(self):
  164. x = self.add.s(2, 2) | self.add.s(2)
  165. self.assertIsInstance(signature(x), chain)
  166. self.assertIsInstance(signature(dict(x)), chain)
  167. def test_always_eager(self):
  168. self.app.conf.CELERY_ALWAYS_EAGER = True
  169. self.assertEqual(~(self.add.s(4, 4) | self.add.s(8)), 16)
  170. def test_apply(self):
  171. x = chain(self.add.s(4, 4), self.add.s(8), self.add.s(10))
  172. res = x.apply()
  173. self.assertIsInstance(res, EagerResult)
  174. self.assertEqual(res.get(), 26)
  175. self.assertEqual(res.parent.get(), 16)
  176. self.assertEqual(res.parent.parent.get(), 8)
  177. self.assertIsNone(res.parent.parent.parent)
  178. def test_call_no_tasks(self):
  179. x = chain()
  180. self.assertFalse(x())
  181. def test_call_with_tasks(self):
  182. x = self.add.s(2, 2) | self.add.s(4)
  183. x.apply_async = Mock()
  184. x(2, 2, foo=1)
  185. x.apply_async.assert_called_with((2, 2), {'foo': 1})
  186. def test_from_dict_no_args__with_args(self):
  187. x = dict(self.add.s(2, 2) | self.add.s(4))
  188. x['args'] = None
  189. self.assertIsInstance(chain.from_dict(x), chain)
  190. x['args'] = (2, )
  191. self.assertIsInstance(chain.from_dict(x), chain)
  192. def test_accepts_generator_argument(self):
  193. x = chain(self.add.s(i) for i in range(10))
  194. self.assertTrue(x.tasks[0].type, self.add)
  195. self.assertTrue(x.type)
  196. class test_group(CanvasCase):
  197. def test_repr(self):
  198. x = group([self.add.s(2, 2), self.add.s(4, 4)])
  199. self.assertEqual(repr(x), repr(x.tasks))
  200. def test_reverse(self):
  201. x = group([self.add.s(2, 2), self.add.s(4, 4)])
  202. self.assertIsInstance(signature(x), group)
  203. self.assertIsInstance(signature(dict(x)), group)
  204. def test_maybe_group_sig(self):
  205. self.assertListEqual(
  206. _maybe_group(self.add.s(2, 2)), [self.add.s(2, 2)],
  207. )
  208. def test_from_dict(self):
  209. x = group([self.add.s(2, 2), self.add.s(4, 4)])
  210. x['args'] = (2, 2)
  211. self.assertTrue(group.from_dict(dict(x)))
  212. x['args'] = None
  213. self.assertTrue(group.from_dict(dict(x)))
  214. def test_call_empty_group(self):
  215. x = group(app=self.app)
  216. self.assertFalse(len(x()))
  217. def test_skew(self):
  218. g = group([self.add.s(i, i) for i in range(10)])
  219. g.skew(start=1, stop=10, step=1)
  220. for i, task in enumerate(g.tasks):
  221. self.assertEqual(task.options['countdown'], i + 1)
  222. def test_iter(self):
  223. g = group([self.add.s(i, i) for i in range(10)])
  224. self.assertListEqual(list(iter(g)), g.tasks)
  225. class test_chord(CanvasCase):
  226. def test_reverse(self):
  227. x = chord([self.add.s(2, 2), self.add.s(4, 4)], body=self.mul.s(4))
  228. self.assertIsInstance(signature(x), chord)
  229. self.assertIsInstance(signature(dict(x)), chord)
  230. def test_clone_clones_body(self):
  231. x = chord([self.add.s(2, 2), self.add.s(4, 4)], body=self.mul.s(4))
  232. y = x.clone()
  233. self.assertIsNot(x.kwargs['body'], y.kwargs['body'])
  234. y.kwargs.pop('body')
  235. z = y.clone()
  236. self.assertIsNone(z.kwargs.get('body'))
  237. def test_links_to_body(self):
  238. x = chord([self.add.s(2, 2), self.add.s(4, 4)], body=self.mul.s(4))
  239. x.link(self.div.s(2))
  240. self.assertFalse(x.options.get('link'))
  241. self.assertTrue(x.kwargs['body'].options['link'])
  242. x.link_error(self.div.s(2))
  243. self.assertFalse(x.options.get('link_error'))
  244. self.assertTrue(x.kwargs['body'].options['link_error'])
  245. self.assertTrue(x.tasks)
  246. self.assertTrue(x.body)
  247. def test_repr(self):
  248. x = chord([self.add.s(2, 2), self.add.s(4, 4)], body=self.mul.s(4))
  249. self.assertTrue(repr(x))
  250. x.kwargs['body'] = None
  251. self.assertIn('without body', repr(x))
  252. class test_maybe_signature(CanvasCase):
  253. def test_is_None(self):
  254. self.assertIsNone(maybe_signature(None, app=self.app))
  255. def test_is_dict(self):
  256. self.assertIsInstance(
  257. maybe_signature(dict(self.add.s()), app=self.app), Signature,
  258. )
  259. def test_when_sig(self):
  260. s = self.add.s()
  261. self.assertIs(maybe_signature(s, app=self.app), s)