test_migrate.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. from __future__ import absolute_import, unicode_literals
  2. import pytest
  3. from contextlib import contextmanager
  4. from amqp import ChannelError
  5. from case import Mock, mock, patch
  6. from kombu import Connection, Producer, Queue, Exchange
  7. from kombu.transport.virtual import QoS
  8. from celery.contrib.migrate import (
  9. StopFiltering,
  10. State,
  11. migrate_task,
  12. migrate_tasks,
  13. filter_callback,
  14. _maybe_queue,
  15. filter_status,
  16. move_by_taskmap,
  17. move_by_idmap,
  18. move_task_by_id,
  19. start_filter,
  20. task_id_in,
  21. task_id_eq,
  22. expand_dest,
  23. move,
  24. )
  25. from celery.utils.encoding import bytes_t, ensure_bytes
  26. # hack to ignore error at shutdown
  27. QoS.restore_at_shutdown = False
  28. def Message(body, exchange='exchange', routing_key='rkey',
  29. compression=None, content_type='application/json',
  30. content_encoding='utf-8'):
  31. return Mock(
  32. attrs={
  33. 'body': body,
  34. 'delivery_info': {
  35. 'exchange': exchange,
  36. 'routing_key': routing_key,
  37. },
  38. 'headers': {
  39. 'compression': compression,
  40. },
  41. 'content_type': content_type,
  42. 'content_encoding': content_encoding,
  43. 'properties': {}
  44. },
  45. )
  46. class test_State:
  47. def test_strtotal(self):
  48. x = State()
  49. assert x.strtotal == '?'
  50. x.total_apx = 100
  51. assert x.strtotal == '100'
  52. def test_repr(self):
  53. x = State()
  54. assert repr(x)
  55. x.filtered = 'foo'
  56. assert repr(x)
  57. class test_move:
  58. @contextmanager
  59. def move_context(self, **kwargs):
  60. with patch('celery.contrib.migrate.start_filter') as start:
  61. with patch('celery.contrib.migrate.republish') as republish:
  62. pred = Mock(name='predicate')
  63. move(pred, app=self.app,
  64. connection=self.app.connection(), **kwargs)
  65. start.assert_called()
  66. callback = start.call_args[0][2]
  67. yield callback, pred, republish
  68. def msgpair(self, **kwargs):
  69. body = dict({'task': 'add', 'id': 'id'}, **kwargs)
  70. return body, Message(body)
  71. def test_move(self):
  72. with self.move_context() as (callback, pred, republish):
  73. pred.return_value = None
  74. body, message = self.msgpair()
  75. callback(body, message)
  76. message.ack.assert_not_called()
  77. republish.assert_not_called()
  78. pred.return_value = 'foo'
  79. callback(body, message)
  80. message.ack.assert_called_with()
  81. republish.assert_called()
  82. def test_move_transform(self):
  83. trans = Mock(name='transform')
  84. trans.return_value = Queue('bar')
  85. with self.move_context(transform=trans) as (callback, pred, republish):
  86. pred.return_value = 'foo'
  87. body, message = self.msgpair()
  88. with patch('celery.contrib.migrate.maybe_declare') as maybed:
  89. callback(body, message)
  90. trans.assert_called_with('foo')
  91. maybed.assert_called()
  92. republish.assert_called()
  93. def test_limit(self):
  94. with self.move_context(limit=1) as (callback, pred, republish):
  95. pred.return_value = 'foo'
  96. body, message = self.msgpair()
  97. with pytest.raises(StopFiltering):
  98. callback(body, message)
  99. republish.assert_called()
  100. def test_callback(self):
  101. cb = Mock()
  102. with self.move_context(callback=cb) as (callback, pred, republish):
  103. pred.return_value = 'foo'
  104. body, message = self.msgpair()
  105. callback(body, message)
  106. republish.assert_called()
  107. cb.assert_called()
  108. class test_start_filter:
  109. def test_start(self):
  110. with patch('celery.contrib.migrate.eventloop') as evloop:
  111. app = Mock()
  112. filt = Mock(name='filter')
  113. conn = Connection('memory://')
  114. evloop.side_effect = StopFiltering()
  115. app.amqp.queues = {'foo': Queue('foo'), 'bar': Queue('bar')}
  116. consumer = app.amqp.TaskConsumer.return_value = Mock(name='consum')
  117. consumer.queues = list(app.amqp.queues.values())
  118. consumer.channel = conn.default_channel
  119. consumer.__enter__ = Mock(name='consumer.__enter__')
  120. consumer.__exit__ = Mock(name='consumer.__exit__')
  121. consumer.callbacks = []
  122. def register_callback(x):
  123. consumer.callbacks.append(x)
  124. consumer.register_callback = register_callback
  125. start_filter(app, conn, filt,
  126. queues='foo,bar', ack_messages=True)
  127. body = {'task': 'add', 'id': 'id'}
  128. for callback in consumer.callbacks:
  129. callback(body, Message(body))
  130. consumer.callbacks[:] = []
  131. cb = Mock(name='callback=')
  132. start_filter(app, conn, filt, tasks='add,mul', callback=cb)
  133. for callback in consumer.callbacks:
  134. callback(body, Message(body))
  135. cb.assert_called()
  136. on_declare_queue = Mock()
  137. start_filter(app, conn, filt, tasks='add,mul', queues='foo',
  138. on_declare_queue=on_declare_queue)
  139. on_declare_queue.assert_called()
  140. start_filter(app, conn, filt, queues=['foo', 'bar'])
  141. consumer.callbacks[:] = []
  142. state = State()
  143. start_filter(app, conn, filt,
  144. tasks='add,mul', callback=cb, state=state, limit=1)
  145. stop_filtering_raised = False
  146. for callback in consumer.callbacks:
  147. try:
  148. callback(body, Message(body))
  149. except StopFiltering:
  150. stop_filtering_raised = True
  151. assert state.count
  152. assert stop_filtering_raised
  153. class test_filter_callback:
  154. def test_filter(self):
  155. callback = Mock()
  156. filt = filter_callback(callback, ['add', 'mul'])
  157. t1 = {'task': 'add'}
  158. t2 = {'task': 'div'}
  159. message = Mock()
  160. filt(t2, message)
  161. callback.assert_not_called()
  162. filt(t1, message)
  163. callback.assert_called_with(t1, message)
  164. def test_task_id_in():
  165. assert task_id_in(['A'], {'id': 'A'}, Mock())
  166. assert not task_id_in(['A'], {'id': 'B'}, Mock())
  167. def test_task_id_eq():
  168. assert task_id_eq('A', {'id': 'A'}, Mock())
  169. assert not task_id_eq('A', {'id': 'B'}, Mock())
  170. def test_expand_dest():
  171. assert expand_dest(None, 'foo', 'bar') == ('foo', 'bar')
  172. assert expand_dest(('b', 'x'), 'foo', 'bar') == ('b', 'x')
  173. def test_maybe_queue():
  174. app = Mock()
  175. app.amqp.queues = {'foo': 313}
  176. assert _maybe_queue(app, 'foo') == 313
  177. assert _maybe_queue(app, Queue('foo')) == Queue('foo')
  178. def test_filter_status():
  179. with mock.stdouts() as (stdout, stderr):
  180. filter_status(State(), {'id': '1', 'task': 'add'}, Mock())
  181. assert stdout.getvalue()
  182. def test_move_by_taskmap():
  183. with patch('celery.contrib.migrate.move') as move:
  184. move_by_taskmap({'add': Queue('foo')})
  185. move.assert_called()
  186. cb = move.call_args[0][0]
  187. assert cb({'task': 'add'}, Mock())
  188. def test_move_by_idmap():
  189. with patch('celery.contrib.migrate.move') as move:
  190. move_by_idmap({'123f': Queue('foo')})
  191. move.assert_called()
  192. cb = move.call_args[0][0]
  193. assert cb({'id': '123f'}, Mock())
  194. def test_move_task_by_id():
  195. with patch('celery.contrib.migrate.move') as move:
  196. move_task_by_id('123f', Queue('foo'))
  197. move.assert_called()
  198. cb = move.call_args[0][0]
  199. assert cb({'id': '123f'}, Mock()) == Queue('foo')
  200. class test_migrate_task:
  201. def test_removes_compression_header(self):
  202. x = Message('foo', compression='zlib')
  203. producer = Mock()
  204. migrate_task(producer, x.body, x)
  205. producer.publish.assert_called()
  206. args, kwargs = producer.publish.call_args
  207. assert isinstance(args[0], bytes_t)
  208. assert 'compression' not in kwargs['headers']
  209. assert kwargs['compression'] == 'zlib'
  210. assert kwargs['content_type'] == 'application/json'
  211. assert kwargs['content_encoding'] == 'utf-8'
  212. assert kwargs['exchange'] == 'exchange'
  213. assert kwargs['routing_key'] == 'rkey'
  214. class test_migrate_tasks:
  215. def test_migrate(self, app, name='testcelery'):
  216. connection_kwargs = dict(
  217. transport_options={'polling_interval': 0.01}
  218. )
  219. x = Connection('memory://foo', **connection_kwargs)
  220. y = Connection('memory://foo', **connection_kwargs)
  221. # use separate state
  222. x.default_channel.queues = {}
  223. y.default_channel.queues = {}
  224. ex = Exchange(name, 'direct')
  225. q = Queue(name, exchange=ex, routing_key=name)
  226. q(x.default_channel).declare()
  227. Producer(x).publish('foo', exchange=name, routing_key=name)
  228. Producer(x).publish('bar', exchange=name, routing_key=name)
  229. Producer(x).publish('baz', exchange=name, routing_key=name)
  230. assert x.default_channel.queues
  231. assert not y.default_channel.queues
  232. migrate_tasks(x, y, accept=['text/plain'], app=app)
  233. yq = q(y.default_channel)
  234. assert yq.get().body == ensure_bytes('foo')
  235. assert yq.get().body == ensure_bytes('bar')
  236. assert yq.get().body == ensure_bytes('baz')
  237. Producer(x).publish('foo', exchange=name, routing_key=name)
  238. callback = Mock()
  239. migrate_tasks(x, y,
  240. callback=callback, accept=['text/plain'], app=app)
  241. callback.assert_called()
  242. migrate = Mock()
  243. Producer(x).publish('baz', exchange=name, routing_key=name)
  244. migrate_tasks(x, y, callback=callback,
  245. migrate=migrate, accept=['text/plain'], app=app)
  246. migrate.assert_called()
  247. with patch('kombu.transport.virtual.Channel.queue_declare') as qd:
  248. def effect(*args, **kwargs):
  249. if kwargs.get('passive'):
  250. raise ChannelError('some channel error')
  251. return 0, 3, 0
  252. qd.side_effect = effect
  253. migrate_tasks(x, y, app=app)
  254. x = Connection('memory://', **connection_kwargs)
  255. x.default_channel.queues = {}
  256. y.default_channel.queues = {}
  257. callback = Mock()
  258. migrate_tasks(x, y,
  259. callback=callback, accept=['text/plain'], app=app)
  260. callback.assert_not_called()