test_app_amqp.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. from __future__ import with_statement
  2. from mock import Mock
  3. from celery.tests.utils import AppCase
  4. from celery.app.amqp import MSG_OPTIONS, extract_msg_options
  5. class TestMsgOptions(AppCase):
  6. def test_MSG_OPTIONS(self):
  7. self.assertTrue(MSG_OPTIONS)
  8. def test_extract_msg_options(self):
  9. testing = {"mandatory": True, "routing_key": "foo.xuzzy"}
  10. result = extract_msg_options(testing)
  11. self.assertEqual(result["mandatory"], True)
  12. self.assertEqual(result["routing_key"], "foo.xuzzy")
  13. class test_TaskPublisher(AppCase):
  14. def test__exit__(self):
  15. publisher = self.app.amqp.TaskPublisher(self.app.broker_connection())
  16. publisher.close = Mock()
  17. with publisher:
  18. pass
  19. publisher.close.assert_called_with()
  20. def test_ensure_declare_queue(self, q="x1242112"):
  21. publisher = self.app.amqp.TaskPublisher(Mock())
  22. self.app.amqp.queues.add(q, q, q)
  23. publisher._declare_queue(q, retry=True)
  24. self.assertTrue(publisher.connection.ensure.call_count)
  25. def test_ensure_declare_exchange(self, e="x9248311"):
  26. publisher = self.app.amqp.TaskPublisher(Mock())
  27. publisher._declare_exchange(e, "direct", retry=True)
  28. self.assertTrue(publisher.connection.ensure.call_count)
  29. def test_retry_policy(self):
  30. pub = self.app.amqp.TaskPublisher(Mock())
  31. pub.delay_task("tasks.add", (2, 2), {},
  32. retry_policy={"frobulate": 32.4})
  33. def test_publish_no_retry(self):
  34. pub = self.app.amqp.TaskPublisher(Mock())
  35. pub.delay_task("tasks.add", (2, 2), {}, retry=False, chord=123)
  36. self.assertFalse(pub.connection.ensure.call_count)
  37. class test_PublisherPool(AppCase):
  38. def test_setup_nolimit(self):
  39. L = self.app.conf.BROKER_POOL_LIMIT
  40. self.app.conf.BROKER_POOL_LIMIT = None
  41. try:
  42. delattr(self.app, "_pool")
  43. except AttributeError:
  44. pass
  45. self.app.amqp.__dict__.pop("publisher_pool", None)
  46. try:
  47. pool = self.app.amqp.publisher_pool
  48. self.assertEqual(pool.limit, self.app.pool.limit)
  49. self.assertFalse(pool._resource.queue)
  50. r1 = pool.acquire()
  51. r2 = pool.acquire()
  52. r1.release()
  53. r2.release()
  54. r1 = pool.acquire()
  55. r2 = pool.acquire()
  56. finally:
  57. self.app.conf.BROKER_POOL_LIMIT = L
  58. def test_setup(self):
  59. L = self.app.conf.BROKER_POOL_LIMIT
  60. self.app.conf.BROKER_POOL_LIMIT = 2
  61. try:
  62. delattr(self.app, "_pool")
  63. except AttributeError:
  64. pass
  65. self.app.amqp.__dict__.pop("publisher_pool", None)
  66. try:
  67. pool = self.app.amqp.publisher_pool
  68. self.assertEqual(pool.limit, self.app.pool.limit)
  69. self.assertTrue(pool._resource.queue)
  70. p1 = r1 = pool.acquire()
  71. p2 = r2 = pool.acquire()
  72. delattr(r1.connection, "_producer_chan")
  73. r1.release()
  74. r2.release()
  75. r1 = pool.acquire()
  76. r2 = pool.acquire()
  77. self.assertIs(p2, r1)
  78. self.assertIs(p1, r2)
  79. r1.release()
  80. r2.release()
  81. finally:
  82. self.app.conf.BROKER_POOL_LIMIT = L