test_builtins.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. import pytest
  2. from case import ContextMock, Mock, patch
  3. from celery import group, chord
  4. from celery.app import builtins
  5. from celery.utils.functional import pass1
  6. class BuiltinsCase:
  7. def setup(self):
  8. @self.app.task(shared=False)
  9. def xsum(x):
  10. return sum(x)
  11. self.xsum = xsum
  12. @self.app.task(shared=False)
  13. def add(x, y):
  14. return x + y
  15. self.add = add
  16. class test_backend_cleanup(BuiltinsCase):
  17. def test_run(self):
  18. self.app.backend.cleanup = Mock()
  19. self.app.backend.cleanup.__name__ = 'cleanup'
  20. cleanup_task = builtins.add_backend_cleanup_task(self.app)
  21. cleanup_task()
  22. self.app.backend.cleanup.assert_called()
  23. class test_accumulate(BuiltinsCase):
  24. def setup(self):
  25. self.accumulate = self.app.tasks['celery.accumulate']
  26. def test_with_index(self):
  27. assert self.accumulate(1, 2, 3, 4, index=0) == 1
  28. def test_no_index(self):
  29. assert self.accumulate(1, 2, 3, 4), (1, 2, 3 == 4)
  30. class test_map(BuiltinsCase):
  31. def test_run(self):
  32. @self.app.task(shared=False)
  33. def map_mul(x):
  34. return x[0] * x[1]
  35. res = self.app.tasks['celery.map'](
  36. map_mul, [(2, 2), (4, 4), (8, 8)],
  37. )
  38. assert res, [4, 16 == 64]
  39. class test_starmap(BuiltinsCase):
  40. def test_run(self):
  41. @self.app.task(shared=False)
  42. def smap_mul(x, y):
  43. return x * y
  44. res = self.app.tasks['celery.starmap'](
  45. smap_mul, [(2, 2), (4, 4), (8, 8)],
  46. )
  47. assert res, [4, 16 == 64]
  48. class test_chunks(BuiltinsCase):
  49. @patch('celery.canvas.chunks.apply_chunks')
  50. def test_run(self, apply_chunks):
  51. @self.app.task(shared=False)
  52. def chunks_mul(l):
  53. return l
  54. self.app.tasks['celery.chunks'](
  55. chunks_mul, [(2, 2), (4, 4), (8, 8)], 1,
  56. )
  57. apply_chunks.assert_called()
  58. class test_group(BuiltinsCase):
  59. def setup(self):
  60. self.maybe_signature = self.patching('celery.canvas.maybe_signature')
  61. self.maybe_signature.side_effect = pass1
  62. self.app.producer_or_acquire = Mock()
  63. self.app.producer_or_acquire.attach_mock(ContextMock(), 'return_value')
  64. self.app.conf.task_always_eager = True
  65. self.task = builtins.add_group_task(self.app)
  66. super().setup()
  67. def test_apply_async_eager(self):
  68. self.task.apply = Mock(name='apply')
  69. self.task.apply_async((1, 2, 3, 4, 5))
  70. self.task.apply.assert_called()
  71. def mock_group(self, *tasks):
  72. g = group(*tasks, app=self.app)
  73. result = g.freeze()
  74. for task in g.tasks:
  75. task.clone = Mock(name='clone')
  76. task.clone.attach_mock(Mock(), 'apply_async')
  77. return g, result
  78. @patch('celery.app.base.Celery.current_worker_task')
  79. def test_task(self, current_worker_task):
  80. g, result = self.mock_group(self.add.s(2), self.add.s(4))
  81. self.task(g.tasks, result, result.id, (2,)).results
  82. g.tasks[0].clone().apply_async.assert_called_with(
  83. group_id=result.id, producer=self.app.producer_or_acquire(),
  84. add_to_parent=False,
  85. )
  86. current_worker_task.add_trail.assert_called_with(result)
  87. @patch('celery.app.base.Celery.current_worker_task')
  88. def test_task__disable_add_to_parent(self, current_worker_task):
  89. g, result = self.mock_group(self.add.s(2, 2), self.add.s(4, 4))
  90. self.task(g.tasks, result, result.id, None, add_to_parent=False)
  91. current_worker_task.add_trail.assert_not_called()
  92. class test_chain(BuiltinsCase):
  93. def setup(self):
  94. BuiltinsCase.setup(self)
  95. self.task = builtins.add_chain_task(self.app)
  96. def test_not_implemented(self):
  97. with pytest.raises(NotImplementedError):
  98. self.task()
  99. class test_chord(BuiltinsCase):
  100. def setup(self):
  101. self.task = builtins.add_chord_task(self.app)
  102. super().setup()
  103. def test_apply_async(self):
  104. x = chord([self.add.s(i, i) for i in range(10)], body=self.xsum.s())
  105. r = x.apply_async()
  106. assert r
  107. assert r.parent
  108. def test_run_header_not_group(self):
  109. self.task([self.add.s(i, i) for i in range(10)], self.xsum.s())
  110. def test_forward_options(self):
  111. body = self.xsum.s()
  112. x = chord([self.add.s(i, i) for i in range(10)], body=body)
  113. x.run = Mock(name='chord.run(x)')
  114. x.apply_async(group_id='some_group_id')
  115. x.run.assert_called()
  116. resbody = x.run.call_args[0][1]
  117. assert resbody.options['group_id'] == 'some_group_id'
  118. x2 = chord([self.add.s(i, i) for i in range(10)], body=body)
  119. x2.run = Mock(name='chord.run(x2)')
  120. x2.apply_async(chord='some_chord_id')
  121. x2.run.assert_called()
  122. resbody = x2.run.call_args[0][1]
  123. assert resbody.options['chord'] == 'some_chord_id'
  124. def test_apply_eager(self):
  125. self.app.conf.task_always_eager = True
  126. x = chord([self.add.s(i, i) for i in range(10)], body=self.xsum.s())
  127. r = x.apply_async()
  128. assert r.get() == 90
  129. def test_apply_eager_with_arguments(self):
  130. self.app.conf.task_always_eager = True
  131. x = chord([self.add.s(i) for i in range(10)], body=self.xsum.s())
  132. r = x.apply_async([1])
  133. assert r.get() == 55