123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778 |
- from __future__ import absolute_import
- from celery.backends.rpc import RPCBackend
- from celery._state import _task_stack
- from celery.tests.case import AppCase, Mock, patch
class test_RPCBackend(AppCase):
    """Unit tests for :class:`celery.backends.rpc.RPCBackend`."""

    def setup(self):
        """Create a fresh backend bound to the test app for every test."""
        self.b = RPCBackend(app=self.app)

    def test_oid(self):
        """``oid`` is the same on repeated access and equals the app's oid."""
        oid = self.b.oid
        oid2 = self.b.oid
        self.assertEqual(oid, oid2)
        self.assertEqual(oid, self.app.oid)

    def test_interface(self):
        """``on_reply_declare`` can be called with just a task id."""
        self.b.on_reply_declare('task_id')

    def test_destination_for(self):
        """``destination_for`` yields ``(reply_to, correlation_id)``.

        Three cases are exercised: an explicit request, no request but a
        task pushed on the task stack, and neither (``RuntimeError``).
        """
        # Case 1: an explicit request object is passed in.
        req = Mock(name='request')
        req.reply_to = 'reply_to'
        req.correlation_id = 'corid'
        self.assertTupleEqual(
            self.b.destination_for('task_id', req),
            ('reply_to', 'corid'),
        )

        # Case 2: no request given; a task is pushed on the task stack.
        task = Mock()
        _task_stack.push(task)
        try:
            task.request.reply_to = 'reply_to'
            task.request.correlation_id = 'corid'
            self.assertTupleEqual(
                self.b.destination_for('task_id', None),
                ('reply_to', 'corid'),
            )
        finally:
            # Always pop so a failed assertion cannot leak the mock task
            # into other tests.
            _task_stack.pop()

        # Case 3: no request and the pushed task has been popped again.
        with self.assertRaises(RuntimeError):
            self.b.destination_for('task_id', None)

    def test_rkey(self):
        """``rkey`` returns the task id unchanged."""
        self.assertEqual(self.b.rkey('id1'), 'id1')

    def test_binding(self):
        """The binding queue is named/routed by oid, transient, auto-delete."""
        queue = self.b.binding
        self.assertEqual(queue.name, self.b.oid)
        self.assertEqual(queue.exchange, self.b.exchange)
        self.assertEqual(queue.routing_key, self.b.oid)
        self.assertFalse(queue.durable)
        self.assertTrue(queue.auto_delete)

    def test_many_bindings(self):
        """``_many_bindings`` returns a single binding for several ids."""
        self.assertListEqual(
            self.b._many_bindings(['a', 'b']),
            [self.b.binding],
        )

    def test_create_binding(self):
        """``_create_binding`` returns the backend's ``binding``."""
        self.assertEqual(self.b._create_binding('id'), self.b.binding)

    def test_on_task_call(self):
        """``on_task_call`` declares the binding on the producer's channel."""
        with patch('celery.backends.rpc.maybe_declare') as md:
            with self.app.amqp.producer_pool.acquire() as prod:
                # Plain call statement: the original had a stray trailing
                # comma here that wrapped the result in a throwaway tuple.
                self.b.on_task_call(prod, 'task_id')
                # Assert while the producer is still acquired, since the
                # expected argument is built from ``prod.channel``.
                md.assert_called_with(
                    self.b.binding(prod.channel),
                    retry=True,
                )

    def test_create_exchange(self):
        """``_create_exchange`` returns an ``Exchange`` with an empty name."""
        ex = self.b._create_exchange('name')
        self.assertIsInstance(ex, self.b.Exchange)
        self.assertEqual(ex.name, '')
|