test_rpc.py 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. from case import Mock, patch
  4. from celery.backends.rpc import RPCBackend
  5. from celery._state import _task_stack
  6. class test_RPCBackend:
  7. def setup(self):
  8. self.b = RPCBackend(app=self.app)
  9. def test_oid(self):
  10. oid = self.b.oid
  11. oid2 = self.b.oid
  12. assert oid == oid2
  13. assert oid == self.app.oid
  14. def test_interface(self):
  15. self.b.on_reply_declare('task_id')
  16. def test_destination_for(self):
  17. req = Mock(name='request')
  18. req.reply_to = 'reply_to'
  19. req.correlation_id = 'corid'
  20. assert self.b.destination_for('task_id', req) == ('reply_to', 'corid')
  21. task = Mock()
  22. _task_stack.push(task)
  23. try:
  24. task.request.reply_to = 'reply_to'
  25. task.request.correlation_id = 'corid'
  26. assert self.b.destination_for('task_id', None) == (
  27. 'reply_to', 'corid',
  28. )
  29. finally:
  30. _task_stack.pop()
  31. with pytest.raises(RuntimeError):
  32. self.b.destination_for('task_id', None)
  33. def test_binding(self):
  34. queue = self.b.binding
  35. assert queue.name == self.b.oid
  36. assert queue.exchange == self.b.exchange
  37. assert queue.routing_key == self.b.oid
  38. assert not queue.durable
  39. assert queue.auto_delete
  40. def test_create_binding(self):
  41. assert self.b._create_binding('id') == self.b.binding
  42. def test_on_task_call(self):
  43. with patch('celery.backends.rpc.maybe_declare') as md:
  44. with self.app.amqp.producer_pool.acquire() as prod:
  45. self.b.on_task_call(prod, 'task_id'),
  46. md.assert_called_with(
  47. self.b.binding(prod.channel),
  48. retry=True,
  49. )
  50. def test_create_exchange(self):
  51. ex = self.b._create_exchange('name')
  52. assert isinstance(ex, self.b.Exchange)
  53. assert ex.name == ''