test_amqp.py 8.5 KB

  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. from datetime import datetime, timedelta
  4. from case import Mock
  5. from kombu import Exchange, Queue
  6. from celery import uuid
  7. from celery.app.amqp import Queues, utf8dict
  8. from celery.five import keys
  9. from celery.utils.time import to_utc
  10. class test_TaskConsumer:
  11. def test_accept_content(self, app):
  12. with app.pool.acquire(block=True) as con:
  13. app.conf.accept_content = ['application/json']
  14. assert app.amqp.TaskConsumer(con).accept == {
  15. 'application/json',
  16. }
  17. assert app.amqp.TaskConsumer(con, accept=['json']).accept == {
  18. 'application/json',
  19. }
  20. class test_ProducerPool:
  21. def test_setup_nolimit(self, app):
  22. app.conf.broker_pool_limit = None
  23. try:
  24. delattr(app, '_pool')
  25. except AttributeError:
  26. pass
  27. app.amqp._producer_pool = None
  28. pool = app.amqp.producer_pool
  29. assert pool.limit == app.pool.limit
  30. assert not pool._resource.queue
  31. r1 = pool.acquire()
  32. r2 = pool.acquire()
  33. r1.release()
  34. r2.release()
  35. r1 = pool.acquire()
  36. r2 = pool.acquire()
  37. def test_setup(self, app):
  38. app.conf.broker_pool_limit = 2
  39. try:
  40. delattr(app, '_pool')
  41. except AttributeError:
  42. pass
  43. app.amqp._producer_pool = None
  44. pool = app.amqp.producer_pool
  45. assert pool.limit == app.pool.limit
  46. assert pool._resource.queue
  47. p1 = r1 = pool.acquire()
  48. p2 = r2 = pool.acquire()
  49. r1.release()
  50. r2.release()
  51. r1 = pool.acquire()
  52. r2 = pool.acquire()
  53. assert p2 is r1
  54. assert p1 is r2
  55. r1.release()
  56. r2.release()
  57. class test_Queues:
  58. def test_queues_format(self):
  59. self.app.amqp.queues._consume_from = {}
  60. assert self.app.amqp.queues.format() == ''
  61. def test_with_defaults(self):
  62. assert Queues(None) == {}
  63. def test_add(self):
  64. q = Queues()
  65. q.add('foo', exchange='ex', routing_key='rk')
  66. assert 'foo' in q
  67. assert isinstance(q['foo'], Queue)
  68. assert q['foo'].routing_key == 'rk'
  69. @pytest.mark.parametrize('ha_policy,qname,q,qargs,expected', [
  70. (None, 'xyz', 'xyz', None, None),
  71. (None, 'xyz', 'xyz', {'x-foo': 'bar'}, {'x-foo': 'bar'}),
  72. ('all', 'foo', Queue('foo'), None, {'x-ha-policy': 'all'}),
  73. ('all', 'xyx2',
  74. Queue('xyx2', queue_arguments={'x-foo': 'bari'}),
  75. None,
  76. {'x-ha-policy': 'all', 'x-foo': 'bari'}),
  77. (['A', 'B', 'C'], 'foo', Queue('foo'), None, {
  78. 'x-ha-policy': 'nodes',
  79. 'x-ha-policy-params': ['A', 'B', 'C']}),
  80. ])
  81. def test_with_ha_policy(self, ha_policy, qname, q, qargs, expected):
  82. queues = Queues(ha_policy=ha_policy, create_missing=False)
  83. queues.add(q, queue_arguments=qargs)
  84. assert queues[qname].queue_arguments == expected
  85. def test_select_add(self):
  86. q = Queues()
  87. q.select(['foo', 'bar'])
  88. q.select_add('baz')
  89. assert sorted(keys(q._consume_from)) == ['bar', 'baz', 'foo']
  90. def test_deselect(self):
  91. q = Queues()
  92. q.select(['foo', 'bar'])
  93. q.deselect('bar')
  94. assert sorted(keys(q._consume_from)) == ['foo']
  95. def test_with_ha_policy_compat(self):
  96. q = Queues(ha_policy='all')
  97. q.add('bar')
  98. assert q['bar'].queue_arguments == {'x-ha-policy': 'all'}
  99. def test_add_default_exchange(self):
  100. ex = Exchange('fff', 'fanout')
  101. q = Queues(default_exchange=ex)
  102. q.add(Queue('foo'))
  103. assert q['foo'].exchange.name == ''
  104. def test_alias(self):
  105. q = Queues()
  106. q.add(Queue('foo', alias='barfoo'))
  107. assert q['barfoo'] is q['foo']
  108. @pytest.mark.parametrize('queues_kwargs,qname,q,expected', [
  109. (dict(max_priority=10),
  110. 'foo', 'foo', {'x-max-priority': 10}),
  111. (dict(max_priority=10),
  112. 'xyz', Queue('xyz', queue_arguments={'x-max-priority': 3}),
  113. {'x-max-priority': 3}),
  114. (dict(max_priority=10),
  115. 'moo', Queue('moo', queue_arguments=None),
  116. {'x-max-priority': 10}),
  117. (dict(ha_policy='all', max_priority=5),
  118. 'bar', 'bar',
  119. {'x-ha-policy': 'all', 'x-max-priority': 5}),
  120. (dict(ha_policy='all', max_priority=5),
  121. 'xyx2', Queue('xyx2', queue_arguments={'x-max-priority': 2}),
  122. {'x-ha-policy': 'all', 'x-max-priority': 2}),
  123. (dict(max_priority=None),
  124. 'foo2', 'foo2',
  125. None),
  126. (dict(max_priority=None),
  127. 'xyx3', Queue('xyx3', queue_arguments={'x-max-priority': 7}),
  128. {'x-max-priority': 7}),
  129. ])
  130. def test_with_max_priority(self, queues_kwargs, qname, q, expected):
  131. queues = Queues(**queues_kwargs)
  132. queues.add(q)
  133. assert queues[qname].queue_arguments == expected
  134. class test_AMQP:
  135. def setup(self):
  136. self.simple_message = self.app.amqp.as_task_v2(
  137. uuid(), 'foo', create_sent_event=True,
  138. )
  139. def test_Queues__with_ha_policy(self):
  140. x = self.app.amqp.Queues({}, ha_policy='all')
  141. assert x.ha_policy == 'all'
  142. def test_Queues__with_max_priority(self):
  143. x = self.app.amqp.Queues({}, max_priority=23)
  144. assert x.max_priority == 23
  145. def test_send_task_message__no_kwargs(self):
  146. self.app.amqp.send_task_message(Mock(), 'foo', self.simple_message)
  147. def test_send_task_message__properties(self):
  148. prod = Mock(name='producer')
  149. self.app.amqp.send_task_message(
  150. prod, 'foo', self.simple_message, foo=1, retry=False,
  151. )
  152. assert prod.publish.call_args[1]['foo'] == 1
  153. def test_send_task_message__headers(self):
  154. prod = Mock(name='producer')
  155. self.app.amqp.send_task_message(
  156. prod, 'foo', self.simple_message, headers={'x1x': 'y2x'},
  157. retry=False,
  158. )
  159. assert prod.publish.call_args[1]['headers']['x1x'] == 'y2x'
  160. def test_send_task_message__queue_string(self):
  161. prod = Mock(name='producer')
  162. self.app.amqp.send_task_message(
  163. prod, 'foo', self.simple_message, queue='foo', retry=False,
  164. )
  165. kwargs = prod.publish.call_args[1]
  166. assert kwargs['routing_key'] == 'foo'
  167. assert kwargs['exchange'] == ''
  168. def test_send_event_exchange_string(self):
  169. evd = Mock(name='evd')
  170. self.app.amqp.send_task_message(
  171. Mock(), 'foo', self.simple_message, retry=False,
  172. exchange='xyz', routing_key='xyb',
  173. event_dispatcher=evd,
  174. )
  175. evd.publish.assert_called()
  176. event = evd.publish.call_args[0][1]
  177. assert event['routing_key'] == 'xyb'
  178. assert event['exchange'] == 'xyz'
  179. def test_send_task_message__with_delivery_mode(self):
  180. prod = Mock(name='producer')
  181. self.app.amqp.send_task_message(
  182. prod, 'foo', self.simple_message, delivery_mode=33, retry=False,
  183. )
  184. assert prod.publish.call_args[1]['delivery_mode'] == 33
  185. def test_routes(self):
  186. r1 = self.app.amqp.routes
  187. r2 = self.app.amqp.routes
  188. assert r1 is r2
  189. class test_as_task_v2:
  190. def test_raises_if_args_is_not_tuple(self):
  191. with pytest.raises(TypeError):
  192. self.app.amqp.as_task_v2(uuid(), 'foo', args='123')
  193. def test_raises_if_kwargs_is_not_mapping(self):
  194. with pytest.raises(TypeError):
  195. self.app.amqp.as_task_v2(uuid(), 'foo', kwargs=(1, 2, 3))
  196. def test_countdown_to_eta(self):
  197. now = to_utc(datetime.utcnow()).astimezone(self.app.timezone)
  198. m = self.app.amqp.as_task_v2(
  199. uuid(), 'foo', countdown=10, now=now,
  200. )
  201. assert m.headers['eta'] == (now + timedelta(seconds=10)).isoformat()
  202. def test_expires_to_datetime(self):
  203. now = to_utc(datetime.utcnow()).astimezone(self.app.timezone)
  204. m = self.app.amqp.as_task_v2(
  205. uuid(), 'foo', expires=30, now=now,
  206. )
  207. assert m.headers['expires'] == (
  208. now + timedelta(seconds=30)).isoformat()
  209. def test_callbacks_errbacks_chord(self):
  210. @self.app.task
  211. def t(i):
  212. pass
  213. m = self.app.amqp.as_task_v2(
  214. uuid(), 'foo',
  215. callbacks=[t.s(1), t.s(2)],
  216. errbacks=[t.s(3), t.s(4)],
  217. chord=t.s(5),
  218. )
  219. _, _, embed = m.body
  220. assert embed['callbacks'] == [utf8dict(t.s(1)), utf8dict(t.s(2))]
  221. assert embed['errbacks'] == [utf8dict(t.s(3)), utf8dict(t.s(4))]
  222. assert embed['chord'] == utf8dict(t.s(5))