test_rpc.py 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. from __future__ import absolute_import
  2. from mock import patch
  3. from celery.backends.rpc import RPCBackend
  4. from celery._state import _task_stack
  5. from celery.tests.utils import AppCase, Mock
  6. class test_RPCBackend(AppCase):
  7. def setup(self):
  8. self.b = RPCBackend(app=self.app)
  9. def test_oid(self):
  10. oid = self.b.oid
  11. oid2 = self.b.oid
  12. self.assertEqual(oid, oid2)
  13. def test_interface(self):
  14. self.b.on_reply_declare('task_id')
  15. def test_current_routing_key(self):
  16. task = Mock()
  17. _task_stack.push(task)
  18. try:
  19. task.request.reply_to = 'reply_to'
  20. self.assertEqual(self.b._routing_key('task_id'), 'reply_to')
  21. finally:
  22. _task_stack.pop()
  23. def test_binding(self):
  24. queue = self.b.binding
  25. self.assertEqual(queue.name, self.b.oid)
  26. self.assertEqual(queue.exchange, self.b.exchange)
  27. self.assertEqual(queue.routing_key, self.b.oid)
  28. self.assertFalse(queue.durable)
  29. self.assertFalse(queue.auto_delete)
  30. def test_many_bindings(self):
  31. self.assertListEqual(
  32. self.b._many_bindings(['a', 'b']),
  33. [self.b.binding],
  34. )
  35. def test_create_binding(self):
  36. self.assertEqual(self.b._create_binding('id'), self.b.binding)
  37. def test_extra_properties(self):
  38. self.assertDictEqual(
  39. self.b.extra_properties,
  40. {'reply_to': self.b.oid},
  41. )
  42. def test_on_task_call(self):
  43. with patch('celery.backends.rpc.maybe_declare') as md:
  44. with self.app.amqp.producer_pool.acquire() as prod:
  45. self.assertEqual(
  46. self.b.on_task_call(prod, 'task_id'),
  47. self.b.extra_properties,
  48. )
  49. md.assert_called_with(
  50. self.b.binding(prod.channel),
  51. retry=True,
  52. )
  53. def test_create_exchange(self):
  54. ex = self.b._create_exchange('name')
  55. self.assertIsInstance(ex, self.b.Exchange)
  56. self.assertEqual(ex.name, 'c.rep')
  57. self.assertEqual(ex.type, 'direct')
  58. self.assertEqual(ex.delivery_mode, 1)
  59. self.assertFalse(ex.durable)
  60. self.assertFalse(ex.auto_delete)