test_builtins.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. from case import ContextMock, Mock, patch
  4. from celery import chord, group
  5. from celery.app import builtins
  6. from celery.five import range
  7. from celery.utils.functional import pass1
  8. class BuiltinsCase:
  9. def setup(self):
  10. @self.app.task(shared=False)
  11. def xsum(x):
  12. return sum(x)
  13. self.xsum = xsum
  14. @self.app.task(shared=False)
  15. def add(x, y):
  16. return x + y
  17. self.add = add
  18. class test_backend_cleanup(BuiltinsCase):
  19. def test_run(self):
  20. self.app.backend.cleanup = Mock()
  21. self.app.backend.cleanup.__name__ = 'cleanup'
  22. cleanup_task = builtins.add_backend_cleanup_task(self.app)
  23. cleanup_task()
  24. self.app.backend.cleanup.assert_called()
  25. class test_accumulate(BuiltinsCase):
  26. def setup(self):
  27. self.accumulate = self.app.tasks['celery.accumulate']
  28. def test_with_index(self):
  29. assert self.accumulate(1, 2, 3, 4, index=0) == 1
  30. def test_no_index(self):
  31. assert self.accumulate(1, 2, 3, 4), (1, 2, 3 == 4)
  32. class test_map(BuiltinsCase):
  33. def test_run(self):
  34. @self.app.task(shared=False)
  35. def map_mul(x):
  36. return x[0] * x[1]
  37. res = self.app.tasks['celery.map'](
  38. map_mul, [(2, 2), (4, 4), (8, 8)],
  39. )
  40. assert res, [4, 16 == 64]
  41. class test_starmap(BuiltinsCase):
  42. def test_run(self):
  43. @self.app.task(shared=False)
  44. def smap_mul(x, y):
  45. return x * y
  46. res = self.app.tasks['celery.starmap'](
  47. smap_mul, [(2, 2), (4, 4), (8, 8)],
  48. )
  49. assert res, [4, 16 == 64]
  50. class test_chunks(BuiltinsCase):
  51. @patch('celery.canvas.chunks.apply_chunks')
  52. def test_run(self, apply_chunks):
  53. @self.app.task(shared=False)
  54. def chunks_mul(l):
  55. return l
  56. self.app.tasks['celery.chunks'](
  57. chunks_mul, [(2, 2), (4, 4), (8, 8)], 1,
  58. )
  59. apply_chunks.assert_called()
  60. class test_group(BuiltinsCase):
  61. def setup(self):
  62. self.maybe_signature = self.patching('celery.canvas.maybe_signature')
  63. self.maybe_signature.side_effect = pass1
  64. self.app.producer_or_acquire = Mock()
  65. self.app.producer_or_acquire.attach_mock(
  66. ContextMock(serializer='json'), 'return_value'
  67. )
  68. self.app.conf.task_always_eager = True
  69. self.task = builtins.add_group_task(self.app)
  70. BuiltinsCase.setup(self)
  71. def test_apply_async_eager(self):
  72. self.task.apply = Mock(name='apply')
  73. self.task.apply_async((1, 2, 3, 4, 5))
  74. self.task.apply.assert_called()
  75. def mock_group(self, *tasks):
  76. g = group(*tasks, app=self.app)
  77. result = g.freeze()
  78. for task in g.tasks:
  79. task.clone = Mock(name='clone')
  80. task.clone.attach_mock(Mock(), 'apply_async')
  81. return g, result
  82. @patch('celery.app.base.Celery.current_worker_task')
  83. def test_task(self, current_worker_task):
  84. g, result = self.mock_group(self.add.s(2), self.add.s(4))
  85. self.task(g.tasks, result, result.id, (2,)).results
  86. g.tasks[0].clone().apply_async.assert_called_with(
  87. group_id=result.id, producer=self.app.producer_or_acquire(),
  88. add_to_parent=False,
  89. )
  90. current_worker_task.add_trail.assert_called_with(result)
  91. @patch('celery.app.base.Celery.current_worker_task')
  92. def test_task__disable_add_to_parent(self, current_worker_task):
  93. g, result = self.mock_group(self.add.s(2, 2), self.add.s(4, 4))
  94. self.task(g.tasks, result, result.id, None, add_to_parent=False)
  95. current_worker_task.add_trail.assert_not_called()
  96. class test_chain(BuiltinsCase):
  97. def setup(self):
  98. BuiltinsCase.setup(self)
  99. self.task = builtins.add_chain_task(self.app)
  100. def test_not_implemented(self):
  101. with pytest.raises(NotImplementedError):
  102. self.task()
  103. class test_chord(BuiltinsCase):
  104. def setup(self):
  105. self.task = builtins.add_chord_task(self.app)
  106. BuiltinsCase.setup(self)
  107. def test_apply_async(self):
  108. x = chord([self.add.s(i, i) for i in range(10)], body=self.xsum.s())
  109. r = x.apply_async()
  110. assert r
  111. assert r.parent
  112. def test_run_header_not_group(self):
  113. self.task([self.add.s(i, i) for i in range(10)], self.xsum.s())
  114. def test_forward_options(self):
  115. body = self.xsum.s()
  116. x = chord([self.add.s(i, i) for i in range(10)], body=body)
  117. x.run = Mock(name='chord.run(x)')
  118. x.apply_async(group_id='some_group_id')
  119. x.run.assert_called()
  120. resbody = x.run.call_args[0][1]
  121. assert resbody.options['group_id'] == 'some_group_id'
  122. x2 = chord([self.add.s(i, i) for i in range(10)], body=body)
  123. x2.run = Mock(name='chord.run(x2)')
  124. x2.apply_async(chord='some_chord_id')
  125. x2.run.assert_called()
  126. resbody = x2.run.call_args[0][1]
  127. assert resbody.options['chord'] == 'some_chord_id'
  128. def test_apply_eager(self):
  129. self.app.conf.task_always_eager = True
  130. x = chord([self.add.s(i, i) for i in range(10)], body=self.xsum.s())
  131. r = x.apply_async()
  132. assert r.get() == 90
  133. def test_apply_eager_with_arguments(self):
  134. self.app.conf.task_always_eager = True
  135. x = chord([self.add.s(i) for i in range(10)], body=self.xsum.s())
  136. r = x.apply_async([1])
  137. assert r.get() == 55