test_amqp.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. from __future__ import absolute_import
  2. from kombu import Exchange, Queue
  3. from mock import Mock
  4. from celery.app.amqp import Queues, TaskPublisher
  5. from celery.tests.utils import AppCase
  6. class test_TaskProducer(AppCase):
  7. def test__exit__(self):
  8. publisher = self.app.amqp.TaskProducer(self.app.connection())
  9. publisher.release = Mock()
  10. with publisher:
  11. pass
  12. publisher.release.assert_called_with()
  13. def test_declare(self):
  14. publisher = self.app.amqp.TaskProducer(self.app.connection())
  15. publisher.exchange.name = 'foo'
  16. publisher.declare()
  17. publisher.exchange.name = None
  18. publisher.declare()
  19. def test_retry_policy(self):
  20. prod = self.app.amqp.TaskProducer(Mock())
  21. prod.channel.connection.client.declared_entities = set()
  22. prod.publish_task('tasks.add', (2, 2), {},
  23. retry_policy={'frobulate': 32.4})
  24. def test_publish_no_retry(self):
  25. prod = self.app.amqp.TaskProducer(Mock())
  26. prod.channel.connection.client.declared_entities = set()
  27. prod.publish_task('tasks.add', (2, 2), {}, retry=False, chord=123)
  28. self.assertFalse(prod.connection.ensure.call_count)
  29. class test_compat_TaskPublisher(AppCase):
  30. def test_compat_exchange_is_string(self):
  31. producer = TaskPublisher(exchange='foo', app=self.app)
  32. self.assertIsInstance(producer.exchange, Exchange)
  33. self.assertEqual(producer.exchange.name, 'foo')
  34. self.assertEqual(producer.exchange.type, 'direct')
  35. producer = TaskPublisher(exchange='foo', exchange_type='topic',
  36. app=self.app)
  37. self.assertEqual(producer.exchange.type, 'topic')
  38. def test_compat_exchange_is_Exchange(self):
  39. producer = TaskPublisher(exchange=Exchange('foo'))
  40. self.assertEqual(producer.exchange.name, 'foo')
  41. class test_PublisherPool(AppCase):
  42. def test_setup_nolimit(self):
  43. L = self.app.conf.BROKER_POOL_LIMIT
  44. self.app.conf.BROKER_POOL_LIMIT = None
  45. try:
  46. delattr(self.app, '_pool')
  47. except AttributeError:
  48. pass
  49. self.app.amqp._producer_pool = None
  50. try:
  51. pool = self.app.amqp.producer_pool
  52. self.assertEqual(pool.limit, self.app.pool.limit)
  53. self.assertFalse(pool._resource.queue)
  54. r1 = pool.acquire()
  55. r2 = pool.acquire()
  56. r1.release()
  57. r2.release()
  58. r1 = pool.acquire()
  59. r2 = pool.acquire()
  60. finally:
  61. self.app.conf.BROKER_POOL_LIMIT = L
  62. def test_setup(self):
  63. L = self.app.conf.BROKER_POOL_LIMIT
  64. self.app.conf.BROKER_POOL_LIMIT = 2
  65. try:
  66. delattr(self.app, '_pool')
  67. except AttributeError:
  68. pass
  69. self.app.amqp._producer_pool = None
  70. try:
  71. pool = self.app.amqp.producer_pool
  72. self.assertEqual(pool.limit, self.app.pool.limit)
  73. self.assertTrue(pool._resource.queue)
  74. p1 = r1 = pool.acquire()
  75. p2 = r2 = pool.acquire()
  76. r1.release()
  77. r2.release()
  78. r1 = pool.acquire()
  79. r2 = pool.acquire()
  80. self.assertIs(p2, r1)
  81. self.assertIs(p1, r2)
  82. r1.release()
  83. r2.release()
  84. finally:
  85. self.app.conf.BROKER_POOL_LIMIT = L
  86. class test_Queues(AppCase):
  87. def test_queues_format(self):
  88. prev, self.app.amqp.queues._consume_from = \
  89. self.app.amqp.queues._consume_from, {}
  90. try:
  91. self.assertEqual(self.app.amqp.queues.format(), '')
  92. finally:
  93. self.app.amqp.queues._consume_from = prev
  94. def test_with_defaults(self):
  95. self.assertEqual(Queues(None), {})
  96. def test_add(self):
  97. q = Queues()
  98. q.add('foo', exchange='ex', routing_key='rk')
  99. self.assertIn('foo', q)
  100. self.assertIsInstance(q['foo'], Queue)
  101. self.assertEqual(q['foo'].routing_key, 'rk')
  102. def test_add_default_exchange(self):
  103. ex = Exchange('fff', 'fanout')
  104. q = Queues(default_exchange=ex)
  105. q.add(Queue('foo'))
  106. self.assertEqual(q['foo'].exchange, ex)
  107. def test_alias(self):
  108. q = Queues()
  109. q.add(Queue('foo', alias='barfoo'))
  110. self.assertIs(q['barfoo'], q['foo'])