test_consumer.py 19 KB


  1. from __future__ import absolute_import, unicode_literals
  2. import errno
  3. import socket
  4. from collections import deque
  5. import pytest
  6. from billiard.exceptions import RestartFreqExceeded
  7. from case import ContextMock, Mock, call, patch, skip
  8. from celery.utils.collections import LimitedSet
  9. from celery.worker.consumer.agent import Agent
  10. from celery.worker.consumer.consumer import (CLOSE, TERMINATE, Consumer,
  11. dump_body)
  12. from celery.worker.consumer.gossip import Gossip
  13. from celery.worker.consumer.heart import Heart
  14. from celery.worker.consumer.mingle import Mingle
  15. from celery.worker.consumer.tasks import Tasks
  16. class test_Consumer:
  17. def get_consumer(self, no_hub=False, **kwargs):
  18. consumer = Consumer(
  19. on_task_request=Mock(),
  20. init_callback=Mock(),
  21. pool=Mock(),
  22. app=self.app,
  23. timer=Mock(),
  24. controller=Mock(),
  25. hub=None if no_hub else Mock(),
  26. **kwargs
  27. )
  28. consumer.blueprint = Mock(name='blueprint')
  29. consumer._restart_state = Mock(name='_restart_state')
  30. consumer.connection = _amqp_connection()
  31. consumer.connection_errors = (socket.error, OSError,)
  32. consumer.conninfo = consumer.connection
  33. return consumer
  34. def test_repr(self):
  35. assert repr(self.get_consumer())
  36. def test_taskbuckets_defaultdict(self):
  37. c = self.get_consumer()
  38. assert c.task_buckets['fooxasdwx.wewe'] is None
  39. @skip.if_python3(reason='buffer type not available')
  40. def test_dump_body_buffer(self):
  41. msg = Mock()
  42. msg.body = 'str'
  43. assert dump_body(msg, buffer(msg.body)) # noqa: F821
  44. def test_sets_heartbeat(self):
  45. c = self.get_consumer(amqheartbeat=10)
  46. assert c.amqheartbeat == 10
  47. self.app.conf.broker_heartbeat = 20
  48. c = self.get_consumer(amqheartbeat=None)
  49. assert c.amqheartbeat == 20
  50. def test_gevent_bug_disables_connection_timeout(self):
  51. with patch('celery.worker.consumer.consumer._detect_environment') as d:
  52. d.return_value = 'gevent'
  53. self.app.conf.broker_connection_timeout = 33.33
  54. self.get_consumer()
  55. assert self.app.conf.broker_connection_timeout is None
  56. def test_limit_moved_to_pool(self):
  57. with patch('celery.worker.consumer.consumer.task_reserved') as reserv:
  58. c = self.get_consumer()
  59. c.on_task_request = Mock(name='on_task_request')
  60. request = Mock(name='request')
  61. c._limit_move_to_pool(request)
  62. reserv.assert_called_with(request)
  63. c.on_task_request.assert_called_with(request)
  64. def test_update_prefetch_count(self):
  65. c = self.get_consumer()
  66. c._update_qos_eventually = Mock(name='update_qos')
  67. c.initial_prefetch_count = None
  68. c.pool.num_processes = None
  69. c.prefetch_multiplier = 10
  70. assert c._update_prefetch_count(1) is None
  71. c.initial_prefetch_count = 10
  72. c.pool.num_processes = 10
  73. c._update_prefetch_count(8)
  74. c._update_qos_eventually.assert_called_with(8)
  75. assert c.initial_prefetch_count == 10 * 10
  76. def test_flush_events(self):
  77. c = self.get_consumer()
  78. c.event_dispatcher = None
  79. c._flush_events()
  80. c.event_dispatcher = Mock(name='evd')
  81. c._flush_events()
  82. c.event_dispatcher.flush.assert_called_with()
  83. def test_on_send_event_buffered(self):
  84. c = self.get_consumer()
  85. c.hub = None
  86. c.on_send_event_buffered()
  87. c.hub = Mock(name='hub')
  88. c.on_send_event_buffered()
  89. c.hub._ready.add.assert_called_with(c._flush_events)
  90. def test_limit_task(self):
  91. c = self.get_consumer()
  92. c.timer = Mock()
  93. bucket = Mock()
  94. request = Mock()
  95. bucket.can_consume.return_value = True
  96. bucket.contents = deque()
  97. c._limit_task(request, bucket, 3)
  98. bucket.can_consume.assert_called_with(3)
  99. bucket.expected_time.assert_called_with(3)
  100. c.timer.call_after.assert_called_with(
  101. bucket.expected_time(), c._on_bucket_wakeup, (bucket, 3),
  102. priority=c._limit_order,
  103. )
  104. bucket.can_consume.return_value = False
  105. bucket.expected_time.return_value = 3.33
  106. limit_order = c._limit_order
  107. c._limit_task(request, bucket, 4)
  108. assert c._limit_order == limit_order + 1
  109. bucket.can_consume.assert_called_with(4)
  110. c.timer.call_after.assert_called_with(
  111. 3.33, c._on_bucket_wakeup, (bucket, 4),
  112. priority=c._limit_order,
  113. )
  114. bucket.expected_time.assert_called_with(4)
  115. def test_start_blueprint_raises_EMFILE(self):
  116. c = self.get_consumer()
  117. exc = c.blueprint.start.side_effect = OSError()
  118. exc.errno = errno.EMFILE
  119. with pytest.raises(OSError):
  120. c.start()
  121. def test_max_restarts_exceeded(self):
  122. c = self.get_consumer()
  123. def se(*args, **kwargs):
  124. c.blueprint.state = CLOSE
  125. raise RestartFreqExceeded()
  126. c._restart_state.step.side_effect = se
  127. c.blueprint.start.side_effect = socket.error()
  128. with patch('celery.worker.consumer.consumer.sleep') as sleep:
  129. c.start()
  130. sleep.assert_called_with(1)
  131. def test_do_not_restart_when_closed(self):
  132. c = self.get_consumer()
  133. c.blueprint.state = None
  134. def bp_start(*args, **kwargs):
  135. c.blueprint.state = CLOSE
  136. c.blueprint.start.side_effect = bp_start
  137. with patch('celery.worker.consumer.consumer.sleep'):
  138. c.start()
  139. c.blueprint.start.assert_called_once_with(c)
  140. def test_do_not_restart_when_terminated(self):
  141. c = self.get_consumer()
  142. c.blueprint.state = None
  143. def bp_start(*args, **kwargs):
  144. c.blueprint.state = TERMINATE
  145. c.blueprint.start.side_effect = bp_start
  146. with patch('celery.worker.consumer.consumer.sleep'):
  147. c.start()
  148. c.blueprint.start.assert_called_once_with(c)
  149. def test_no_retry_raises_error(self):
  150. self.app.conf.broker_connection_retry = False
  151. c = self.get_consumer()
  152. c.blueprint.start.side_effect = socket.error()
  153. with pytest.raises(socket.error):
  154. c.start()
  155. def _closer(self, c):
  156. def se(*args, **kwargs):
  157. c.blueprint.state = CLOSE
  158. return se
  159. def test_collects_at_restart(self):
  160. c = self.get_consumer()
  161. c.connection.collect.side_effect = MemoryError()
  162. c.blueprint.start.side_effect = socket.error()
  163. c.blueprint.restart.side_effect = self._closer(c)
  164. c.start()
  165. c.connection.collect.assert_called_with()
  166. def test_register_with_event_loop(self):
  167. c = self.get_consumer()
  168. c.register_with_event_loop(Mock(name='loop'))
  169. def test_on_close_clears_semaphore_timer_and_reqs(self):
  170. with patch('celery.worker.consumer.consumer.reserved_requests') as res:
  171. c = self.get_consumer()
  172. c.on_close()
  173. c.controller.semaphore.clear.assert_called_with()
  174. c.timer.clear.assert_called_with()
  175. res.clear.assert_called_with()
  176. c.pool.flush.assert_called_with()
  177. c.controller = None
  178. c.timer = None
  179. c.pool = None
  180. c.on_close()
  181. def test_connect_error_handler(self):
  182. self.app._connection = _amqp_connection()
  183. conn = self.app._connection.return_value
  184. c = self.get_consumer()
  185. assert c.connect()
  186. conn.ensure_connection.assert_called()
  187. errback = conn.ensure_connection.call_args[0][0]
  188. errback(Mock(), 0)
  189. class test_Heart:
  190. def test_start(self):
  191. c = Mock()
  192. c.timer = Mock()
  193. c.event_dispatcher = Mock()
  194. with patch('celery.worker.heartbeat.Heart') as hcls:
  195. h = Heart(c)
  196. assert h.enabled
  197. assert h.heartbeat_interval is None
  198. assert c.heart is None
  199. h.start(c)
  200. assert c.heart
  201. hcls.assert_called_with(c.timer, c.event_dispatcher,
  202. h.heartbeat_interval)
  203. c.heart.start.assert_called_with()
  204. def test_start_heartbeat_interval(self):
  205. c = Mock()
  206. c.timer = Mock()
  207. c.event_dispatcher = Mock()
  208. with patch('celery.worker.heartbeat.Heart') as hcls:
  209. h = Heart(c, False, 20)
  210. assert h.enabled
  211. assert h.heartbeat_interval == 20
  212. assert c.heart is None
  213. h.start(c)
  214. assert c.heart
  215. hcls.assert_called_with(c.timer, c.event_dispatcher,
  216. h.heartbeat_interval)
  217. c.heart.start.assert_called_with()
  218. class test_Tasks:
  219. def test_stop(self):
  220. c = Mock()
  221. tasks = Tasks(c)
  222. assert c.task_consumer is None
  223. assert c.qos is None
  224. c.task_consumer = Mock()
  225. tasks.stop(c)
  226. def test_stop_already_stopped(self):
  227. c = Mock()
  228. tasks = Tasks(c)
  229. tasks.stop(c)
  230. class test_Agent:
  231. def test_start(self):
  232. c = Mock()
  233. agent = Agent(c)
  234. agent.instantiate = Mock()
  235. agent.agent_cls = 'foo:Agent'
  236. assert agent.create(c) is not None
  237. agent.instantiate.assert_called_with(agent.agent_cls, c.connection)
  238. class test_Mingle:
  239. def test_start_no_replies(self):
  240. c = Mock()
  241. c.app.connection_for_read = _amqp_connection()
  242. mingle = Mingle(c)
  243. I = c.app.control.inspect.return_value = Mock()
  244. I.hello.return_value = {}
  245. mingle.start(c)
  246. def test_start(self):
  247. c = Mock()
  248. c.app.connection_for_read = _amqp_connection()
  249. mingle = Mingle(c)
  250. assert mingle.enabled
  251. Aig = LimitedSet()
  252. Big = LimitedSet()
  253. Aig.add('Aig-1')
  254. Aig.add('Aig-2')
  255. Big.add('Big-1')
  256. I = c.app.control.inspect.return_value = Mock()
  257. I.hello.return_value = {
  258. 'A@example.com': {
  259. 'clock': 312,
  260. 'revoked': Aig._data,
  261. },
  262. 'B@example.com': {
  263. 'clock': 29,
  264. 'revoked': Big._data,
  265. },
  266. 'C@example.com': {
  267. 'error': 'unknown method',
  268. },
  269. }
  270. our_revoked = c.controller.state.revoked = LimitedSet()
  271. mingle.start(c)
  272. I.hello.assert_called_with(c.hostname, our_revoked._data)
  273. c.app.clock.adjust.assert_has_calls([
  274. call(312), call(29),
  275. ], any_order=True)
  276. assert 'Aig-1' in our_revoked
  277. assert 'Aig-2' in our_revoked
  278. assert 'Big-1' in our_revoked
  279. def _amqp_connection():
  280. connection = ContextMock(name='Connection')
  281. connection.return_value = ContextMock(name='connection')
  282. connection.return_value.transport.driver_type = 'amqp'
  283. return connection
  284. class test_Gossip:
  285. def test_init(self):
  286. c = self.Consumer()
  287. c.app.connection_for_read = _amqp_connection()
  288. g = Gossip(c)
  289. assert g.enabled
  290. assert c.gossip is g
  291. def test_callbacks(self):
  292. c = self.Consumer()
  293. c.app.connection_for_read = _amqp_connection()
  294. g = Gossip(c)
  295. on_node_join = Mock(name='on_node_join')
  296. on_node_join2 = Mock(name='on_node_join2')
  297. on_node_leave = Mock(name='on_node_leave')
  298. on_node_lost = Mock(name='on.node_lost')
  299. g.on.node_join.add(on_node_join)
  300. g.on.node_join.add(on_node_join2)
  301. g.on.node_leave.add(on_node_leave)
  302. g.on.node_lost.add(on_node_lost)
  303. worker = Mock(name='worker')
  304. g.on_node_join(worker)
  305. on_node_join.assert_called_with(worker)
  306. on_node_join2.assert_called_with(worker)
  307. g.on_node_leave(worker)
  308. on_node_leave.assert_called_with(worker)
  309. g.on_node_lost(worker)
  310. on_node_lost.assert_called_with(worker)
  311. def test_election(self):
  312. c = self.Consumer()
  313. c.app.connection_for_read = _amqp_connection()
  314. g = Gossip(c)
  315. g.start(c)
  316. g.election('id', 'topic', 'action')
  317. assert g.consensus_replies['id'] == []
  318. g.dispatcher.send.assert_called_with(
  319. 'worker-elect', id='id', topic='topic', cver=1, action='action',
  320. )
  321. def test_call_task(self):
  322. c = self.Consumer()
  323. c.app.connection_for_read = _amqp_connection()
  324. g = Gossip(c)
  325. g.start(c)
  326. signature = g.app.signature = Mock(name='app.signature')
  327. task = Mock()
  328. g.call_task(task)
  329. signature.assert_called_with(task)
  330. signature.return_value.apply_async.assert_called_with()
  331. signature.return_value.apply_async.side_effect = MemoryError()
  332. with patch('celery.worker.consumer.gossip.logger') as logger:
  333. g.call_task(task)
  334. logger.exception.assert_called()
  335. def Event(self, id='id', clock=312,
  336. hostname='foo@example.com', pid=4312,
  337. topic='topic', action='action', cver=1):
  338. return {
  339. 'id': id,
  340. 'clock': clock,
  341. 'hostname': hostname,
  342. 'pid': pid,
  343. 'topic': topic,
  344. 'action': action,
  345. 'cver': cver,
  346. }
  347. def test_on_elect(self):
  348. c = self.Consumer()
  349. c.app.connection_for_read = _amqp_connection()
  350. g = Gossip(c)
  351. g.start(c)
  352. event = self.Event('id1')
  353. g.on_elect(event)
  354. in_heap = g.consensus_requests['id1']
  355. assert in_heap
  356. g.dispatcher.send.assert_called_with('worker-elect-ack', id='id1')
  357. event.pop('clock')
  358. with patch('celery.worker.consumer.gossip.logger') as logger:
  359. g.on_elect(event)
  360. logger.exception.assert_called()
  361. def Consumer(self, hostname='foo@x.com', pid=4312):
  362. c = Mock()
  363. c.app.connection = _amqp_connection()
  364. c.hostname = hostname
  365. c.pid = pid
  366. return c
  367. def setup_election(self, g, c):
  368. g.start(c)
  369. g.clock = self.app.clock
  370. assert 'idx' not in g.consensus_replies
  371. assert g.on_elect_ack({'id': 'idx'}) is None
  372. g.state.alive_workers.return_value = [
  373. 'foo@x.com', 'bar@x.com', 'baz@x.com',
  374. ]
  375. g.consensus_replies['id1'] = []
  376. g.consensus_requests['id1'] = []
  377. e1 = self.Event('id1', 1, 'foo@x.com')
  378. e2 = self.Event('id1', 2, 'bar@x.com')
  379. e3 = self.Event('id1', 3, 'baz@x.com')
  380. g.on_elect(e1)
  381. g.on_elect(e2)
  382. g.on_elect(e3)
  383. assert len(g.consensus_requests['id1']) == 3
  384. with patch('celery.worker.consumer.gossip.info'):
  385. g.on_elect_ack(e1)
  386. assert len(g.consensus_replies['id1']) == 1
  387. g.on_elect_ack(e2)
  388. assert len(g.consensus_replies['id1']) == 2
  389. g.on_elect_ack(e3)
  390. with pytest.raises(KeyError):
  391. g.consensus_replies['id1']
  392. def test_on_elect_ack_win(self):
  393. c = self.Consumer(hostname='foo@x.com') # I will win
  394. c.app.connection_for_read = _amqp_connection()
  395. g = Gossip(c)
  396. handler = g.election_handlers['topic'] = Mock()
  397. self.setup_election(g, c)
  398. handler.assert_called_with('action')
  399. def test_on_elect_ack_lose(self):
  400. c = self.Consumer(hostname='bar@x.com') # I will lose
  401. c.app.connection_for_read = _amqp_connection()
  402. g = Gossip(c)
  403. handler = g.election_handlers['topic'] = Mock()
  404. self.setup_election(g, c)
  405. handler.assert_not_called()
  406. def test_on_elect_ack_win_but_no_action(self):
  407. c = self.Consumer(hostname='foo@x.com') # I will win
  408. c.app.connection_for_read = _amqp_connection()
  409. g = Gossip(c)
  410. g.election_handlers = {}
  411. with patch('celery.worker.consumer.gossip.logger') as logger:
  412. self.setup_election(g, c)
  413. logger.exception.assert_called()
  414. def test_on_node_join(self):
  415. c = self.Consumer()
  416. c.app.connection_for_read = _amqp_connection()
  417. g = Gossip(c)
  418. with patch('celery.worker.consumer.gossip.debug') as debug:
  419. g.on_node_join(c)
  420. debug.assert_called_with('%s joined the party', 'foo@x.com')
  421. def test_on_node_leave(self):
  422. c = self.Consumer()
  423. c.app.connection_for_read = _amqp_connection()
  424. g = Gossip(c)
  425. with patch('celery.worker.consumer.gossip.debug') as debug:
  426. g.on_node_leave(c)
  427. debug.assert_called_with('%s left', 'foo@x.com')
  428. def test_on_node_lost(self):
  429. c = self.Consumer()
  430. c.app.connection_for_read = _amqp_connection()
  431. g = Gossip(c)
  432. with patch('celery.worker.consumer.gossip.info') as info:
  433. g.on_node_lost(c)
  434. info.assert_called_with('missed heartbeat from %s', 'foo@x.com')
  435. def test_register_timer(self):
  436. c = self.Consumer()
  437. c.app.connection_for_read = _amqp_connection()
  438. g = Gossip(c)
  439. g.register_timer()
  440. c.timer.call_repeatedly.assert_called_with(g.interval, g.periodic)
  441. tref = g._tref
  442. g.register_timer()
  443. tref.cancel.assert_called_with()
  444. def test_periodic(self):
  445. c = self.Consumer()
  446. c.app.connection_for_read = _amqp_connection()
  447. g = Gossip(c)
  448. g.on_node_lost = Mock()
  449. state = g.state = Mock()
  450. worker = Mock()
  451. state.workers = {'foo': worker}
  452. worker.alive = True
  453. worker.hostname = 'foo'
  454. g.periodic()
  455. worker.alive = False
  456. g.periodic()
  457. g.on_node_lost.assert_called_with(worker)
  458. with pytest.raises(KeyError):
  459. state.workers['foo']
  460. def test_on_message__task(self):
  461. c = self.Consumer()
  462. c.app.connection_for_read = _amqp_connection()
  463. g = Gossip(c)
  464. assert g.enabled
  465. message = Mock(name='message')
  466. message.delivery_info = {'routing_key': 'task.failed'}
  467. g.on_message(Mock(name='prepare'), message)
  468. def test_on_message(self):
  469. c = self.Consumer()
  470. c.app.connection_for_read = _amqp_connection()
  471. g = Gossip(c)
  472. assert g.enabled
  473. prepare = Mock()
  474. prepare.return_value = 'worker-online', {}
  475. c.app.events.State.assert_called_with(
  476. on_node_join=g.on_node_join,
  477. on_node_leave=g.on_node_leave,
  478. max_tasks_in_memory=1,
  479. )
  480. g.update_state = Mock()
  481. worker = Mock()
  482. g.on_node_join = Mock()
  483. g.on_node_leave = Mock()
  484. g.update_state.return_value = worker, 1
  485. message = Mock()
  486. message.delivery_info = {'routing_key': 'worker-online'}
  487. message.headers = {'hostname': 'other'}
  488. handler = g.event_handlers['worker-online'] = Mock()
  489. g.on_message(prepare, message)
  490. handler.assert_called_with(message.payload)
  491. g.event_handlers = {}
  492. g.on_message(prepare, message)
  493. message.delivery_info = {'routing_key': 'worker-offline'}
  494. prepare.return_value = 'worker-offline', {}
  495. g.on_message(prepare, message)
  496. message.delivery_info = {'routing_key': 'worker-baz'}
  497. prepare.return_value = 'worker-baz', {}
  498. g.update_state.return_value = worker, 0
  499. g.on_message(prepare, message)
  500. message.headers = {'hostname': g.hostname}
  501. g.on_message(prepare, message)
  502. g.clock.forward.assert_called_with()