# test_rpc.py (2.7 KB)
  1. import pytest
  2. from case import Mock, patch
  3. from celery import chord, group
  4. from celery.backends.rpc import RPCBackend
  5. from celery._state import _task_stack
  6. class test_RPCBackend:
  7. def setup(self):
  8. self.b = RPCBackend(app=self.app)
  9. def test_oid(self):
  10. oid = self.b.oid
  11. oid2 = self.b.oid
  12. assert oid == oid2
  13. assert oid == self.app.oid
  14. def test_interface(self):
  15. self.b.on_reply_declare('task_id')
  16. def test_ensure_chords_allowed(self):
  17. with pytest.raises(NotImplementedError):
  18. self.b.ensure_chords_allowed()
  19. def test_apply_chord(self):
  20. with pytest.raises(NotImplementedError):
  21. self.b.apply_chord([], (), 'gid', Mock(name='body'))
  22. @pytest.mark.celery(result_backend='rpc')
  23. def test_chord_raises_error(self):
  24. with pytest.raises(NotImplementedError):
  25. chord(self.add.s(i, i) for i in range(10))(self.add.s([2]))
  26. @pytest.mark.celery(result_backend='rpc')
  27. def test_chain_with_chord_raises_error(self):
  28. with pytest.raises(NotImplementedError):
  29. (self.add.s(2, 2) |
  30. group(self.add.s(2, 2),
  31. self.add.s(5, 6)) | self.add.s()).delay()
  32. def test_destination_for(self):
  33. req = Mock(name='request')
  34. req.reply_to = 'reply_to'
  35. req.correlation_id = 'corid'
  36. assert self.b.destination_for('task_id', req) == ('reply_to', 'corid')
  37. task = Mock()
  38. _task_stack.push(task)
  39. try:
  40. task.request.reply_to = 'reply_to'
  41. task.request.correlation_id = 'corid'
  42. assert self.b.destination_for('task_id', None) == (
  43. 'reply_to', 'corid',
  44. )
  45. finally:
  46. _task_stack.pop()
  47. with pytest.raises(RuntimeError):
  48. self.b.destination_for('task_id', None)
  49. def test_binding(self):
  50. queue = self.b.binding
  51. assert queue.name == self.b.oid
  52. assert queue.exchange == self.b.exchange
  53. assert queue.routing_key == self.b.oid
  54. assert not queue.durable
  55. assert queue.auto_delete
  56. def test_create_binding(self):
  57. assert self.b._create_binding('id') == self.b.binding
  58. def test_on_task_call(self):
  59. with patch('celery.backends.rpc.maybe_declare') as md:
  60. with self.app.amqp.producer_pool.acquire() as prod:
  61. self.b.on_task_call(prod, 'task_id'),
  62. md.assert_called_with(
  63. self.b.binding(prod.channel),
  64. retry=True,
  65. )
  66. def test_create_exchange(self):
  67. ex = self.b._create_exchange('name')
  68. assert isinstance(ex, self.b.Exchange)
  69. assert ex.name == ''