test_canvas.py 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330
  1. from __future__ import absolute_import
  2. from mock import Mock
  3. from celery import shared_task
  4. from celery.canvas import (
  5. Signature,
  6. chain,
  7. group,
  8. chord,
  9. subtask,
  10. xmap,
  11. xstarmap,
  12. chunks,
  13. _maybe_group,
  14. maybe_subtask,
  15. )
  16. from celery.result import EagerResult
  17. from celery.tests.utils import AppCase
  18. SIG = Signature({'task': 'TASK',
  19. 'args': ('A1', ),
  20. 'kwargs': {'K1': 'V1'},
  21. 'options': {'task_id': 'TASK_ID'},
  22. 'subtask_type': ''})
  23. @shared_task()
  24. def add(x, y):
  25. return x + y
  26. @shared_task()
  27. def mul(x, y):
  28. return x * y
  29. @shared_task()
  30. def div(x, y):
  31. return x / y
  32. class test_Signature(AppCase):
  33. def test_getitem_property_class(self):
  34. self.assertTrue(Signature.task)
  35. self.assertTrue(Signature.args)
  36. self.assertTrue(Signature.kwargs)
  37. self.assertTrue(Signature.options)
  38. self.assertTrue(Signature.subtask_type)
  39. def test_getitem_property(self):
  40. self.assertEqual(SIG.task, 'TASK')
  41. self.assertEqual(SIG.args, ('A1', ))
  42. self.assertEqual(SIG.kwargs, {'K1': 'V1'})
  43. self.assertEqual(SIG.options, {'task_id': 'TASK_ID'})
  44. self.assertEqual(SIG.subtask_type, '')
  45. def test_replace(self):
  46. x = Signature('TASK', ('A'), {})
  47. self.assertTupleEqual(x.replace(args=('B', )).args, ('B', ))
  48. self.assertDictEqual(
  49. x.replace(kwargs={'FOO': 'BAR'}).kwargs,
  50. {'FOO': 'BAR'},
  51. )
  52. self.assertDictEqual(
  53. x.replace(options={'task_id': '123'}).options,
  54. {'task_id': '123'},
  55. )
  56. def test_set(self):
  57. self.assertDictEqual(
  58. Signature('TASK', x=1).set(task_id='2').options,
  59. {'x': 1, 'task_id': '2'},
  60. )
  61. def test_link(self):
  62. x = subtask(SIG)
  63. x.link(SIG)
  64. x.link(SIG)
  65. self.assertIn(SIG, x.options['link'])
  66. self.assertEqual(len(x.options['link']), 1)
  67. def test_link_error(self):
  68. x = subtask(SIG)
  69. x.link_error(SIG)
  70. x.link_error(SIG)
  71. self.assertIn(SIG, x.options['link_error'])
  72. self.assertEqual(len(x.options['link_error']), 1)
  73. def test_flatten_links(self):
  74. tasks = [add.s(2, 2), mul.s(4), div.s(2)]
  75. tasks[0].link(tasks[1])
  76. tasks[1].link(tasks[2])
  77. self.assertEqual(tasks[0].flatten_links(), tasks)
  78. def test_OR(self):
  79. x = add.s(2, 2) | mul.s(4)
  80. self.assertIsInstance(x, chain)
  81. y = add.s(4, 4) | div.s(2)
  82. z = x | y
  83. self.assertIsInstance(y, chain)
  84. self.assertIsInstance(z, chain)
  85. self.assertEqual(len(z.tasks), 4)
  86. with self.assertRaises(TypeError):
  87. x | 10
  88. ax = add.s(2, 2) | (add.s(4) | add.s(8))
  89. self.assertIsInstance(ax, chain)
  90. self.assertEqual(len(ax.tasks), 3, 'consolidates chain to chain')
  91. def test_INVERT(self):
  92. x = add.s(2, 2)
  93. x.apply_async = Mock()
  94. x.apply_async.return_value = Mock()
  95. x.apply_async.return_value.get = Mock()
  96. x.apply_async.return_value.get.return_value = 4
  97. self.assertEqual(~x, 4)
  98. self.assertTrue(x.apply_async.called)
  99. def test_merge_immutable(self):
  100. x = add.si(2, 2, foo=1)
  101. args, kwargs, options = x._merge((4, ), {'bar': 2}, {'task_id': 3})
  102. self.assertTupleEqual(args, (2, 2))
  103. self.assertDictEqual(kwargs, {'foo': 1})
  104. self.assertDictEqual(options, {'task_id': 3})
  105. def test_set_immutable(self):
  106. x = add.s(2, 2)
  107. self.assertFalse(x.immutable)
  108. x.set(immutable=True)
  109. self.assertTrue(x.immutable)
  110. x.set(immutable=False)
  111. self.assertFalse(x.immutable)
  112. def test_election(self):
  113. x = add.s(2, 2)
  114. x._freeze('foo')
  115. prev, x.type.app.control = x.type.app.control, Mock()
  116. try:
  117. r = x.election()
  118. self.assertTrue(x.type.app.control.election.called)
  119. self.assertEqual(r.id, 'foo')
  120. finally:
  121. x.type.app.control = prev
  122. def test_AsyncResult_when_not_registerd(self):
  123. s = subtask('xxx.not.registered')
  124. self.assertTrue(s.AsyncResult)
  125. def test_apply_async_when_not_registered(self):
  126. s = subtask('xxx.not.registered')
  127. self.assertTrue(s._apply_async)
  128. class test_xmap_xstarmap(AppCase):
  129. def test_apply(self):
  130. for type, attr in [(xmap, 'map'), (xstarmap, 'starmap')]:
  131. args = [(i, i) for i in range(10)]
  132. s = getattr(add, attr)(args)
  133. s.type = Mock()
  134. s.apply_async(foo=1)
  135. s.type.apply_async.assert_called_with(
  136. (), {'task': add.s(), 'it': args}, foo=1,
  137. )
  138. self.assertEqual(type.from_dict(dict(s)), s)
  139. self.assertTrue(repr(s))
  140. class test_chunks(AppCase):
  141. def test_chunks(self):
  142. x = add.chunks(range(100), 10)
  143. self.assertEqual(chunks.from_dict(dict(x)), x)
  144. self.assertTrue(x.group())
  145. self.assertEqual(len(x.group().tasks), 10)
  146. x.group = Mock()
  147. gr = x.group.return_value = Mock()
  148. x.apply_async()
  149. gr.apply_async.assert_called_with((), {})
  150. x()
  151. gr.assert_called_with()
  152. self.app.conf.CELERY_ALWAYS_EAGER = True
  153. try:
  154. chunks.apply_chunks(**x['kwargs'])
  155. finally:
  156. self.app.conf.CELERY_ALWAYS_EAGER = False
  157. class test_chain(AppCase):
  158. def test_repr(self):
  159. x = add.s(2, 2) | add.s(2)
  160. self.assertEqual(repr(x), '%s(2, 2) | %s(2)' % (add.name, add.name))
  161. def test_reverse(self):
  162. x = add.s(2, 2) | add.s(2)
  163. self.assertIsInstance(subtask(x), chain)
  164. self.assertIsInstance(subtask(dict(x)), chain)
  165. def test_always_eager(self):
  166. self.app.conf.CELERY_ALWAYS_EAGER = True
  167. try:
  168. self.assertEqual(~(add.s(4, 4) | add.s(8)), 16)
  169. finally:
  170. self.app.conf.CELERY_ALWAYS_EAGER = False
  171. def test_apply(self):
  172. x = chain(add.s(4, 4), add.s(8), add.s(10))
  173. res = x.apply()
  174. self.assertIsInstance(res, EagerResult)
  175. self.assertEqual(res.get(), 26)
  176. self.assertEqual(res.parent.get(), 16)
  177. self.assertEqual(res.parent.parent.get(), 8)
  178. self.assertIsNone(res.parent.parent.parent)
  179. def test_call_no_tasks(self):
  180. x = chain()
  181. self.assertFalse(x())
  182. def test_call_with_tasks(self):
  183. x = add.s(2, 2) | add.s(4)
  184. x.apply_async = Mock()
  185. x(2, 2, foo=1)
  186. x.apply_async.assert_called_with((2, 2), {'foo': 1})
  187. def test_from_dict_no_args__with_args(self):
  188. x = dict(add.s(2, 2) | add.s(4))
  189. x['args'] = None
  190. self.assertIsInstance(chain.from_dict(x), chain)
  191. x['args'] = (2, )
  192. self.assertIsInstance(chain.from_dict(x), chain)
  193. def test_accepts_generator_argument(self):
  194. x = chain(add.s(i) for i in range(10))
  195. self.assertTrue(x.tasks[0].type, add)
  196. self.assertTrue(x.type)
  197. class test_group(AppCase):
  198. def test_repr(self):
  199. x = group([add.s(2, 2), add.s(4, 4)])
  200. self.assertEqual(repr(x), repr(x.tasks))
  201. def test_reverse(self):
  202. x = group([add.s(2, 2), add.s(4, 4)])
  203. self.assertIsInstance(subtask(x), group)
  204. self.assertIsInstance(subtask(dict(x)), group)
  205. def test_maybe_group_sig(self):
  206. self.assertListEqual(_maybe_group(add.s(2, 2)), [add.s(2, 2)])
  207. def test_from_dict(self):
  208. x = group([add.s(2, 2), add.s(4, 4)])
  209. x['args'] = (2, 2)
  210. self.assertTrue(group.from_dict(dict(x)))
  211. x['args'] = None
  212. self.assertTrue(group.from_dict(dict(x)))
  213. def test_call_empty_group(self):
  214. x = group()
  215. self.assertIsNone(x())
  216. def test_skew(self):
  217. g = group([add.s(i, i) for i in range(10)])
  218. g.skew(start=1, stop=10, step=1)
  219. for i, task in enumerate(g.tasks):
  220. self.assertEqual(task.options['countdown'], i + 1)
  221. def test_iter(self):
  222. g = group([add.s(i, i) for i in range(10)])
  223. self.assertListEqual(list(iter(g)), g.tasks)
  224. class test_chord(AppCase):
  225. def test_reverse(self):
  226. x = chord([add.s(2, 2), add.s(4, 4)], body=mul.s(4))
  227. self.assertIsInstance(subtask(x), chord)
  228. self.assertIsInstance(subtask(dict(x)), chord)
  229. def test_clone_clones_body(self):
  230. x = chord([add.s(2, 2), add.s(4, 4)], body=mul.s(4))
  231. y = x.clone()
  232. self.assertIsNot(x.kwargs['body'], y.kwargs['body'])
  233. y.kwargs.pop('body')
  234. z = y.clone()
  235. self.assertIsNone(z.kwargs.get('body'))
  236. def test_links_to_body(self):
  237. x = chord([add.s(2, 2), add.s(4, 4)], body=mul.s(4))
  238. x.link(div.s(2))
  239. self.assertFalse(x.options.get('link'))
  240. self.assertTrue(x.kwargs['body'].options['link'])
  241. x.link_error(div.s(2))
  242. self.assertFalse(x.options.get('link_error'))
  243. self.assertTrue(x.kwargs['body'].options['link_error'])
  244. self.assertTrue(x.tasks)
  245. self.assertTrue(x.body)
  246. def test_repr(self):
  247. x = chord([add.s(2, 2), add.s(4, 4)], body=mul.s(4))
  248. self.assertTrue(repr(x))
  249. x.kwargs['body'] = None
  250. self.assertIn('without body', repr(x))
  251. class test_maybe_subtask(AppCase):
  252. def test_is_None(self):
  253. self.assertIsNone(maybe_subtask(None))
  254. def test_is_dict(self):
  255. self.assertIsInstance(maybe_subtask(dict(add.s())), Signature)
  256. def test_when_sig(self):
  257. s = add.s()
  258. self.assertIs(maybe_subtask(s), s)