test_rpc.py 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. from case import Mock, patch
  4. from celery import chord, group
  5. from celery.backends.rpc import RPCBackend
  6. from celery._state import _task_stack
  7. class test_RPCBackend:
  8. def setup(self):
  9. self.b = RPCBackend(app=self.app)
  10. def test_oid(self):
  11. oid = self.b.oid
  12. oid2 = self.b.oid
  13. assert oid == oid2
  14. assert oid == self.app.oid
  15. def test_interface(self):
  16. self.b.on_reply_declare('task_id')
  17. def test_ensure_chords_allowed(self):
  18. with pytest.raises(NotImplementedError):
  19. self.b.ensure_chords_allowed()
  20. def test_apply_chord(self):
  21. with pytest.raises(NotImplementedError):
  22. self.b.apply_chord([], (), 'gid', Mock(name='body'))
  23. @pytest.mark.celery(result_backend='rpc')
  24. def test_chord_raises_error(self):
  25. with pytest.raises(NotImplementedError):
  26. chord(self.add.s(i, i) for i in range(10))(self.add.s([2]))
  27. @pytest.mark.celery(result_backend='rpc')
  28. def test_chain_with_chord_raises_error(self):
  29. with pytest.raises(NotImplementedError):
  30. (self.add.s(2, 2) |
  31. group(self.add.s(2, 2),
  32. self.add.s(5, 6)) | self.add.s()).delay()
  33. def test_destination_for(self):
  34. req = Mock(name='request')
  35. req.reply_to = 'reply_to'
  36. req.correlation_id = 'corid'
  37. assert self.b.destination_for('task_id', req) == ('reply_to', 'corid')
  38. task = Mock()
  39. _task_stack.push(task)
  40. try:
  41. task.request.reply_to = 'reply_to'
  42. task.request.correlation_id = 'corid'
  43. assert self.b.destination_for('task_id', None) == (
  44. 'reply_to', 'corid',
  45. )
  46. finally:
  47. _task_stack.pop()
  48. with pytest.raises(RuntimeError):
  49. self.b.destination_for('task_id', None)
  50. def test_binding(self):
  51. queue = self.b.binding
  52. assert queue.name == self.b.oid
  53. assert queue.exchange == self.b.exchange
  54. assert queue.routing_key == self.b.oid
  55. assert not queue.durable
  56. assert queue.auto_delete
  57. def test_create_binding(self):
  58. assert self.b._create_binding('id') == self.b.binding
  59. def test_on_task_call(self):
  60. with patch('celery.backends.rpc.maybe_declare') as md:
  61. with self.app.amqp.producer_pool.acquire() as prod:
  62. self.b.on_task_call(prod, 'task_id'),
  63. md.assert_called_with(
  64. self.b.binding(prod.channel),
  65. retry=True,
  66. )
  67. def test_create_exchange(self):
  68. ex = self.b._create_exchange('name')
  69. assert isinstance(ex, self.b.Exchange)
  70. assert ex.name == ''