12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485 |
- from __future__ import absolute_import, unicode_literals
- import pytest
- from case import Mock, patch
- from celery import chord, group
- from celery.backends.rpc import RPCBackend
- from celery._state import _task_stack
class test_RPCBackend:
    """Unit tests for :class:`celery.backends.rpc.RPCBackend`.

    ``self.app`` and ``self.add`` are not defined in this class; they are
    presumably injected by the project's test harness -- confirm against
    the suite's conftest.
    """

    def setup(self):
        """Create the backend under test, bound to ``self.app``."""
        self.b = RPCBackend(app=self.app)

    def test_oid(self):
        """``oid`` is the same on repeated access and equals ``app.oid``."""
        oid = self.b.oid
        oid2 = self.b.oid
        assert oid == oid2
        assert oid == self.app.oid

    def test_interface(self):
        """``on_reply_declare`` can be called with a task id."""
        self.b.on_reply_declare('task_id')

    def test_ensure_chords_allowed(self):
        """``ensure_chords_allowed`` raises ``NotImplementedError``."""
        with pytest.raises(NotImplementedError):
            self.b.ensure_chords_allowed()

    def test_apply_chord(self):
        """``apply_chord`` raises ``NotImplementedError``."""
        with pytest.raises(NotImplementedError):
            self.b.apply_chord([], (), 'gid', Mock(name='body'))

    @pytest.mark.celery(result_backend='rpc')
    def test_chord_raises_error(self):
        """Calling a chord with the rpc result backend is rejected."""
        with pytest.raises(NotImplementedError):
            chord(self.add.s(i, i) for i in range(10))(self.add.s([2]))

    @pytest.mark.celery(result_backend='rpc')
    def test_chain_with_chord_raises_error(self):
        """A chain of task | group | task is rejected on ``delay()``."""
        with pytest.raises(NotImplementedError):
            (self.add.s(2, 2) |
             group(self.add.s(2, 2),
                   self.add.s(5, 6)) | self.add.s()).delay()

    def test_destination_for(self):
        """``destination_for`` returns ``(reply_to, correlation_id)``.

        Three cases are exercised: an explicit request object, no request
        while a task is on ``_task_stack``, and no request with nothing
        pushed by this test (expects ``RuntimeError``).
        """
        req = Mock(name='request')
        req.reply_to = 'reply_to'
        req.correlation_id = 'corid'
        assert self.b.destination_for('task_id', req) == ('reply_to', 'corid')

        task = Mock()
        _task_stack.push(task)
        try:
            task.request.reply_to = 'reply_to'
            task.request.correlation_id = 'corid'
            assert self.b.destination_for('task_id', None) == (
                'reply_to', 'corid',
            )
        finally:
            # Always pop so the pushed mock task cannot leak into other tests.
            _task_stack.pop()

        with pytest.raises(RuntimeError):
            self.b.destination_for('task_id', None)

    def test_binding(self):
        """``binding`` is a non-durable, auto-delete queue keyed by ``oid``."""
        queue = self.b.binding
        assert queue.name == self.b.oid
        assert queue.exchange == self.b.exchange
        assert queue.routing_key == self.b.oid
        assert not queue.durable
        assert queue.auto_delete

    def test_create_binding(self):
        """``_create_binding`` returns the backend's ``binding``."""
        assert self.b._create_binding('id') == self.b.binding

    def test_on_task_call(self):
        """``on_task_call`` declares the binding on the producer's channel."""
        with patch('celery.backends.rpc.maybe_declare') as md:
            with self.app.amqp.producer_pool.acquire() as prod:
                self.b.on_task_call(prod, 'task_id')
                md.assert_called_with(
                    self.b.binding(prod.channel),
                    retry=True,
                )

    def test_create_exchange(self):
        """``_create_exchange`` returns an ``Exchange`` with an empty name."""
        ex = self.b._create_exchange('name')
        assert isinstance(ex, self.b.Exchange)
        assert ex.name == ''
|