test_consumer.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490
  1. from __future__ import absolute_import
  2. import errno
  3. import socket
  4. from billiard.exceptions import RestartFreqExceeded
  5. from celery.datastructures import LimitedSet
  6. from celery.worker import state as worker_state
  7. from celery.worker.consumer import (
  8. Consumer,
  9. Heart,
  10. Tasks,
  11. Agent,
  12. Mingle,
  13. Gossip,
  14. dump_body,
  15. CLOSE,
  16. )
  17. from celery.tests.case import AppCase, ContextMock, Mock, SkipTest, call, patch
  18. class test_Consumer(AppCase):
  19. def get_consumer(self, no_hub=False, **kwargs):
  20. consumer = Consumer(
  21. on_task_request=Mock(),
  22. init_callback=Mock(),
  23. pool=Mock(),
  24. app=self.app,
  25. timer=Mock(),
  26. controller=Mock(),
  27. hub=None if no_hub else Mock(),
  28. **kwargs
  29. )
  30. consumer.blueprint = Mock()
  31. consumer._restart_state = Mock()
  32. consumer.connection = _amqp_connection()
  33. consumer.connection_errors = (socket.error, OSError, )
  34. return consumer
  35. def test_taskbuckets_defaultdict(self):
  36. c = self.get_consumer()
  37. self.assertIsNone(c.task_buckets['fooxasdwx.wewe'])
  38. def test_dump_body_buffer(self):
  39. msg = Mock()
  40. msg.body = 'str'
  41. try:
  42. buf = buffer(msg.body)
  43. except NameError:
  44. raise SkipTest('buffer type not available')
  45. self.assertTrue(dump_body(msg, buf))
  46. def test_sets_heartbeat(self):
  47. c = self.get_consumer(amqheartbeat=10)
  48. self.assertEqual(c.amqheartbeat, 10)
  49. self.app.conf.BROKER_HEARTBEAT = 20
  50. c = self.get_consumer(amqheartbeat=None)
  51. self.assertEqual(c.amqheartbeat, 20)
  52. def test_gevent_bug_disables_connection_timeout(self):
  53. with patch('celery.worker.consumer._detect_environment') as de:
  54. de.return_value = 'gevent'
  55. self.app.conf.BROKER_CONNECTION_TIMEOUT = 33.33
  56. self.get_consumer()
  57. self.assertIsNone(self.app.conf.BROKER_CONNECTION_TIMEOUT)
  58. def test_limit_task(self):
  59. c = self.get_consumer()
  60. with patch('celery.worker.consumer.task_reserved') as reserved:
  61. bucket = Mock()
  62. request = Mock()
  63. bucket.can_consume.return_value = True
  64. c._limit_task(request, bucket, 3)
  65. bucket.can_consume.assert_called_with(3)
  66. reserved.assert_called_with(request)
  67. c.on_task_request.assert_called_with(request)
  68. with patch('celery.worker.consumer.task_reserved') as reserved:
  69. bucket.can_consume.return_value = False
  70. bucket.expected_time.return_value = 3.33
  71. c._limit_task(request, bucket, 4)
  72. bucket.can_consume.assert_called_with(4)
  73. c.timer.call_after.assert_called_with(
  74. 3.33, c._limit_task, (request, bucket, 4),
  75. )
  76. bucket.expected_time.assert_called_with(4)
  77. self.assertFalse(reserved.called)
  78. def test_start_blueprint_raises_EMFILE(self):
  79. c = self.get_consumer()
  80. exc = c.blueprint.start.side_effect = OSError()
  81. exc.errno = errno.EMFILE
  82. with self.assertRaises(OSError):
  83. c.start()
  84. def test_max_restarts_exceeded(self):
  85. c = self.get_consumer()
  86. def se(*args, **kwargs):
  87. c.blueprint.state = CLOSE
  88. raise RestartFreqExceeded()
  89. c._restart_state.step.side_effect = se
  90. c.blueprint.start.side_effect = socket.error()
  91. with patch('celery.worker.consumer.sleep') as sleep:
  92. c.start()
  93. sleep.assert_called_with(1)
  94. def _closer(self, c):
  95. def se(*args, **kwargs):
  96. c.blueprint.state = CLOSE
  97. return se
  98. def test_collects_at_restart(self):
  99. c = self.get_consumer()
  100. c.connection.collect.side_effect = MemoryError()
  101. c.blueprint.start.side_effect = socket.error()
  102. c.blueprint.restart.side_effect = self._closer(c)
  103. c.start()
  104. c.connection.collect.assert_called_with()
  105. def test_register_with_event_loop(self):
  106. c = self.get_consumer()
  107. c.register_with_event_loop(Mock(name='loop'))
  108. def test_on_close_clears_semaphore_timer_and_reqs(self):
  109. with patch('celery.worker.consumer.reserved_requests') as reserved:
  110. c = self.get_consumer()
  111. c.on_close()
  112. c.controller.semaphore.clear.assert_called_with()
  113. c.timer.clear.assert_called_with()
  114. reserved.clear.assert_called_with()
  115. c.pool.flush.assert_called_with()
  116. c.controller = None
  117. c.timer = None
  118. c.pool = None
  119. c.on_close()
  120. def test_connect_error_handler(self):
  121. self.app.connection = _amqp_connection()
  122. conn = self.app.connection.return_value
  123. c = self.get_consumer()
  124. self.assertTrue(c.connect())
  125. self.assertTrue(conn.ensure_connection.called)
  126. errback = conn.ensure_connection.call_args[0][0]
  127. conn.alt = [(1, 2, 3)]
  128. errback(Mock(), 0)
  129. class test_Heart(AppCase):
  130. def test_start(self):
  131. c = Mock()
  132. c.timer = Mock()
  133. c.event_dispatcher = Mock()
  134. with patch('celery.worker.heartbeat.Heart') as hcls:
  135. h = Heart(c)
  136. self.assertTrue(h.enabled)
  137. self.assertEqual(h.heartbeat_interval, None)
  138. self.assertIsNone(c.heart)
  139. h.start(c)
  140. self.assertTrue(c.heart)
  141. hcls.assert_called_with(c.timer, c.event_dispatcher,
  142. h.heartbeat_interval)
  143. c.heart.start.assert_called_with()
  144. def test_start_heartbeat_interval(self):
  145. c = Mock()
  146. c.timer = Mock()
  147. c.event_dispatcher = Mock()
  148. with patch('celery.worker.heartbeat.Heart') as hcls:
  149. h = Heart(c, False, 20)
  150. self.assertTrue(h.enabled)
  151. self.assertEqual(h.heartbeat_interval, 20)
  152. self.assertIsNone(c.heart)
  153. h.start(c)
  154. self.assertTrue(c.heart)
  155. hcls.assert_called_with(c.timer, c.event_dispatcher,
  156. h.heartbeat_interval)
  157. c.heart.start.assert_called_with()
  158. class test_Tasks(AppCase):
  159. def test_stop(self):
  160. c = Mock()
  161. tasks = Tasks(c)
  162. self.assertIsNone(c.task_consumer)
  163. self.assertIsNone(c.qos)
  164. c.task_consumer = Mock()
  165. tasks.stop(c)
  166. def test_stop_already_stopped(self):
  167. c = Mock()
  168. tasks = Tasks(c)
  169. tasks.stop(c)
  170. class test_Agent(AppCase):
  171. def test_start(self):
  172. c = Mock()
  173. agent = Agent(c)
  174. agent.instantiate = Mock()
  175. agent.agent_cls = 'foo:Agent'
  176. self.assertIsNotNone(agent.create(c))
  177. agent.instantiate.assert_called_with(agent.agent_cls, c.connection)
  178. class test_Mingle(AppCase):
  179. def test_start_no_replies(self):
  180. c = Mock()
  181. c.app.connection = _amqp_connection()
  182. mingle = Mingle(c)
  183. I = c.app.control.inspect.return_value = Mock()
  184. I.hello.return_value = {}
  185. mingle.start(c)
  186. def test_start(self):
  187. try:
  188. c = Mock()
  189. c.app.connection = _amqp_connection()
  190. mingle = Mingle(c)
  191. self.assertTrue(mingle.enabled)
  192. Aig = LimitedSet()
  193. Big = LimitedSet()
  194. Aig.add('Aig-1')
  195. Aig.add('Aig-2')
  196. Big.add('Big-1')
  197. I = c.app.control.inspect.return_value = Mock()
  198. I.hello.return_value = {
  199. 'A@example.com': {
  200. 'clock': 312,
  201. 'revoked': Aig._data,
  202. },
  203. 'B@example.com': {
  204. 'clock': 29,
  205. 'revoked': Big._data,
  206. },
  207. 'C@example.com': {
  208. 'error': 'unknown method',
  209. },
  210. }
  211. mingle.start(c)
  212. I.hello.assert_called_with(c.hostname, worker_state.revoked._data)
  213. c.app.clock.adjust.assert_has_calls([
  214. call(312), call(29),
  215. ], any_order=True)
  216. self.assertIn('Aig-1', worker_state.revoked)
  217. self.assertIn('Aig-2', worker_state.revoked)
  218. self.assertIn('Big-1', worker_state.revoked)
  219. finally:
  220. worker_state.revoked.clear()
  221. def _amqp_connection():
  222. connection = ContextMock()
  223. connection.return_value = ContextMock()
  224. connection.return_value.transport.driver_type = 'amqp'
  225. return connection
  226. class test_Gossip(AppCase):
  227. def test_init(self):
  228. c = self.Consumer()
  229. c.app.connection = _amqp_connection()
  230. g = Gossip(c)
  231. self.assertTrue(g.enabled)
  232. self.assertIs(c.gossip, g)
  233. def test_election(self):
  234. c = self.Consumer()
  235. c.app.connection = _amqp_connection()
  236. g = Gossip(c)
  237. g.start(c)
  238. g.election('id', 'topic', 'action')
  239. self.assertListEqual(g.consensus_replies['id'], [])
  240. g.dispatcher.send.assert_called_with(
  241. 'worker-elect', id='id', topic='topic', cver=1, action='action',
  242. )
  243. def test_call_task(self):
  244. c = self.Consumer()
  245. c.app.connection = _amqp_connection()
  246. g = Gossip(c)
  247. g.start(c)
  248. with patch('celery.worker.consumer.signature') as signature:
  249. sig = signature.return_value = Mock()
  250. task = Mock()
  251. g.call_task(task)
  252. signature.assert_called_with(task, app=c.app)
  253. sig.apply_async.assert_called_with()
  254. sig.apply_async.side_effect = MemoryError()
  255. with patch('celery.worker.consumer.error') as error:
  256. g.call_task(task)
  257. self.assertTrue(error.called)
  258. def Event(self, id='id', clock=312,
  259. hostname='foo@example.com', pid=4312,
  260. topic='topic', action='action', cver=1):
  261. return {
  262. 'id': id,
  263. 'clock': clock,
  264. 'hostname': hostname,
  265. 'pid': pid,
  266. 'topic': topic,
  267. 'action': action,
  268. 'cver': cver,
  269. }
  270. def test_on_elect(self):
  271. c = self.Consumer()
  272. c.app.connection = _amqp_connection()
  273. g = Gossip(c)
  274. g.start(c)
  275. event = self.Event('id1')
  276. g.on_elect(event)
  277. in_heap = g.consensus_requests['id1']
  278. self.assertTrue(in_heap)
  279. g.dispatcher.send.assert_called_with('worker-elect-ack', id='id1')
  280. event.pop('clock')
  281. with patch('celery.worker.consumer.error') as error:
  282. g.on_elect(event)
  283. self.assertTrue(error.called)
  284. def Consumer(self, hostname='foo@x.com', pid=4312):
  285. c = Mock()
  286. c.app.connection = _amqp_connection()
  287. c.hostname = hostname
  288. c.pid = pid
  289. return c
  290. def setup_election(self, g, c):
  291. g.start(c)
  292. g.clock = self.app.clock
  293. self.assertNotIn('idx', g.consensus_replies)
  294. self.assertIsNone(g.on_elect_ack({'id': 'idx'}))
  295. g.state.alive_workers.return_value = [
  296. 'foo@x.com', 'bar@x.com', 'baz@x.com',
  297. ]
  298. g.consensus_replies['id1'] = []
  299. g.consensus_requests['id1'] = []
  300. e1 = self.Event('id1', 1, 'foo@x.com')
  301. e2 = self.Event('id1', 2, 'bar@x.com')
  302. e3 = self.Event('id1', 3, 'baz@x.com')
  303. g.on_elect(e1)
  304. g.on_elect(e2)
  305. g.on_elect(e3)
  306. self.assertEqual(len(g.consensus_requests['id1']), 3)
  307. with patch('celery.worker.consumer.info'):
  308. g.on_elect_ack(e1)
  309. self.assertEqual(len(g.consensus_replies['id1']), 1)
  310. g.on_elect_ack(e2)
  311. self.assertEqual(len(g.consensus_replies['id1']), 2)
  312. g.on_elect_ack(e3)
  313. with self.assertRaises(KeyError):
  314. g.consensus_replies['id1']
  315. def test_on_elect_ack_win(self):
  316. c = self.Consumer(hostname='foo@x.com') # I will win
  317. g = Gossip(c)
  318. handler = g.election_handlers['topic'] = Mock()
  319. self.setup_election(g, c)
  320. handler.assert_called_with('action')
  321. def test_on_elect_ack_lose(self):
  322. c = self.Consumer(hostname='bar@x.com') # I will lose
  323. c.app.connection = _amqp_connection()
  324. g = Gossip(c)
  325. handler = g.election_handlers['topic'] = Mock()
  326. self.setup_election(g, c)
  327. self.assertFalse(handler.called)
  328. def test_on_elect_ack_win_but_no_action(self):
  329. c = self.Consumer(hostname='foo@x.com') # I will win
  330. g = Gossip(c)
  331. g.election_handlers = {}
  332. with patch('celery.worker.consumer.error') as error:
  333. self.setup_election(g, c)
  334. self.assertTrue(error.called)
  335. def test_on_node_join(self):
  336. c = self.Consumer()
  337. g = Gossip(c)
  338. with patch('celery.worker.consumer.debug') as debug:
  339. g.on_node_join(c)
  340. debug.assert_called_with('%s joined the party', 'foo@x.com')
  341. def test_on_node_leave(self):
  342. c = self.Consumer()
  343. g = Gossip(c)
  344. with patch('celery.worker.consumer.debug') as debug:
  345. g.on_node_leave(c)
  346. debug.assert_called_with('%s left', 'foo@x.com')
  347. def test_on_node_lost(self):
  348. c = self.Consumer()
  349. g = Gossip(c)
  350. with patch('celery.worker.consumer.info') as info:
  351. g.on_node_lost(c)
  352. info.assert_called_with('missed heartbeat from %s', 'foo@x.com')
  353. def test_register_timer(self):
  354. c = self.Consumer()
  355. g = Gossip(c)
  356. g.register_timer()
  357. c.timer.call_repeatedly.assert_called_with(g.interval, g.periodic)
  358. tref = g._tref
  359. g.register_timer()
  360. tref.cancel.assert_called_with()
  361. def test_periodic(self):
  362. c = self.Consumer()
  363. g = Gossip(c)
  364. g.on_node_lost = Mock()
  365. state = g.state = Mock()
  366. worker = Mock()
  367. state.workers = {'foo': worker}
  368. worker.alive = True
  369. worker.hostname = 'foo'
  370. g.periodic()
  371. worker.alive = False
  372. g.periodic()
  373. g.on_node_lost.assert_called_with(worker)
  374. with self.assertRaises(KeyError):
  375. state.workers['foo']
  376. def test_on_message(self):
  377. c = self.Consumer()
  378. g = Gossip(c)
  379. self.assertTrue(g.enabled)
  380. prepare = Mock()
  381. prepare.return_value = 'worker-online', {}
  382. c.app.events.State.assert_called_with(
  383. on_node_join=g.on_node_join,
  384. on_node_leave=g.on_node_leave,
  385. max_tasks_in_memory=1,
  386. )
  387. g.update_state = Mock()
  388. worker = Mock()
  389. g.on_node_join = Mock()
  390. g.on_node_leave = Mock()
  391. g.update_state.return_value = worker, 1
  392. message = Mock()
  393. message.delivery_info = {'routing_key': 'worker-online'}
  394. message.headers = {'hostname': 'other'}
  395. handler = g.event_handlers['worker-online'] = Mock()
  396. g.on_message(prepare, message)
  397. handler.assert_called_with(message.payload)
  398. g.event_handlers = {}
  399. g.on_message(prepare, message)
  400. message.delivery_info = {'routing_key': 'worker-offline'}
  401. prepare.return_value = 'worker-offline', {}
  402. g.on_message(prepare, message)
  403. message.delivery_info = {'routing_key': 'worker-baz'}
  404. prepare.return_value = 'worker-baz', {}
  405. g.update_state.return_value = worker, 0
  406. g.on_message(prepare, message)
  407. message.headers = {'hostname': g.hostname}
  408. g.on_message(prepare, message)
  409. g.clock.forward.assert_called_with()