test_builtins.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. from __future__ import absolute_import
  2. from celery import group, chord
  3. from celery.app import builtins
  4. from celery.canvas import Signature
  5. from celery.five import range
  6. from celery._state import _task_stack
  7. from celery.tests.case import AppCase, Mock, patch
  8. class BuiltinsCase(AppCase):
  9. def setup(self):
  10. @self.app.task(shared=False)
  11. def xsum(x):
  12. return sum(x)
  13. self.xsum = xsum
  14. @self.app.task(shared=False)
  15. def add(x, y):
  16. return x + y
  17. self.add = add
  18. class test_backend_cleanup(BuiltinsCase):
  19. def test_run(self):
  20. self.app.backend.cleanup = Mock()
  21. self.app.backend.cleanup.__name__ = 'cleanup'
  22. cleanup_task = builtins.add_backend_cleanup_task(self.app)
  23. cleanup_task()
  24. self.assertTrue(self.app.backend.cleanup.called)
  25. class test_map(BuiltinsCase):
  26. def test_run(self):
  27. @self.app.task(shared=False)
  28. def map_mul(x):
  29. return x[0] * x[1]
  30. res = self.app.tasks['celery.map'](
  31. map_mul, [(2, 2), (4, 4), (8, 8)],
  32. )
  33. self.assertEqual(res, [4, 16, 64])
  34. class test_starmap(BuiltinsCase):
  35. def test_run(self):
  36. @self.app.task(shared=False)
  37. def smap_mul(x, y):
  38. return x * y
  39. res = self.app.tasks['celery.starmap'](
  40. smap_mul, [(2, 2), (4, 4), (8, 8)],
  41. )
  42. self.assertEqual(res, [4, 16, 64])
  43. class test_chunks(BuiltinsCase):
  44. @patch('celery.canvas.chunks.apply_chunks')
  45. def test_run(self, apply_chunks):
  46. @self.app.task(shared=False)
  47. def chunks_mul(l):
  48. return l
  49. self.app.tasks['celery.chunks'](
  50. chunks_mul, [(2, 2), (4, 4), (8, 8)], 1,
  51. )
  52. self.assertTrue(apply_chunks.called)
  53. class test_group(BuiltinsCase):
  54. def setup(self):
  55. self.task = builtins.add_group_task(self.app)()
  56. super(test_group, self).setup()
  57. def test_apply_async_eager(self):
  58. self.task.apply = Mock()
  59. self.app.conf.CELERY_ALWAYS_EAGER = True
  60. self.task.apply_async()
  61. self.assertTrue(self.task.apply.called)
  62. def test_apply(self):
  63. x = group([self.add.s(4, 4), self.add.s(8, 8)])
  64. x.name = self.task.name
  65. res = x.apply()
  66. self.assertEqual(res.get(), [8, 16])
  67. def test_apply_async(self):
  68. x = group([self.add.s(4, 4), self.add.s(8, 8)])
  69. x.apply_async()
  70. def test_apply_empty(self):
  71. x = group(app=self.app)
  72. x.apply()
  73. res = x.apply_async()
  74. self.assertFalse(res)
  75. self.assertFalse(res.results)
  76. def test_apply_async_with_parent(self):
  77. _task_stack.push(self.add)
  78. try:
  79. self.add.push_request(called_directly=False)
  80. try:
  81. assert not self.add.request.children
  82. x = group([self.add.s(4, 4), self.add.s(8, 8)])
  83. res = x()
  84. self.assertTrue(self.add.request.children)
  85. self.assertIn(res, self.add.request.children)
  86. self.assertEqual(len(self.add.request.children), 1)
  87. finally:
  88. self.add.pop_request()
  89. finally:
  90. _task_stack.pop()
  91. class test_chain(BuiltinsCase):
  92. def setup(self):
  93. BuiltinsCase.setup(self)
  94. self.task = builtins.add_chain_task(self.app)()
  95. def test_apply_async(self):
  96. c = self.add.s(2, 2) | self.add.s(4) | self.add.s(8)
  97. result = c.apply_async()
  98. self.assertTrue(result.parent)
  99. self.assertTrue(result.parent.parent)
  100. self.assertIsNone(result.parent.parent.parent)
  101. def test_group_to_chord(self):
  102. c = (
  103. group([self.add.s(i, i) for i in range(5)], app=self.app) |
  104. self.add.s(10) |
  105. self.add.s(20) |
  106. self.add.s(30)
  107. )
  108. tasks, _ = c.prepare_steps((), c.tasks)
  109. self.assertIsInstance(tasks[0], chord)
  110. self.assertTrue(tasks[0].body.options['link'])
  111. self.assertTrue(tasks[0].body.options['link'][0].options['link'])
  112. c2 = self.add.s(2, 2) | group(self.add.s(i, i) for i in range(10))
  113. tasks2, _ = c2.prepare_steps((), c2.tasks)
  114. self.assertIsInstance(tasks2[1], group)
  115. def test_apply_options(self):
  116. class static(Signature):
  117. def clone(self, *args, **kwargs):
  118. return self
  119. def s(*args, **kwargs):
  120. return static(self.add, args, kwargs, type=self.add, app=self.app)
  121. c = s(2, 2) | s(4, 4) | s(8, 8)
  122. r1 = c.apply_async(task_id='some_id')
  123. self.assertEqual(r1.id, 'some_id')
  124. c.apply_async(group_id='some_group_id')
  125. self.assertEqual(c.tasks[-1].options['group_id'], 'some_group_id')
  126. c.apply_async(chord='some_chord_id')
  127. self.assertEqual(c.tasks[-1].options['chord'], 'some_chord_id')
  128. c.apply_async(link=[s(32)])
  129. self.assertListEqual(c.tasks[-1].options['link'], [s(32)])
  130. c.apply_async(link_error=[s('error')])
  131. for task in c.tasks:
  132. self.assertListEqual(task.options['link_error'], [s('error')])
  133. class test_chord(BuiltinsCase):
  134. def setup(self):
  135. self.task = builtins.add_chord_task(self.app)()
  136. super(test_chord, self).setup()
  137. def test_apply_async(self):
  138. x = chord([self.add.s(i, i) for i in range(10)], body=self.xsum.s())
  139. r = x.apply_async()
  140. self.assertTrue(r)
  141. self.assertTrue(r.parent)
  142. def test_run_header_not_group(self):
  143. self.task([self.add.s(i, i) for i in range(10)], self.xsum.s())
  144. def test_forward_options(self):
  145. body = self.xsum.s()
  146. x = chord([self.add.s(i, i) for i in range(10)], body=body)
  147. x.run = Mock(name='chord.run(x)')
  148. x.apply_async(group_id='some_group_id')
  149. self.assertTrue(x.run.called)
  150. resbody = x.run.call_args[0][1]
  151. self.assertEqual(resbody.options['group_id'], 'some_group_id')
  152. x2 = chord([self.add.s(i, i) for i in range(10)], body=body)
  153. x2.run = Mock(name='chord.run(x2)')
  154. x2.apply_async(chord='some_chord_id')
  155. self.assertTrue(x2.run.called)
  156. resbody = x2.run.call_args[0][1]
  157. self.assertEqual(resbody.options['chord'], 'some_chord_id')
  158. def test_apply_eager(self):
  159. self.app.conf.CELERY_ALWAYS_EAGER = True
  160. x = chord([self.add.s(i, i) for i in range(10)], body=self.xsum.s())
  161. r = x.apply_async()
  162. self.assertEqual(r.get(), 90)