# test_builtins.py (6.3 KB)

  1. from __future__ import absolute_import
  2. from celery import group, chord
  3. from celery.app import builtins
  4. from celery.canvas import Signature
  5. from celery.five import range
  6. from celery._state import _task_stack
  7. from celery.tests.case import AppCase, Mock, patch
  class BuiltinsCase(AppCase):
      """Shared base case: registers ``xsum`` and ``add`` on the test app."""

      def setup(self):
          # The inner function names are kept unchanged; presumably the task
          # decorator derives the registered task name from them.
          def xsum(x):
              return sum(x)

          def add(x, y):
              return x + y

          # Same registration the decorator syntax performs, in the same
          # order: xsum first, then add.
          self.xsum = self.app.task(shared=False)(xsum)
          self.add = self.app.task(shared=False)(add)
  class test_backend_cleanup(BuiltinsCase):
      """Running the backend-cleanup task must call ``backend.cleanup``."""

      def test_run(self):
          # Build the stand-in first, give it a __name__, then attach it to
          # the backend in place of the real cleanup.
          fake_cleanup = Mock()
          fake_cleanup.__name__ = 'cleanup'
          self.app.backend.cleanup = fake_cleanup

          task = builtins.add_backend_cleanup_task(self.app)
          task()

          self.assertTrue(self.app.backend.cleanup.called)
  class test_map(BuiltinsCase):
      """Runs the ``celery.map`` built-in over a list of pairs."""

      def test_run(self):
          @self.app.task(shared=False)
          def map_mul(x):
              # Each pair is received as one argument and indexed here.
              return x[0] * x[1]

          pairs = [(2, 2), (4, 4), (8, 8)]
          map_task = self.app.tasks['celery.map']

          products = map_task(map_mul, pairs)

          self.assertEqual(products, [4, 16, 64])
  class test_starmap(BuiltinsCase):
      """Runs the ``celery.starmap`` built-in over a list of pairs."""

      def test_run(self):
          @self.app.task(shared=False)
          def smap_mul(x, y):
              # Unlike the map variant, the pair arrives as two arguments.
              return x * y

          pairs = [(2, 2), (4, 4), (8, 8)]
          starmap_task = self.app.tasks['celery.starmap']

          products = starmap_task(smap_mul, pairs)

          self.assertEqual(products, [4, 16, 64])
  class test_chunks(BuiltinsCase):
      """The ``celery.chunks`` built-in must call ``chunks.apply_chunks``."""

      @patch('celery.canvas.chunks.apply_chunks')
      def test_run(self, apply_chunks):
          @self.app.task(shared=False)
          def chunks_mul(items):
              return items

          pairs = [(2, 2), (4, 4), (8, 8)]
          chunk_size = 1
          chunks_task = self.app.tasks['celery.chunks']

          chunks_task(chunks_mul, pairs, chunk_size)

          # apply_chunks is patched out, so only the delegation is checked.
          self.assertTrue(apply_chunks.called)
  class test_group(BuiltinsCase):
      """Exercises the built-in group task and ``group`` signatures."""

      def setup(self):
          # The group task is installed first, then the base setup registers
          # the helper tasks (self.add / self.xsum).
          self.task = builtins.add_group_task(self.app)
          super(test_group, self).setup()

      def test_apply_async_eager(self):
          # With CELERY_ALWAYS_EAGER set, apply_async is expected to go
          # through apply(), which is mocked so the call can be observed.
          self.task.apply = Mock()
          self.app.conf.CELERY_ALWAYS_EAGER = True
          self.task.apply_async((1, 2, 3, 4, 5))
          self.assertTrue(self.task.apply.called)

      def test_apply(self):
          x = group([self.add.s(4, 4), self.add.s(8, 8)])
          res = x.apply()
          self.assertEqual(res.get(), [8, 16])

      def test_apply_async(self):
          # Smoke test: passes as long as apply_async() does not raise.
          x = group([self.add.s(4, 4), self.add.s(8, 8)])
          x.apply_async()

      def test_apply_empty(self):
          # An empty group yields a falsy result with no member results.
          x = group(app=self.app)
          x.apply()
          res = x.apply_async()
          self.assertFalse(res)
          self.assertFalse(res.results)

      def test_apply_async_with_parent(self):
          # Make self.add look like the currently executing task so the
          # group result is recorded among its request's children.
          _task_stack.push(self.add)
          try:
              self.add.push_request(called_directly=False)
              try:
                  # A unittest assertion rather than a bare ``assert``: it is
                  # not stripped under ``python -O`` and matches the rest of
                  # the file.
                  self.assertFalse(self.add.request.children)
                  x = group([self.add.s(4, 4), self.add.s(8, 8)])
                  res = x()
                  self.assertTrue(self.add.request.children)
                  self.assertIn(res, self.add.request.children)
                  self.assertEqual(len(self.add.request.children), 1)
              finally:
                  self.add.pop_request()
          finally:
              _task_stack.pop()
  class test_chain(BuiltinsCase):
      """Exercises the built-in chain task and ``|``-composed signatures."""

      def setup(self):
          super(test_chain, self).setup()
          self.task = builtins.add_chain_task(self.app)

      def test_apply_async(self):
          pipeline = self.add.s(2, 2) | self.add.s(4) | self.add.s(8)
          last = pipeline.apply_async()

          # Three steps: the final result has exactly two ancestors.
          middle = last.parent
          self.assertTrue(middle)
          first = middle.parent
          self.assertTrue(first)
          self.assertIsNone(first.parent)

      def test_group_to_chord(self):
          header = group([self.add.s(i, i) for i in range(5)], app=self.app)
          chained = header | self.add.s(10) | self.add.s(20) | self.add.s(30)

          steps, _ = chained.prepare_steps((), chained.tasks)
          # A leading group followed by more tasks is upgraded to a chord,
          # and the remaining tasks are linked off the chord body.
          self.assertIsInstance(steps[0], chord)
          self.assertTrue(steps[0].body.options['link'])
          self.assertTrue(steps[0].body.options['link'][0].options['link'])

          # A trailing group has nothing after it and stays a plain group.
          tail_group = self.add.s(2, 2) | group(
              self.add.s(i, i) for i in range(10)
          )
          tail_steps, _ = tail_group.prepare_steps((), tail_group.tasks)
          self.assertIsInstance(tail_steps[1], group)

      def test_apply_options(self):

          class static(Signature):
              # clone() returns the same object, so options applied during
              # apply_async stay visible on the chain's own tasks.

              def clone(self, *args, **kwargs):
                  return self

          def sig(*args, **kwargs):
              return static(
                  self.add, args, kwargs, type=self.add, app=self.app,
              )

          c = sig(2, 2) | sig(4, 4) | sig(8, 8)

          first_result = c.apply_async(task_id='some_id')
          self.assertEqual(first_result.id, 'some_id')

          # group_id and chord are both expected on the last task only.
          for option, value in (('group_id', 'some_group_id'),
                                ('chord', 'some_chord_id')):
              c.apply_async(**{option: value})
              self.assertEqual(c.tasks[-1].options[option], value)

          c.apply_async(link=[sig(32)])
          self.assertListEqual(c.tasks[-1].options['link'], [sig(32)])

          # link_error, by contrast, is expected on every task in the chain.
          c.apply_async(link_error=[sig('error')])
          for step in c.tasks:
              self.assertListEqual(
                  step.options['link_error'], [sig('error')],
              )
  class test_chord(BuiltinsCase):
      """Exercises the built-in chord task and ``chord`` signatures."""

      def setup(self):
          # The chord task is installed first, then the base setup registers
          # the helper tasks (self.add / self.xsum).
          self.task = builtins.add_chord_task(self.app)
          super(test_chord, self).setup()

      def test_apply_async(self):
          header = [self.add.s(i, i) for i in range(10)]
          result = chord(header, body=self.xsum.s()).apply_async()
          self.assertTrue(result)
          self.assertTrue(result.parent)

      def test_run_header_not_group(self):
          # Smoke test: the task is called with a plain list as the header
          # and passes as long as nothing raises.
          header = [self.add.s(i, i) for i in range(10)]
          self.task(header, self.xsum.s())

      def test_forward_options(self):
          body = self.xsum.s()

          # group_id given to apply_async must end up in the body's options.
          first = chord([self.add.s(i, i) for i in range(10)], body=body)
          first.run = Mock(name='chord.run(x)')
          first.apply_async(group_id='some_group_id')
          self.assertTrue(first.run.called)
          # The second positional argument of run() is read as the body.
          forwarded = first.run.call_args[0][1]
          self.assertEqual(forwarded.options['group_id'], 'some_group_id')

          # Same check for the chord option, reusing the same body signature.
          second = chord([self.add.s(i, i) for i in range(10)], body=body)
          second.run = Mock(name='chord.run(x2)')
          second.apply_async(chord='some_chord_id')
          self.assertTrue(second.run.called)
          forwarded = second.run.call_args[0][1]
          self.assertEqual(forwarded.options['chord'], 'some_chord_id')

      def test_apply_eager(self):
          self.app.conf.CELERY_ALWAYS_EAGER = True
          header = [self.add.s(i, i) for i in range(10)]
          result = chord(header, body=self.xsum.s()).apply_async()
          # Header yields 0, 2, ..., 18; xsum of those is 90.
          self.assertEqual(result.get(), 90)
  161. self.assertEqual(r.get(), 90)