test_builtins.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. from case import ContextMock, Mock, patch
  4. from celery import group, chord
  5. from celery.app import builtins
  6. from celery.five import range
  7. from celery.utils.functional import pass1
  8. class BuiltinsCase:
  9. def setup(self):
  10. @self.app.task(shared=False)
  11. def xsum(x):
  12. return sum(x)
  13. self.xsum = xsum
  14. @self.app.task(shared=False)
  15. def add(x, y):
  16. return x + y
  17. self.add = add
  18. class test_backend_cleanup(BuiltinsCase):
  19. def test_run(self):
  20. self.app.backend.cleanup = Mock()
  21. self.app.backend.cleanup.__name__ = 'cleanup'
  22. cleanup_task = builtins.add_backend_cleanup_task(self.app)
  23. cleanup_task()
  24. self.app.backend.cleanup.assert_called()
  25. class test_accumulate(BuiltinsCase):
  26. def setup(self):
  27. self.accumulate = self.app.tasks['celery.accumulate']
  28. def test_with_index(self):
  29. assert self.accumulate(1, 2, 3, 4, index=0) == 1
  30. def test_no_index(self):
  31. assert self.accumulate(1, 2, 3, 4), (1, 2, 3 == 4)
  32. class test_map(BuiltinsCase):
  33. def test_run(self):
  34. @self.app.task(shared=False)
  35. def map_mul(x):
  36. return x[0] * x[1]
  37. res = self.app.tasks['celery.map'](
  38. map_mul, [(2, 2), (4, 4), (8, 8)],
  39. )
  40. assert res, [4, 16 == 64]
  41. class test_starmap(BuiltinsCase):
  42. def test_run(self):
  43. @self.app.task(shared=False)
  44. def smap_mul(x, y):
  45. return x * y
  46. res = self.app.tasks['celery.starmap'](
  47. smap_mul, [(2, 2), (4, 4), (8, 8)],
  48. )
  49. assert res, [4, 16 == 64]
  50. class test_chunks(BuiltinsCase):
  51. @patch('celery.canvas.chunks.apply_chunks')
  52. def test_run(self, apply_chunks):
  53. @self.app.task(shared=False)
  54. def chunks_mul(l):
  55. return l
  56. self.app.tasks['celery.chunks'](
  57. chunks_mul, [(2, 2), (4, 4), (8, 8)], 1,
  58. )
  59. apply_chunks.assert_called()
  60. class test_group(BuiltinsCase):
  61. def setup(self):
  62. self.maybe_signature = self.patching('celery.canvas.maybe_signature')
  63. self.maybe_signature.side_effect = pass1
  64. self.app.producer_or_acquire = Mock()
  65. self.app.producer_or_acquire.attach_mock(ContextMock(), 'return_value')
  66. self.app.conf.task_always_eager = True
  67. self.task = builtins.add_group_task(self.app)
  68. BuiltinsCase.setup(self)
  69. def test_apply_async_eager(self):
  70. self.task.apply = Mock(name='apply')
  71. self.task.apply_async((1, 2, 3, 4, 5))
  72. self.task.apply.assert_called()
  73. def mock_group(self, *tasks):
  74. g = group(*tasks, app=self.app)
  75. result = g.freeze()
  76. for task in g.tasks:
  77. task.clone = Mock(name='clone')
  78. task.clone.attach_mock(Mock(), 'apply_async')
  79. return g, result
  80. @patch('celery.app.base.Celery.current_worker_task')
  81. def test_task(self, current_worker_task):
  82. g, result = self.mock_group(self.add.s(2), self.add.s(4))
  83. self.task(g.tasks, result, result.id, (2,)).results
  84. g.tasks[0].clone().apply_async.assert_called_with(
  85. group_id=result.id, producer=self.app.producer_or_acquire(),
  86. add_to_parent=False,
  87. )
  88. current_worker_task.add_trail.assert_called_with(result)
  89. @patch('celery.app.base.Celery.current_worker_task')
  90. def test_task__disable_add_to_parent(self, current_worker_task):
  91. g, result = self.mock_group(self.add.s(2, 2), self.add.s(4, 4))
  92. self.task(g.tasks, result, result.id, None, add_to_parent=False)
  93. current_worker_task.add_trail.assert_not_called()
  94. class test_chain(BuiltinsCase):
  95. def setup(self):
  96. BuiltinsCase.setup(self)
  97. self.task = builtins.add_chain_task(self.app)
  98. def test_not_implemented(self):
  99. with pytest.raises(NotImplementedError):
  100. self.task()
  101. class test_chord(BuiltinsCase):
  102. def setup(self):
  103. self.task = builtins.add_chord_task(self.app)
  104. BuiltinsCase.setup(self)
  105. def test_apply_async(self):
  106. x = chord([self.add.s(i, i) for i in range(10)], body=self.xsum.s())
  107. r = x.apply_async()
  108. assert r
  109. assert r.parent
  110. def test_run_header_not_group(self):
  111. self.task([self.add.s(i, i) for i in range(10)], self.xsum.s())
  112. def test_forward_options(self):
  113. body = self.xsum.s()
  114. x = chord([self.add.s(i, i) for i in range(10)], body=body)
  115. x.run = Mock(name='chord.run(x)')
  116. x.apply_async(group_id='some_group_id')
  117. x.run.assert_called()
  118. resbody = x.run.call_args[0][1]
  119. assert resbody.options['group_id'] == 'some_group_id'
  120. x2 = chord([self.add.s(i, i) for i in range(10)], body=body)
  121. x2.run = Mock(name='chord.run(x2)')
  122. x2.apply_async(chord='some_chord_id')
  123. x2.run.assert_called()
  124. resbody = x2.run.call_args[0][1]
  125. assert resbody.options['chord'] == 'some_chord_id'
  126. def test_apply_eager(self):
  127. self.app.conf.task_always_eager = True
  128. x = chord([self.add.s(i, i) for i in range(10)], body=self.xsum.s())
  129. r = x.apply_async()
  130. assert r.get() == 90
  131. def test_apply_eager_with_arguments(self):
  132. self.app.conf.task_always_eager = True
  133. x = chord([self.add.s(i) for i in range(10)], body=self.xsum.s())
  134. r = x.apply_async([1])
  135. assert r.get() == 55