test_rpc.py 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. import pytest
  2. from case import Mock, patch
  3. from celery.backends.rpc import RPCBackend
  4. from celery._state import _task_stack
  5. class test_RPCBackend:
  6. def setup(self):
  7. self.b = RPCBackend(app=self.app)
  8. def test_oid(self):
  9. oid = self.b.oid
  10. oid2 = self.b.oid
  11. assert oid == oid2
  12. assert oid == self.app.oid
  13. def test_interface(self):
  14. self.b.on_reply_declare('task_id')
  15. def test_destination_for(self):
  16. req = Mock(name='request')
  17. req.reply_to = 'reply_to'
  18. req.correlation_id = 'corid'
  19. assert self.b.destination_for('task_id', req) == ('reply_to', 'corid')
  20. task = Mock()
  21. _task_stack.push(task)
  22. try:
  23. task.request.reply_to = 'reply_to'
  24. task.request.correlation_id = 'corid'
  25. assert self.b.destination_for('task_id', None) == (
  26. 'reply_to', 'corid',
  27. )
  28. finally:
  29. _task_stack.pop()
  30. with pytest.raises(RuntimeError):
  31. self.b.destination_for('task_id', None)
  32. def test_binding(self):
  33. queue = self.b.binding
  34. assert queue.name == self.b.oid
  35. assert queue.exchange == self.b.exchange
  36. assert queue.routing_key == self.b.oid
  37. assert not queue.durable
  38. assert queue.auto_delete
  39. def test_create_binding(self):
  40. assert self.b._create_binding('id') == self.b.binding
  41. def test_on_task_call(self):
  42. with patch('celery.backends.rpc.maybe_declare') as md:
  43. with self.app.amqp.producer_pool.acquire() as prod:
  44. self.b.on_task_call(prod, 'task_id'),
  45. md.assert_called_with(
  46. self.b.binding(prod.channel),
  47. retry=True,
  48. )
  49. def test_create_exchange(self):
  50. ex = self.b._create_exchange('name')
  51. assert isinstance(ex, self.b.Exchange)
  52. assert ex.name == ''