test_rpc.py 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. from __future__ import absolute_import
  2. from celery.backends.rpc import RPCBackend
  3. from celery._state import _task_stack
  4. from celery.tests.case import AppCase, Mock, patch
  5. class test_RPCBackend(AppCase):
  6. def setup(self):
  7. self.b = RPCBackend(app=self.app)
  8. def test_oid(self):
  9. oid = self.b.oid
  10. oid2 = self.b.oid
  11. self.assertEqual(oid, oid2)
  12. self.assertEqual(oid, self.app.oid)
  13. def test_interface(self):
  14. self.b.on_reply_declare('task_id')
  15. def test_destination_for(self):
  16. req = Mock(name='request')
  17. req.reply_to = 'reply_to'
  18. req.correlation_id = 'corid'
  19. self.assertTupleEqual(
  20. self.b.destination_for('task_id', req),
  21. ('reply_to', 'corid'),
  22. )
  23. task = Mock()
  24. _task_stack.push(task)
  25. try:
  26. task.request.reply_to = 'reply_to'
  27. task.request.correlation_id = 'corid'
  28. self.assertTupleEqual(
  29. self.b.destination_for('task_id', None),
  30. ('reply_to', 'corid'),
  31. )
  32. finally:
  33. _task_stack.pop()
  34. with self.assertRaises(RuntimeError):
  35. self.b.destination_for('task_id', None)
  36. def test_rkey(self):
  37. self.assertEqual(self.b.rkey('id1'), 'id1')
  38. def test_binding(self):
  39. queue = self.b.binding
  40. self.assertEqual(queue.name, self.b.oid)
  41. self.assertEqual(queue.exchange, self.b.exchange)
  42. self.assertEqual(queue.routing_key, self.b.oid)
  43. self.assertFalse(queue.durable)
  44. self.assertTrue(queue.auto_delete)
  45. def test_many_bindings(self):
  46. self.assertListEqual(
  47. self.b._many_bindings(['a', 'b']),
  48. [self.b.binding],
  49. )
  50. def test_create_binding(self):
  51. self.assertEqual(self.b._create_binding('id'), self.b.binding)
  52. def test_on_task_call(self):
  53. with patch('celery.backends.rpc.maybe_declare') as md:
  54. with self.app.amqp.producer_pool.acquire() as prod:
  55. self.b.on_task_call(prod, 'task_id'),
  56. md.assert_called_with(
  57. self.b.binding(prod.channel),
  58. retry=True,
  59. )
  60. def test_create_exchange(self):
  61. ex = self.b._create_exchange('name')
  62. self.assertIsInstance(ex, self.b.Exchange)
  63. self.assertEqual(ex.name, '')