test_amqp.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. from __future__ import absolute_import
  2. from __future__ import with_statement
  3. from kombu import Exchange, Queue
  4. from mock import Mock
  5. from celery.app.amqp import Queues, TaskPublisher
  6. from celery.tests.utils import AppCase
  class test_TaskProducer(AppCase):
      """Tests for the producer class exposed as ``self.app.amqp.TaskProducer``."""

      def test__exit__(self):
          # Leaving the ``with`` block must call release() with no arguments.
          producer = self.app.amqp.TaskProducer(self.app.broker_connection())
          producer.release = Mock()
          with producer:
              pass
          producer.release.assert_called_with()

      def test_declare(self):
          # Smoke test: declare() is invoked once with a named exchange and
          # once with the exchange name cleared; neither call may raise.
          producer = self.app.amqp.TaskProducer(self.app.broker_connection())
          for exchange_name in ('foo', None):
              producer.exchange.name = exchange_name
              producer.declare()

      def test_retry_policy(self):
          # Smoke test: publishing with a custom retry policy must not raise.
          producer = self.app.amqp.TaskProducer(Mock())
          producer.channel.connection.client.declared_entities = set()
          policy = {'frobulate': 32.4}
          producer.delay_task('tasks.add', (2, 2), {}, retry_policy=policy)

      def test_publish_no_retry(self):
          # With retry=False the connection's ensure() must never be called.
          producer = self.app.amqp.TaskProducer(Mock())
          producer.channel.connection.client.declared_entities = set()
          producer.delay_task(
              'tasks.add', (2, 2), {}, retry=False, chord=123,
          )
          ensure_calls = producer.connection.ensure.call_count
          self.assertFalse(ensure_calls)
  class test_compat_TaskPublisher(AppCase):
      """Tests for how ``TaskPublisher`` handles its ``exchange`` argument."""

      def test_compat_exchange_is_string(self):
          # A plain string is wrapped in an Exchange whose type is 'direct'
          # unless exchange_type says otherwise.
          by_name = TaskPublisher(exchange='foo', app=self.app)
          self.assertIsInstance(by_name.exchange, Exchange)
          self.assertEqual(by_name.exchange.name, 'foo')
          self.assertEqual(by_name.exchange.type, 'direct')

          as_topic = TaskPublisher(
              exchange='foo', exchange_type='topic', app=self.app,
          )
          self.assertEqual(as_topic.exchange.type, 'topic')

      def test_compat_exchange_is_Exchange(self):
          # An Exchange instance is accepted and keeps its name.
          given = Exchange('foo')
          by_instance = TaskPublisher(exchange=given)
          self.assertEqual(by_instance.exchange.name, 'foo')
  class test_PublisherPool(AppCase):
      """Tests for ``self.app.amqp.producer_pool`` under different pool limits."""

      def _rebuild_producer_pool(self, limit):
          """Set BROKER_POOL_LIMIT to *limit* and return a fresh producer pool.

          Drops the app's ``_pool`` attribute and the ``producer_pool`` entry
          in the amqp object's ``__dict__`` so that the next attribute access
          builds a new pool.  The caller is responsible for restoring the
          previous BROKER_POOL_LIMIT.
          """
          self.app.conf.BROKER_POOL_LIMIT = limit
          try:
              delattr(self.app, '_pool')
          except AttributeError:
              # No connection pool has been created yet; nothing to drop.
              pass
          self.app.amqp.__dict__.pop('producer_pool', None)
          return self.app.amqp.producer_pool

      def test_setup_nolimit(self):
          # With no limit configured, the pool's underlying queue is empty
          # and producers can be acquired and released repeatedly.
          saved_limit = self.app.conf.BROKER_POOL_LIMIT
          try:
              pool = self._rebuild_producer_pool(None)
              self.assertEqual(pool.limit, self.app.pool.limit)
              self.assertFalse(pool._resource.queue)

              first = pool.acquire()
              second = pool.acquire()
              first.release()
              second.release()
              first = pool.acquire()
              second = pool.acquire()
              # Hand the second round back as well so the test does not
              # leave acquired producers behind (test_setup does the same).
              first.release()
              second.release()
          finally:
              self.app.conf.BROKER_POOL_LIMIT = saved_limit

      def test_setup(self):
          # With a limit of 2, the pool's underlying queue is non-empty and
          # released producers are reused: the second acquisition round
          # returns the same two objects, in the opposite order.
          saved_limit = self.app.conf.BROKER_POOL_LIMIT
          try:
              pool = self._rebuild_producer_pool(2)
              self.assertEqual(pool.limit, self.app.pool.limit)
              self.assertTrue(pool._resource.queue)

              p1 = r1 = pool.acquire()
              p2 = r2 = pool.acquire()
              r1.release()
              r2.release()
              r1 = pool.acquire()
              r2 = pool.acquire()
              self.assertIs(p2, r1)
              self.assertIs(p1, r2)
              r1.release()
              r2.release()
          finally:
              self.app.conf.BROKER_POOL_LIMIT = saved_limit
  class test_Queues(AppCase):
      """Tests for the ``Queues`` mapping from ``celery.app.amqp``."""

      def test_queues_format(self):
          # With nothing to consume from, format() yields an empty string.
          saved = self.app.amqp.queues._consume_from
          self.app.amqp.queues._consume_from = {}
          try:
              formatted = self.app.amqp.queues.format()
              self.assertEqual(formatted, '')
          finally:
              self.app.amqp.queues._consume_from = saved

      def test_with_defaults(self):
          # Constructing with None compares equal to an empty dict.
          empty = Queues(None)
          self.assertEqual(empty, {})

      def test_add(self):
          # add() given a name plus options stores a Queue under that name.
          queues = Queues()
          queues.add('foo', exchange='ex', routing_key='rk')
          self.assertIn('foo', queues)
          added = queues['foo']
          self.assertIsInstance(added, Queue)
          self.assertEqual(added.routing_key, 'rk')

      def test_add_default_exchange(self):
          # A Queue added without an exchange ends up with the default one.
          fanout = Exchange('fff', 'fanout')
          queues = Queues(default_exchange=fanout)
          queues.add(Queue('foo'))
          self.assertEqual(queues['foo'].exchange, fanout)

      def test_alias(self):
          # A queue is reachable under both its name and its alias.
          queues = Queues()
          aliased = Queue('foo', alias='barfoo')
          queues.add(aliased)
          self.assertIs(queues['barfoo'], queues['foo'])