test_builtins.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. from __future__ import absolute_import, unicode_literals
  2. from celery import group, chord
  3. from celery.app import builtins
  4. from celery.five import range
  5. from celery.utils.functional import pass1
  6. from celery.tests.case import AppCase, ContextMock, Mock, patch
  class BuiltinsCase(AppCase):
      """Shared base class for the built-in task tests in this module.

      ``setup`` registers two non-shared helper tasks on ``self.app`` and
      exposes them to subclasses as ``self.xsum`` and ``self.add``.
      """

      def setup(self):
          """Register the ``xsum`` and ``add`` helper tasks on the test app."""

          @self.app.task(shared=False)
          def xsum(x):
              # Total of every item in the iterable ``x``.
              return sum(x)

          self.xsum = xsum

          @self.app.task(shared=False)
          def add(x, y):
              # Plain binary addition.
              return x + y

          self.add = add
  class test_backend_cleanup(BuiltinsCase):
      """Tests for the task built by ``builtins.add_backend_cleanup_task``."""

      def test_run(self):
          """Running the cleanup task must call ``app.backend.cleanup``."""
          cleanup_mock = Mock()
          # Mock objects carry no ``__name__`` by default, so give it one
          # explicitly (presumably needed when the task is built -- confirm).
          cleanup_mock.__name__ = 'cleanup'
          self.app.backend.cleanup = cleanup_mock

          cleanup_task = builtins.add_backend_cleanup_task(self.app)
          cleanup_task()

          self.app.backend.cleanup.assert_called()
  class test_accumulate(BuiltinsCase):
      """Tests for the ``celery.accumulate`` built-in task."""

      def setup(self):
          """Look up the registered ``celery.accumulate`` task.

          The base-class ``setup`` is not invoked here; the tests in this
          class never reference ``self.add`` or ``self.xsum``.
          """
          self.accumulate = self.app.tasks['celery.accumulate']

      def test_with_index(self):
          """With ``index`` given, only the argument at that position returns."""
          self.assertEqual(self.accumulate(1, 2, 3, 4, index=0), 1)

      def test_no_index(self):
          """Without ``index``, all positional arguments come back as a tuple."""
          self.assertEqual(self.accumulate(1, 2, 3, 4), (1, 2, 3, 4))
  class test_map(BuiltinsCase):
      """Tests for the ``celery.map`` built-in task."""

      def test_run(self):
          """``celery.map`` passes each item as one argument to the task."""

          @self.app.task(shared=False)
          def map_mul(x):
              # ``x`` is a single pair; multiply its two members.
              return x[0] * x[1]

          pairs = [(2, 2), (4, 4), (8, 8)]
          res = self.app.tasks['celery.map'](map_mul, pairs)

          self.assertEqual(res, [4, 16, 64])
  class test_starmap(BuiltinsCase):
      """Tests for the ``celery.starmap`` built-in task."""

      def test_run(self):
          """``celery.starmap`` unpacks each item into positional arguments."""

          @self.app.task(shared=False)
          def smap_mul(x, y):
              return x * y

          pairs = [(2, 2), (4, 4), (8, 8)]
          res = self.app.tasks['celery.starmap'](smap_mul, pairs)

          self.assertEqual(res, [4, 16, 64])
  class test_chunks(BuiltinsCase):
      """Tests for the ``celery.chunks`` built-in task."""

      @patch('celery.canvas.chunks.apply_chunks')
      def test_run(self, apply_chunks):
          """Calling ``celery.chunks`` must invoke ``chunks.apply_chunks``.

          ``apply_chunks`` is the mock injected by the ``patch`` decorator.
          """

          @self.app.task(shared=False)
          def chunks_mul(items):
              # Identity task; ``apply_chunks`` is patched out for this test.
              return items

          # Arguments: the task, the items to split, and a chunk size of 1.
          self.app.tasks['celery.chunks'](
              chunks_mul, [(2, 2), (4, 4), (8, 8)], 1,
          )

          apply_chunks.assert_called()
  class test_group(BuiltinsCase):
      """Tests for the task built by ``builtins.add_group_task``."""

      def setup(self):
          """Patch canvas/producer plumbing, then build the group task.

          The statement order is kept deliberately: patches and configuration
          first, then the group task, and the shared base setup last.
          """
          self.maybe_signature = self.patch('celery.canvas.maybe_signature')
          # pass1 presumably hands back its first argument unchanged, so
          # signatures go through the patched function untouched -- confirm.
          self.maybe_signature.side_effect = pass1

          # Every call to producer_or_acquire() now returns one ContextMock.
          self.app.producer_or_acquire = Mock()
          self.app.producer_or_acquire.attach_mock(ContextMock(), 'return_value')

          self.app.conf.task_always_eager = True
          self.task = builtins.add_group_task(self.app)
          super(test_group, self).setup()

      def test_apply_async_eager(self):
          """With ``task_always_eager`` set, ``apply_async`` calls ``apply``."""
          self.task.apply = Mock(name='apply')
          self.task.apply_async((1, 2, 3, 4, 5))
          self.task.apply.assert_called()

      def mock_group(self, *tasks):
          """Build a frozen group whose member tasks have a mocked ``clone``.

          Returns a ``(group, result)`` pair where ``result`` is whatever
          ``group.freeze()`` returned.
          """
          g = group(*tasks, app=self.app)
          result = g.freeze()
          for member in g.tasks:
              member.clone = Mock(name='clone')
              member.clone.attach_mock(Mock(), 'apply_async')
          return g, result

      @patch('celery.app.base.Celery.current_worker_task')
      def test_task(self, current_worker_task):
          """Running the task applies the clones and records the trail."""
          g, result = self.mock_group(self.add.s(2), self.add.s(4))

          # NOTE(review): the ``.results`` access carries no assertion; it
          # only requires the returned object to expose that attribute.
          self.task(g.tasks, result, result.id, (2,)).results

          g.tasks[0].clone().apply_async.assert_called_with(
              group_id=result.id, producer=self.app.producer_or_acquire(),
              add_to_parent=False,
          )
          current_worker_task.add_trail.assert_called_with(result)

      @patch('celery.app.base.Celery.current_worker_task')
      def test_task__disable_add_to_parent(self, current_worker_task):
          """``add_to_parent=False`` must leave the worker task's trail alone."""
          g, result = self.mock_group(self.add.s(2, 2), self.add.s(4, 4))
          self.task(g.tasks, result, result.id, None, add_to_parent=False)
          current_worker_task.add_trail.assert_not_called()
  class test_chain(BuiltinsCase):
      """Tests for the task built by ``builtins.add_chain_task``."""

      def setup(self):
          """Run the shared setup, then build the chain task."""
          # Use super() like the sibling test classes in this module do.
          super(test_chain, self).setup()
          self.task = builtins.add_chain_task(self.app)

      def test_not_implemented(self):
          """Calling the chain task directly raises ``NotImplementedError``."""
          with self.assertRaises(NotImplementedError):
              self.task()
  class test_chord(BuiltinsCase):
      """Tests for chords and the task built by ``builtins.add_chord_task``."""

      def setup(self):
          """Build the chord task, then run the shared base setup."""
          self.task = builtins.add_chord_task(self.app)
          super(test_chord, self).setup()

      def test_apply_async(self):
          """``apply_async`` returns a truthy result with a truthy parent."""
          x = chord([self.add.s(i, i) for i in range(10)], body=self.xsum.s())
          r = x.apply_async()
          self.assertTrue(r)
          self.assertTrue(r.parent)

      def test_run_header_not_group(self):
          """Smoke test: the chord task accepts a plain list as its header.

          There are no assertions; the call only has to complete without
          raising.
          """
          self.task([self.add.s(i, i) for i in range(10)], self.xsum.s())

      def test_forward_options(self):
          """Options given to ``apply_async`` reach the body passed to ``run``."""
          body = self.xsum.s()

          # ``group_id`` should show up in the options of the forwarded body.
          x = chord([self.add.s(i, i) for i in range(10)], body=body)
          x.run = Mock(name='chord.run(x)')
          x.apply_async(group_id='some_group_id')
          x.run.assert_called()
          resbody = x.run.call_args[0][1]  # second positional arg of run()
          self.assertEqual(resbody.options['group_id'], 'some_group_id')

          # Same check for the ``chord`` option, on a second chord object
          # that reuses the same body signature.
          x2 = chord([self.add.s(i, i) for i in range(10)], body=body)
          x2.run = Mock(name='chord.run(x2)')
          x2.apply_async(chord='some_chord_id')
          x2.run.assert_called()
          resbody = x2.run.call_args[0][1]
          self.assertEqual(resbody.options['chord'], 'some_chord_id')

      def test_apply_eager(self):
          """Eager chord: sum(i + i for i in range(10)) == 90."""
          self.app.conf.task_always_eager = True
          x = chord([self.add.s(i, i) for i in range(10)], body=self.xsum.s())
          r = x.apply_async()
          self.assertEqual(r.get(), 90)

      def test_apply_eager_with_arguments(self):
          """Eager chord with one extra argument supplied via ``apply_async``.

          Each header task is the partial ``add.s(i)``; the ``[1]`` passed to
          ``apply_async`` supplies the other operand, so the expected total is
          sum(i + 1 for i in range(10)) == 55.
          """
          self.app.conf.task_always_eager = True
          x = chord([self.add.s(i) for i in range(10)], body=self.xsum.s())
          r = x.apply_async([1])
          self.assertEqual(r.get(), 55)