| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185 | 
							- from __future__ import absolute_import, unicode_literals
 
- import pytest
 
- from case import ContextMock, Mock, patch
 
- from celery import chord, group
 
- from celery.app import builtins
 
- from celery.five import range
 
- from celery.utils.functional import pass1
 
- class BuiltinsCase:
 
-     def setup(self):
 
-         @self.app.task(shared=False)
 
-         def xsum(x):
 
-             return sum(x)
 
-         self.xsum = xsum
 
-         @self.app.task(shared=False)
 
-         def add(x, y):
 
-             return x + y
 
-         self.add = add
 
- class test_backend_cleanup(BuiltinsCase):
 
-     def test_run(self):
 
-         self.app.backend.cleanup = Mock()
 
-         self.app.backend.cleanup.__name__ = 'cleanup'
 
-         cleanup_task = builtins.add_backend_cleanup_task(self.app)
 
-         cleanup_task()
 
-         self.app.backend.cleanup.assert_called()
 
- class test_accumulate(BuiltinsCase):
 
-     def setup(self):
 
-         self.accumulate = self.app.tasks['celery.accumulate']
 
-     def test_with_index(self):
 
-         assert self.accumulate(1, 2, 3, 4, index=0) == 1
 
-     def test_no_index(self):
 
-         assert self.accumulate(1, 2, 3, 4), (1, 2, 3 == 4)
 
- class test_map(BuiltinsCase):
 
-     def test_run(self):
 
-         @self.app.task(shared=False)
 
-         def map_mul(x):
 
-             return x[0] * x[1]
 
-         res = self.app.tasks['celery.map'](
 
-             map_mul, [(2, 2), (4, 4), (8, 8)],
 
-         )
 
-         assert res, [4, 16 == 64]
 
- class test_starmap(BuiltinsCase):
 
-     def test_run(self):
 
-         @self.app.task(shared=False)
 
-         def smap_mul(x, y):
 
-             return x * y
 
-         res = self.app.tasks['celery.starmap'](
 
-             smap_mul, [(2, 2), (4, 4), (8, 8)],
 
-         )
 
-         assert res, [4, 16 == 64]
 
- class test_chunks(BuiltinsCase):
 
-     @patch('celery.canvas.chunks.apply_chunks')
 
-     def test_run(self, apply_chunks):
 
-         @self.app.task(shared=False)
 
-         def chunks_mul(l):
 
-             return l
 
-         self.app.tasks['celery.chunks'](
 
-             chunks_mul, [(2, 2), (4, 4), (8, 8)], 1,
 
-         )
 
-         apply_chunks.assert_called()
 
- class test_group(BuiltinsCase):
 
-     def setup(self):
 
-         self.maybe_signature = self.patching('celery.canvas.maybe_signature')
 
-         self.maybe_signature.side_effect = pass1
 
-         self.app.producer_or_acquire = Mock()
 
-         self.app.producer_or_acquire.attach_mock(
 
-             ContextMock(serializer='json'), 'return_value'
 
-         )
 
-         self.app.conf.task_always_eager = True
 
-         self.task = builtins.add_group_task(self.app)
 
-         BuiltinsCase.setup(self)
 
-     def test_apply_async_eager(self):
 
-         self.task.apply = Mock(name='apply')
 
-         self.task.apply_async((1, 2, 3, 4, 5))
 
-         self.task.apply.assert_called()
 
-     def mock_group(self, *tasks):
 
-         g = group(*tasks, app=self.app)
 
-         result = g.freeze()
 
-         for task in g.tasks:
 
-             task.clone = Mock(name='clone')
 
-             task.clone.attach_mock(Mock(), 'apply_async')
 
-         return g, result
 
-     @patch('celery.app.base.Celery.current_worker_task')
 
-     def test_task(self, current_worker_task):
 
-         g, result = self.mock_group(self.add.s(2), self.add.s(4))
 
-         self.task(g.tasks, result, result.id, (2,)).results
 
-         g.tasks[0].clone().apply_async.assert_called_with(
 
-             group_id=result.id, producer=self.app.producer_or_acquire(),
 
-             add_to_parent=False,
 
-         )
 
-         current_worker_task.add_trail.assert_called_with(result)
 
-     @patch('celery.app.base.Celery.current_worker_task')
 
-     def test_task__disable_add_to_parent(self, current_worker_task):
 
-         g, result = self.mock_group(self.add.s(2, 2), self.add.s(4, 4))
 
-         self.task(g.tasks, result, result.id, None, add_to_parent=False)
 
-         current_worker_task.add_trail.assert_not_called()
 
- class test_chain(BuiltinsCase):
 
-     def setup(self):
 
-         BuiltinsCase.setup(self)
 
-         self.task = builtins.add_chain_task(self.app)
 
-     def test_not_implemented(self):
 
-         with pytest.raises(NotImplementedError):
 
-             self.task()
 
- class test_chord(BuiltinsCase):
 
-     def setup(self):
 
-         self.task = builtins.add_chord_task(self.app)
 
-         BuiltinsCase.setup(self)
 
-     def test_apply_async(self):
 
-         x = chord([self.add.s(i, i) for i in range(10)], body=self.xsum.s())
 
-         r = x.apply_async()
 
-         assert r
 
-         assert r.parent
 
-     def test_run_header_not_group(self):
 
-         self.task([self.add.s(i, i) for i in range(10)], self.xsum.s())
 
-     def test_forward_options(self):
 
-         body = self.xsum.s()
 
-         x = chord([self.add.s(i, i) for i in range(10)], body=body)
 
-         x.run = Mock(name='chord.run(x)')
 
-         x.apply_async(group_id='some_group_id')
 
-         x.run.assert_called()
 
-         resbody = x.run.call_args[0][1]
 
-         assert resbody.options['group_id'] == 'some_group_id'
 
-         x2 = chord([self.add.s(i, i) for i in range(10)], body=body)
 
-         x2.run = Mock(name='chord.run(x2)')
 
-         x2.apply_async(chord='some_chord_id')
 
-         x2.run.assert_called()
 
-         resbody = x2.run.call_args[0][1]
 
-         assert resbody.options['chord'] == 'some_chord_id'
 
-     def test_apply_eager(self):
 
-         self.app.conf.task_always_eager = True
 
-         x = chord([self.add.s(i, i) for i in range(10)], body=self.xsum.s())
 
-         r = x.apply_async()
 
-         assert r.get() == 90
 
-     def test_apply_eager_with_arguments(self):
 
-         self.app.conf.task_always_eager = True
 
-         x = chord([self.add.s(i) for i in range(10)], body=self.xsum.s())
 
-         r = x.apply_async([1])
 
-         assert r.get() == 55
 
 
  |