test_worker.py 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082
  1. from __future__ import absolute_import, print_function, unicode_literals
  2. import os
  3. import pytest
  4. import socket
  5. import sys
  6. from collections import deque
  7. from datetime import datetime, timedelta
  8. from functools import partial
  9. from threading import Event
  10. from amqp import ChannelError
  11. from case import Mock, patch, skip
  12. from kombu import Connection
  13. from kombu.common import QoS, ignore_errors
  14. from kombu.transport.base import Message
  15. from kombu.transport.memory import Transport
  16. from kombu.utils.uuid import uuid
  17. from celery.bootsteps import RUN, CLOSE, TERMINATE, StartStopStep
  18. from celery.concurrency.base import BasePool
  19. from celery.exceptions import (
  20. WorkerShutdown, WorkerTerminate, TaskRevokedError,
  21. InvalidTaskError, ImproperlyConfigured,
  22. )
  23. from celery.five import Empty, range, Queue as FastQueue
  24. from celery.platforms import EX_FAILURE
  25. from celery.worker import worker as worker_module
  26. from celery.worker import components
  27. from celery.worker import consumer
  28. from celery.worker import state
  29. from celery.worker.consumer import Consumer
  30. from celery.worker.pidbox import gPidbox
  31. from celery.worker.request import Request
  32. from celery.utils.nodenames import worker_direct
  33. from celery.utils.serialization import pickle
  34. from celery.utils.timer2 import Timer
  35. def MockStep(step=None):
  36. if step is None:
  37. step = Mock(name='step')
  38. else:
  39. step.blueprint = Mock(name='step.blueprint')
  40. step.blueprint.name = 'MockNS'
  41. step.name = 'MockStep(%s)' % (id(step),)
  42. return step
  43. def mock_event_dispatcher():
  44. evd = Mock(name='event_dispatcher')
  45. evd.groups = ['worker']
  46. evd._outbound_buffer = deque()
  47. return evd
  48. def find_step(obj, typ):
  49. return obj.blueprint.steps[typ.name]
  50. def create_message(channel, **data):
  51. data.setdefault('id', uuid())
  52. m = Message(body=pickle.dumps(dict(**data)),
  53. channel=channel,
  54. content_type='application/x-python-serialize',
  55. content_encoding='binary',
  56. delivery_info={'consumer_tag': 'mock'})
  57. m.accept = ['application/x-python-serialize']
  58. return m
  59. class ConsumerCase:
  60. def create_task_message(self, channel, *args, **kwargs):
  61. m = self.TaskMessage(*args, **kwargs)
  62. m.channel = channel
  63. m.delivery_info = {'consumer_tag': 'mock'}
  64. return m
  65. class test_Consumer(ConsumerCase):
  66. def setup(self):
  67. self.buffer = FastQueue()
  68. self.timer = Timer()
  69. @self.app.task(shared=False)
  70. def foo_task(x, y, z):
  71. return x * y * z
  72. self.foo_task = foo_task
  73. def teardown(self):
  74. self.timer.stop()
  75. def LoopConsumer(self, buffer=None, controller=None, timer=None, app=None,
  76. without_mingle=True, without_gossip=True,
  77. without_heartbeat=True, **kwargs):
  78. if controller is None:
  79. controller = Mock(name='.controller')
  80. buffer = buffer if buffer is not None else self.buffer.put
  81. timer = timer if timer is not None else self.timer
  82. app = app if app is not None else self.app
  83. c = Consumer(
  84. buffer,
  85. timer=timer,
  86. app=app,
  87. controller=controller,
  88. without_mingle=without_mingle,
  89. without_gossip=without_gossip,
  90. without_heartbeat=without_heartbeat,
  91. **kwargs
  92. )
  93. c.task_consumer = Mock(name='.task_consumer')
  94. c.qos = QoS(c.task_consumer.qos, 10)
  95. c.connection = Mock(name='.connection')
  96. c.controller = c.app.WorkController()
  97. c.heart = Mock(name='.heart')
  98. c.controller.consumer = c
  99. c.pool = c.controller.pool = Mock(name='.controller.pool')
  100. c.node = Mock(name='.node')
  101. c.event_dispatcher = mock_event_dispatcher()
  102. return c
  103. def NoopConsumer(self, *args, **kwargs):
  104. c = self.LoopConsumer(*args, **kwargs)
  105. c.loop = Mock(name='.loop')
  106. return c
  107. def test_info(self):
  108. c = self.NoopConsumer()
  109. c.connection.info.return_value = {'foo': 'bar'}
  110. c.controller.pool.info.return_value = [Mock(), Mock()]
  111. info = c.controller.stats()
  112. assert info['prefetch_count'] == 10
  113. assert info['broker']
  114. def test_start_when_closed(self):
  115. c = self.NoopConsumer()
  116. c.blueprint.state = CLOSE
  117. c.start()
  118. def test_connection(self):
  119. c = self.NoopConsumer()
  120. c.blueprint.start(c)
  121. assert isinstance(c.connection, Connection)
  122. c.blueprint.state = RUN
  123. c.event_dispatcher = None
  124. c.blueprint.restart(c)
  125. assert c.connection
  126. c.blueprint.state = RUN
  127. c.shutdown()
  128. assert c.connection is None
  129. assert c.task_consumer is None
  130. c.blueprint.start(c)
  131. assert isinstance(c.connection, Connection)
  132. c.blueprint.restart(c)
  133. c.stop()
  134. c.shutdown()
  135. assert c.connection is None
  136. assert c.task_consumer is None
  137. def test_close_connection(self):
  138. c = self.NoopConsumer()
  139. c.blueprint.state = RUN
  140. step = find_step(c, consumer.Connection)
  141. connection = c.connection
  142. step.shutdown(c)
  143. connection.close.assert_called()
  144. assert c.connection is None
  145. def test_close_connection__heart_shutdown(self):
  146. c = self.NoopConsumer()
  147. event_dispatcher = c.event_dispatcher
  148. heart = c.heart
  149. c.event_dispatcher.enabled = True
  150. c.blueprint.state = RUN
  151. Events = find_step(c, consumer.Events)
  152. Events.shutdown(c)
  153. Heart = find_step(c, consumer.Heart)
  154. Heart.shutdown(c)
  155. event_dispatcher.close.assert_called()
  156. heart.stop.assert_called_with()
  157. @patch('celery.worker.consumer.consumer.warn')
  158. def test_receive_message_unknown(self, warn):
  159. c = self.LoopConsumer()
  160. c.blueprint.state = RUN
  161. c.steps.pop()
  162. channel = Mock(name='.channeol')
  163. m = create_message(channel, unknown={'baz': '!!!'})
  164. callback = self._get_on_message(c)
  165. callback(m)
  166. warn.assert_called()
  167. @patch('celery.worker.strategy.to_timestamp')
  168. def test_receive_message_eta_OverflowError(self, to_timestamp):
  169. to_timestamp.side_effect = OverflowError()
  170. c = self.LoopConsumer()
  171. c.blueprint.state = RUN
  172. c.steps.pop()
  173. m = self.create_task_message(
  174. Mock(), self.foo_task.name,
  175. args=('2, 2'), kwargs={},
  176. eta=datetime.now().isoformat(),
  177. )
  178. c.update_strategies()
  179. callback = self._get_on_message(c)
  180. callback(m)
  181. assert m.acknowledged
  182. @patch('celery.worker.consumer.consumer.error')
  183. def test_receive_message_InvalidTaskError(self, error):
  184. c = self.LoopConsumer()
  185. c.blueprint.state = RUN
  186. c.steps.pop()
  187. m = self.create_task_message(
  188. Mock(), self.foo_task.name,
  189. args=(1, 2), kwargs='foobarbaz', id=1)
  190. c.update_strategies()
  191. strat = c.strategies[self.foo_task.name] = Mock(name='strategy')
  192. strat.side_effect = InvalidTaskError()
  193. callback = self._get_on_message(c)
  194. callback(m)
  195. error.assert_called()
  196. assert 'Received invalid task message' in error.call_args[0][0]
  197. @patch('celery.worker.consumer.consumer.crit')
  198. def test_on_decode_error(self, crit):
  199. c = self.LoopConsumer()
  200. class MockMessage(Mock):
  201. content_type = 'application/x-msgpack'
  202. content_encoding = 'binary'
  203. body = 'foobarbaz'
  204. message = MockMessage()
  205. c.on_decode_error(message, KeyError('foo'))
  206. assert message.ack.call_count
  207. assert "Can't decode message body" in crit.call_args[0][0]
  208. def _get_on_message(self, c):
  209. if c.qos is None:
  210. c.qos = Mock()
  211. c.task_consumer = Mock()
  212. c.event_dispatcher = mock_event_dispatcher()
  213. c.connection = Mock(name='.connection')
  214. c.connection.get_heartbeat_interval.return_value = 0
  215. c.connection.drain_events.side_effect = WorkerShutdown()
  216. with pytest.raises(WorkerShutdown):
  217. c.loop(*c.loop_args())
  218. assert c.task_consumer.on_message
  219. return c.task_consumer.on_message
  220. def test_receieve_message(self):
  221. c = self.LoopConsumer()
  222. c.blueprint.state = RUN
  223. m = self.create_task_message(
  224. Mock(), self.foo_task.name,
  225. args=[2, 4, 8], kwargs={},
  226. )
  227. c.update_strategies()
  228. callback = self._get_on_message(c)
  229. callback(m)
  230. in_bucket = self.buffer.get_nowait()
  231. assert isinstance(in_bucket, Request)
  232. assert in_bucket.name == self.foo_task.name
  233. assert in_bucket.execute() == 2 * 4 * 8
  234. assert self.timer.empty()
  235. def test_start_channel_error(self):
  236. c = self.NoopConsumer(task_events=False, pool=BasePool())
  237. c.loop.on_nth_call_do_raise(KeyError('foo'), SyntaxError('bar'))
  238. c.channel_errors = (KeyError,)
  239. try:
  240. with pytest.raises(KeyError):
  241. c.start()
  242. finally:
  243. c.timer and c.timer.stop()
  244. def test_start_connection_error(self):
  245. c = self.NoopConsumer(task_events=False, pool=BasePool())
  246. c.loop.on_nth_call_do_raise(KeyError('foo'), SyntaxError('bar'))
  247. c.connection_errors = (KeyError,)
  248. try:
  249. with pytest.raises(SyntaxError):
  250. c.start()
  251. finally:
  252. c.timer and c.timer.stop()
  253. def test_loop_ignores_socket_timeout(self):
  254. class Connection(self.app.connection_for_read().__class__):
  255. obj = None
  256. def drain_events(self, **kwargs):
  257. self.obj.connection = None
  258. raise socket.timeout(10)
  259. c = self.NoopConsumer()
  260. c.connection = Connection(self.app.conf.broker_url)
  261. c.connection.obj = c
  262. c.qos = QoS(c.task_consumer.qos, 10)
  263. c.loop(*c.loop_args())
  264. def test_loop_when_socket_error(self):
  265. class Connection(self.app.connection_for_read().__class__):
  266. obj = None
  267. def drain_events(self, **kwargs):
  268. self.obj.connection = None
  269. raise socket.error('foo')
  270. c = self.LoopConsumer()
  271. c.blueprint.state = RUN
  272. conn = c.connection = Connection(self.app.conf.broker_url)
  273. c.connection.obj = c
  274. c.qos = QoS(c.task_consumer.qos, 10)
  275. with pytest.raises(socket.error):
  276. c.loop(*c.loop_args())
  277. c.blueprint.state = CLOSE
  278. c.connection = conn
  279. c.loop(*c.loop_args())
  280. def test_loop(self):
  281. class Connection(self.app.connection_for_read().__class__):
  282. obj = None
  283. def drain_events(self, **kwargs):
  284. self.obj.connection = None
  285. @property
  286. def supports_heartbeats(self):
  287. return False
  288. c = self.LoopConsumer()
  289. c.blueprint.state = RUN
  290. c.connection = Connection(self.app.conf.broker_url)
  291. c.connection.obj = c
  292. c.connection.get_heartbeat_interval = Mock(return_value=None)
  293. c.qos = QoS(c.task_consumer.qos, 10)
  294. c.loop(*c.loop_args())
  295. c.loop(*c.loop_args())
  296. assert c.task_consumer.consume.call_count
  297. c.task_consumer.qos.assert_called_with(prefetch_count=10)
  298. assert c.qos.value == 10
  299. c.qos.decrement_eventually()
  300. assert c.qos.value == 9
  301. c.qos.update()
  302. assert c.qos.value == 9
  303. c.task_consumer.qos.assert_called_with(prefetch_count=9)
  304. def test_ignore_errors(self):
  305. c = self.NoopConsumer()
  306. c.connection_errors = (AttributeError, KeyError,)
  307. c.channel_errors = (SyntaxError,)
  308. ignore_errors(c, Mock(side_effect=AttributeError('foo')))
  309. ignore_errors(c, Mock(side_effect=KeyError('foo')))
  310. ignore_errors(c, Mock(side_effect=SyntaxError('foo')))
  311. with pytest.raises(IndexError):
  312. ignore_errors(c, Mock(side_effect=IndexError('foo')))
  313. def test_apply_eta_task(self):
  314. c = self.NoopConsumer()
  315. c.qos = QoS(None, 10)
  316. task = Mock(name='task', id='1234213')
  317. qos = c.qos.value
  318. c.apply_eta_task(task)
  319. assert task in state.reserved_requests
  320. assert c.qos.value == qos - 1
  321. assert self.buffer.get_nowait() is task
  322. def test_receieve_message_eta_isoformat(self):
  323. c = self.LoopConsumer()
  324. c.blueprint.state = RUN
  325. c.steps.pop()
  326. m = self.create_task_message(
  327. Mock(), self.foo_task.name,
  328. eta=(datetime.now() + timedelta(days=1)).isoformat(),
  329. args=[2, 4, 8], kwargs={},
  330. )
  331. c.qos = QoS(c.task_consumer.qos, 1)
  332. current_pcount = c.qos.value
  333. c.event_dispatcher.enabled = False
  334. c.update_strategies()
  335. callback = self._get_on_message(c)
  336. callback(m)
  337. c.timer.stop()
  338. c.timer.join(1)
  339. items = [entry[2] for entry in self.timer.queue]
  340. found = 0
  341. for item in items:
  342. if item.args[0].name == self.foo_task.name:
  343. found = True
  344. assert found
  345. assert c.qos.value > current_pcount
  346. c.timer.stop()
  347. def test_pidbox_callback(self):
  348. c = self.NoopConsumer()
  349. con = find_step(c, consumer.Control).box
  350. con.node = Mock()
  351. con.reset = Mock()
  352. con.on_message('foo', 'bar')
  353. con.node.handle_message.assert_called_with('foo', 'bar')
  354. con.node = Mock()
  355. con.node.handle_message.side_effect = KeyError('foo')
  356. con.on_message('foo', 'bar')
  357. con.node.handle_message.assert_called_with('foo', 'bar')
  358. con.node = Mock()
  359. con.node.handle_message.side_effect = ValueError('foo')
  360. con.on_message('foo', 'bar')
  361. con.node.handle_message.assert_called_with('foo', 'bar')
  362. con.reset.assert_called()
  363. def test_revoke(self):
  364. c = self.LoopConsumer()
  365. c.blueprint.state = RUN
  366. c.steps.pop()
  367. channel = Mock(name='channel')
  368. id = uuid()
  369. t = self.create_task_message(
  370. channel, self.foo_task.name,
  371. args=[2, 4, 8], kwargs={}, id=id,
  372. )
  373. state.revoked.add(id)
  374. callback = self._get_on_message(c)
  375. callback(t)
  376. assert self.buffer.empty()
  377. def test_receieve_message_not_registered(self):
  378. c = self.LoopConsumer()
  379. c.blueprint.state = RUN
  380. c.steps.pop()
  381. channel = Mock(name='channel')
  382. m = self.create_task_message(
  383. channel, 'x.X.31x', args=[2, 4, 8], kwargs={},
  384. )
  385. callback = self._get_on_message(c)
  386. assert not callback(m)
  387. with pytest.raises(Empty):
  388. self.buffer.get_nowait()
  389. assert self.timer.empty()
  390. @patch('celery.worker.consumer.consumer.warn')
  391. @patch('celery.worker.consumer.consumer.logger')
  392. def test_receieve_message_ack_raises(self, logger, warn):
  393. c = self.LoopConsumer()
  394. c.blueprint.state = RUN
  395. channel = Mock(name='channel')
  396. m = self.create_task_message(
  397. channel, self.foo_task.name,
  398. args=[2, 4, 8], kwargs={},
  399. )
  400. m.headers = None
  401. c.update_strategies()
  402. c.connection_errors = (socket.error,)
  403. m.reject = Mock()
  404. m.reject.side_effect = socket.error('foo')
  405. callback = self._get_on_message(c)
  406. assert not callback(m)
  407. warn.assert_called()
  408. with pytest.raises(Empty):
  409. self.buffer.get_nowait()
  410. assert self.timer.empty()
  411. m.reject_log_error.assert_called_with(logger, c.connection_errors)
  412. def test_receive_message_eta(self):
  413. if os.environ.get('C_DEBUG_TEST'):
  414. pp = partial(print, file=sys.__stderr__)
  415. else:
  416. def pp(*args, **kwargs):
  417. pass
  418. pp('TEST RECEIVE MESSAGE ETA')
  419. pp('+CREATE MYKOMBUCONSUMER')
  420. c = self.LoopConsumer()
  421. pp('-CREATE MYKOMBUCONSUMER')
  422. c.steps.pop()
  423. channel = Mock(name='channel')
  424. pp('+ CREATE MESSAGE')
  425. m = self.create_task_message(
  426. channel, self.foo_task.name,
  427. args=[2, 4, 8], kwargs={},
  428. eta=(datetime.now() + timedelta(days=1)).isoformat(),
  429. )
  430. pp('- CREATE MESSAGE')
  431. try:
  432. pp('+ BLUEPRINT START 1')
  433. c.blueprint.start(c)
  434. pp('- BLUEPRINT START 1')
  435. p = c.app.conf.broker_connection_retry
  436. c.app.conf.broker_connection_retry = False
  437. pp('+ BLUEPRINT START 2')
  438. c.blueprint.start(c)
  439. pp('- BLUEPRINT START 2')
  440. c.app.conf.broker_connection_retry = p
  441. pp('+ BLUEPRINT RESTART')
  442. c.blueprint.restart(c)
  443. pp('- BLUEPRINT RESTART')
  444. pp('+ GET ON MESSAGE')
  445. callback = self._get_on_message(c)
  446. pp('- GET ON MESSAGE')
  447. pp('+ CALLBACK')
  448. callback(m)
  449. pp('- CALLBACK')
  450. finally:
  451. pp('+ STOP TIMER')
  452. c.timer.stop()
  453. pp('- STOP TIMER')
  454. try:
  455. pp('+ JOIN TIMER')
  456. c.timer.join()
  457. pp('- JOIN TIMER')
  458. except RuntimeError:
  459. pass
  460. in_hold = c.timer.queue[0]
  461. assert len(in_hold) == 3
  462. eta, priority, entry = in_hold
  463. task = entry.args[0]
  464. assert isinstance(task, Request)
  465. assert task.name == self.foo_task.name
  466. assert task.execute() == 2 * 4 * 8
  467. with pytest.raises(Empty):
  468. self.buffer.get_nowait()
  469. def test_reset_pidbox_node(self):
  470. c = self.NoopConsumer()
  471. con = find_step(c, consumer.Control).box
  472. con.node = Mock()
  473. chan = con.node.channel = Mock()
  474. chan.close.side_effect = socket.error('foo')
  475. c.connection_errors = (socket.error,)
  476. con.reset()
  477. chan.close.assert_called_with()
  478. def test_reset_pidbox_node_green(self):
  479. c = self.NoopConsumer(pool=Mock(is_green=True))
  480. con = find_step(c, consumer.Control)
  481. assert isinstance(con.box, gPidbox)
  482. con.start(c)
  483. c.pool.spawn_n.assert_called_with(con.box.loop, c)
  484. def test_green_pidbox_node(self):
  485. pool = Mock()
  486. pool.is_green = True
  487. c = self.NoopConsumer(pool=Mock(is_green=True))
  488. controller = find_step(c, consumer.Control)
  489. class BConsumer(Mock):
  490. def __enter__(self):
  491. self.consume()
  492. return self
  493. def __exit__(self, *exc_info):
  494. self.cancel()
  495. controller.box.node.listen = BConsumer()
  496. connections = []
  497. class Connection(object):
  498. calls = 0
  499. def __init__(self, obj):
  500. connections.append(self)
  501. self.obj = obj
  502. self.default_channel = self.channel()
  503. self.closed = False
  504. def __enter__(self):
  505. return self
  506. def __exit__(self, *exc_info):
  507. self.close()
  508. def channel(self):
  509. return Mock()
  510. def as_uri(self):
  511. return 'dummy://'
  512. def drain_events(self, **kwargs):
  513. if not self.calls:
  514. self.calls += 1
  515. raise socket.timeout()
  516. self.obj.connection = None
  517. controller.box._node_shutdown.set()
  518. def close(self):
  519. self.closed = True
  520. c.connection_for_read = lambda: Connection(obj=c)
  521. controller = find_step(c, consumer.Control)
  522. controller.box.loop(c)
  523. controller.box.node.listen.assert_called()
  524. assert controller.box.consumer
  525. controller.box.consumer.consume.assert_called_with()
  526. assert c.connection is None
  527. assert connections[0].closed
  528. @patch('kombu.connection.Connection._establish_connection')
  529. @patch('kombu.utils.functional.sleep')
  530. def test_connect_errback(self, sleep, connect):
  531. c = self.NoopConsumer()
  532. Transport.connection_errors = (ChannelError,)
  533. connect.on_nth_call_do(ChannelError('error'), n=1)
  534. c.connect()
  535. connect.assert_called_with()
  536. def test_stop_pidbox_node(self):
  537. c = self.NoopConsumer()
  538. cont = find_step(c, consumer.Control)
  539. cont._node_stopped = Event()
  540. cont._node_shutdown = Event()
  541. cont._node_stopped.set()
  542. cont.stop(c)
  543. def test_start__loop(self):
  544. class _QoS(object):
  545. prev = 3
  546. value = 4
  547. def update(self):
  548. self.prev = self.value
  549. init_callback = Mock(name='init_callback')
  550. c = self.NoopConsumer(init_callback=init_callback)
  551. c.qos = _QoS()
  552. c.connection = Connection(self.app.conf.broker_url)
  553. c.connection.get_heartbeat_interval = Mock(return_value=None)
  554. c.iterations = 0
  555. def raises_KeyError(*args, **kwargs):
  556. c.iterations += 1
  557. if c.qos.prev != c.qos.value:
  558. c.qos.update()
  559. if c.iterations >= 2:
  560. raise KeyError('foo')
  561. c.loop = raises_KeyError
  562. with pytest.raises(KeyError):
  563. c.start()
  564. assert c.iterations == 2
  565. assert c.qos.prev == c.qos.value
  566. init_callback.reset_mock()
  567. c = self.NoopConsumer(task_events=False, init_callback=init_callback)
  568. c.qos = _QoS()
  569. c.connection = Connection(self.app.conf.broker_url)
  570. c.connection.get_heartbeat_interval = Mock(return_value=None)
  571. c.loop = Mock(side_effect=socket.error('foo'))
  572. with pytest.raises(socket.error):
  573. c.start()
  574. c.loop.assert_called()
  575. def test_reset_connection_with_no_node(self):
  576. c = self.NoopConsumer()
  577. c.steps.pop()
  578. c.blueprint.start(c)
  579. class test_WorkController(ConsumerCase):
  580. def setup(self):
  581. self.worker = self.create_worker()
  582. self._logger = worker_module.logger
  583. self._comp_logger = components.logger
  584. self.logger = worker_module.logger = Mock()
  585. self.comp_logger = components.logger = Mock()
  586. @self.app.task(shared=False)
  587. def foo_task(x, y, z):
  588. return x * y * z
  589. self.foo_task = foo_task
  590. def teardown(self):
  591. worker_module.logger = self._logger
  592. components.logger = self._comp_logger
  593. def create_worker(self, **kw):
  594. worker = self.app.WorkController(concurrency=1, loglevel=0, **kw)
  595. worker.blueprint.shutdown_complete.set()
  596. return worker
  597. def test_on_consumer_ready(self):
  598. self.worker.on_consumer_ready(Mock())
  599. def test_setup_queues_worker_direct(self):
  600. self.app.conf.worker_direct = True
  601. self.app.amqp.__dict__['queues'] = Mock()
  602. self.worker.setup_queues({})
  603. self.app.amqp.queues.select_add.assert_called_with(
  604. worker_direct(self.worker.hostname),
  605. )
  606. def test_setup_queues__missing_queue(self):
  607. self.app.amqp.queues.select = Mock(name='select')
  608. self.app.amqp.queues.deselect = Mock(name='deselect')
  609. self.app.amqp.queues.select.side_effect = KeyError()
  610. self.app.amqp.queues.deselect.side_effect = KeyError()
  611. with pytest.raises(ImproperlyConfigured):
  612. self.worker.setup_queues('x,y', exclude='foo,bar')
  613. self.app.amqp.queues.select = Mock(name='select')
  614. with pytest.raises(ImproperlyConfigured):
  615. self.worker.setup_queues('x,y', exclude='foo,bar')
  616. def test_send_worker_shutdown(self):
  617. with patch('celery.signals.worker_shutdown') as ws:
  618. self.worker._send_worker_shutdown()
  619. ws.send.assert_called_with(sender=self.worker)
  620. @skip.todo('unstable test')
  621. def test_process_shutdown_on_worker_shutdown(self):
  622. from celery.concurrency.prefork import process_destructor
  623. from celery.concurrency.asynpool import Worker
  624. with patch('celery.signals.worker_process_shutdown') as ws:
  625. with patch('os._exit') as _exit:
  626. worker = Worker(None, None, on_exit=process_destructor)
  627. worker._do_exit(22, 3.1415926)
  628. ws.send.assert_called_with(
  629. sender=None, pid=22, exitcode=3.1415926,
  630. )
  631. _exit.assert_called_with(3.1415926)
  632. def test_process_task_revoked_release_semaphore(self):
  633. self.worker._quick_release = Mock()
  634. req = Mock()
  635. req.execute_using_pool.side_effect = TaskRevokedError
  636. self.worker._process_task(req)
  637. self.worker._quick_release.assert_called_with()
  638. delattr(self.worker, '_quick_release')
  639. self.worker._process_task(req)
  640. def test_shutdown_no_blueprint(self):
  641. self.worker.blueprint = None
  642. self.worker._shutdown()
  643. @patch('celery.worker.worker.create_pidlock')
  644. def test_use_pidfile(self, create_pidlock):
  645. create_pidlock.return_value = Mock()
  646. worker = self.create_worker(pidfile='pidfilelockfilepid')
  647. worker.steps = []
  648. worker.start()
  649. create_pidlock.assert_called()
  650. worker.stop()
  651. worker.pidlock.release.assert_called()
  652. def test_attrs(self):
  653. worker = self.worker
  654. assert worker.timer is not None
  655. assert isinstance(worker.timer, Timer)
  656. assert worker.pool is not None
  657. assert worker.consumer is not None
  658. assert worker.steps
  659. def test_with_embedded_beat(self):
  660. worker = self.app.WorkController(concurrency=1, loglevel=0, beat=True)
  661. assert worker.beat
  662. assert worker.beat in [w.obj for w in worker.steps]
  663. def test_with_autoscaler(self):
  664. worker = self.create_worker(
  665. autoscale=[10, 3], send_events=False,
  666. timer_cls='celery.utils.timer2.Timer',
  667. )
  668. assert worker.autoscaler
  669. def test_dont_stop_or_terminate(self):
  670. worker = self.app.WorkController(concurrency=1, loglevel=0)
  671. worker.stop()
  672. assert worker.blueprint.state != CLOSE
  673. worker.terminate()
  674. assert worker.blueprint.state != CLOSE
  675. sigsafe, worker.pool.signal_safe = worker.pool.signal_safe, False
  676. try:
  677. worker.blueprint.state = RUN
  678. worker.stop(in_sighandler=True)
  679. assert worker.blueprint.state != CLOSE
  680. worker.terminate(in_sighandler=True)
  681. assert worker.blueprint.state != CLOSE
  682. finally:
  683. worker.pool.signal_safe = sigsafe
  684. def test_on_timer_error(self):
  685. worker = self.app.WorkController(concurrency=1, loglevel=0)
  686. try:
  687. raise KeyError('foo')
  688. except KeyError as exc:
  689. components.Timer(worker).on_timer_error(exc)
  690. msg, args = self.comp_logger.error.call_args[0]
  691. assert 'KeyError' in msg % args
  692. def test_on_timer_tick(self):
  693. worker = self.app.WorkController(concurrency=1, loglevel=10)
  694. components.Timer(worker).on_timer_tick(30.0)
  695. xargs = self.comp_logger.debug.call_args[0]
  696. fmt, arg = xargs[0], xargs[1]
  697. assert arg == 30.0
  698. assert 'Next ETA %s secs' in fmt
  699. def test_process_task(self):
  700. worker = self.worker
  701. worker.pool = Mock()
  702. channel = Mock()
  703. m = self.create_task_message(
  704. channel, self.foo_task.name,
  705. args=[4, 8, 10], kwargs={},
  706. )
  707. task = Request(m, app=self.app)
  708. worker._process_task(task)
  709. assert worker.pool.apply_async.call_count == 1
  710. worker.pool.stop()
  711. def test_process_task_raise_base(self):
  712. worker = self.worker
  713. worker.pool = Mock()
  714. worker.pool.apply_async.side_effect = KeyboardInterrupt('Ctrl+C')
  715. channel = Mock()
  716. m = self.create_task_message(
  717. channel, self.foo_task.name,
  718. args=[4, 8, 10], kwargs={},
  719. )
  720. task = Request(m, app=self.app)
  721. worker.steps = []
  722. worker.blueprint.state = RUN
  723. with pytest.raises(KeyboardInterrupt):
  724. worker._process_task(task)
  725. def test_process_task_raise_WorkerTerminate(self):
  726. worker = self.worker
  727. worker.pool = Mock()
  728. worker.pool.apply_async.side_effect = WorkerTerminate()
  729. channel = Mock()
  730. m = self.create_task_message(
  731. channel, self.foo_task.name,
  732. args=[4, 8, 10], kwargs={},
  733. )
  734. task = Request(m, app=self.app)
  735. worker.steps = []
  736. worker.blueprint.state = RUN
  737. with pytest.raises(SystemExit):
  738. worker._process_task(task)
  739. def test_process_task_raise_regular(self):
  740. worker = self.worker
  741. worker.pool = Mock()
  742. worker.pool.apply_async.side_effect = KeyError('some exception')
  743. channel = Mock()
  744. m = self.create_task_message(
  745. channel, self.foo_task.name,
  746. args=[4, 8, 10], kwargs={},
  747. )
  748. task = Request(m, app=self.app)
  749. with pytest.raises(KeyError):
  750. worker._process_task(task)
  751. worker.pool.stop()
  752. def test_start_catches_base_exceptions(self):
  753. worker1 = self.create_worker()
  754. worker1.blueprint.state = RUN
  755. stc = MockStep()
  756. stc.start.side_effect = WorkerTerminate()
  757. worker1.steps = [stc]
  758. worker1.start()
  759. stc.start.assert_called_with(worker1)
  760. assert stc.terminate.call_count
  761. worker2 = self.create_worker()
  762. worker2.blueprint.state = RUN
  763. sec = MockStep()
  764. sec.start.side_effect = WorkerShutdown()
  765. sec.terminate = None
  766. worker2.steps = [sec]
  767. worker2.start()
  768. assert sec.stop.call_count
  769. def test_statedb(self):
  770. from celery.worker import state
  771. Persistent = state.Persistent
  772. state.Persistent = Mock()
  773. try:
  774. worker = self.create_worker(statedb='statefilename')
  775. assert worker._persistence
  776. finally:
  777. state.Persistent = Persistent
  778. def test_process_task_sem(self):
  779. worker = self.worker
  780. worker._quick_acquire = Mock()
  781. req = Mock()
  782. worker._process_task_sem(req)
  783. worker._quick_acquire.assert_called_with(worker._process_task, req)
  784. def test_signal_consumer_close(self):
  785. worker = self.worker
  786. worker.consumer = Mock()
  787. worker.signal_consumer_close()
  788. worker.consumer.close.assert_called_with()
  789. worker.consumer.close.side_effect = AttributeError()
  790. worker.signal_consumer_close()
  791. def test_rusage__no_resource(self):
  792. from celery.worker import worker
  793. prev, worker.resource = worker.resource, None
  794. try:
  795. self.worker.pool = Mock(name='pool')
  796. with pytest.raises(NotImplementedError):
  797. self.worker.rusage()
  798. self.worker.stats()
  799. finally:
  800. worker.resource = prev
  801. def test_repr(self):
  802. assert repr(self.worker)
  803. def test_str(self):
  804. assert str(self.worker) == self.worker.hostname
  805. def test_start__stop(self):
  806. worker = self.worker
  807. worker.blueprint.shutdown_complete.set()
  808. worker.steps = [MockStep(StartStopStep(self)) for _ in range(4)]
  809. worker.blueprint.state = RUN
  810. worker.blueprint.started = 4
  811. for w in worker.steps:
  812. w.start = Mock()
  813. w.close = Mock()
  814. w.stop = Mock()
  815. worker.start()
  816. for w in worker.steps:
  817. w.start.assert_called()
  818. worker.consumer = Mock()
  819. worker.stop(exitcode=3)
  820. for stopstep in worker.steps:
  821. stopstep.close.assert_called()
  822. stopstep.stop.assert_called()
  823. # Doesn't close pool if no pool.
  824. worker.start()
  825. worker.pool = None
  826. worker.stop()
  827. # test that stop of None is not attempted
  828. worker.steps[-1] = None
  829. worker.start()
  830. worker.stop()
  831. def test_start__KeyboardInterrupt(self):
  832. worker = self.worker
  833. worker.blueprint = Mock(name='blueprint')
  834. worker.blueprint.start.side_effect = KeyboardInterrupt()
  835. worker.stop = Mock(name='stop')
  836. worker.start()
  837. worker.stop.assert_called_with(exitcode=EX_FAILURE)
  838. def test_register_with_event_loop(self):
  839. worker = self.worker
  840. hub = Mock(name='hub')
  841. worker.blueprint = Mock(name='blueprint')
  842. worker.register_with_event_loop(hub)
  843. worker.blueprint.send_all.assert_called_with(
  844. worker, 'register_with_event_loop', args=(hub,),
  845. description='hub.register',
  846. )
  847. def test_step_raises(self):
  848. worker = self.worker
  849. step = Mock()
  850. worker.steps = [step]
  851. step.start.side_effect = TypeError()
  852. worker.stop = Mock()
  853. worker.start()
  854. worker.stop.assert_called_with(exitcode=EX_FAILURE)
  855. def test_state(self):
  856. assert self.worker.state
  857. def test_start__terminate(self):
  858. worker = self.worker
  859. worker.blueprint.shutdown_complete.set()
  860. worker.blueprint.started = 5
  861. worker.blueprint.state = RUN
  862. worker.steps = [MockStep() for _ in range(5)]
  863. worker.start()
  864. for w in worker.steps[:3]:
  865. w.start.assert_called()
  866. assert worker.blueprint.started == len(worker.steps)
  867. assert worker.blueprint.state == RUN
  868. worker.terminate()
  869. for step in worker.steps:
  870. step.terminate.assert_called()
  871. worker.blueprint.state = TERMINATE
  872. worker.terminate()
  873. def test_Hub_create(self):
  874. w = Mock()
  875. x = components.Hub(w)
  876. x.create(w)
  877. assert w.timer.max_interval
  878. def test_Pool_create_threaded(self):
  879. w = Mock()
  880. w._conninfo.connection_errors = w._conninfo.channel_errors = ()
  881. w.pool_cls = Mock()
  882. w.use_eventloop = False
  883. pool = components.Pool(w)
  884. pool.create(w)
  885. def test_Pool_pool_no_sem(self):
  886. w = Mock()
  887. w.pool_cls.uses_semaphore = False
  888. components.Pool(w).create(w)
  889. assert w.process_task is w._process_task
  890. def test_Pool_create(self):
  891. from kombu.async.semaphore import LaxBoundedSemaphore
  892. w = Mock()
  893. w._conninfo.connection_errors = w._conninfo.channel_errors = ()
  894. w.hub = Mock()
  895. PoolImp = Mock()
  896. poolimp = PoolImp.return_value = Mock()
  897. poolimp._pool = [Mock(), Mock()]
  898. poolimp._cache = {}
  899. poolimp._fileno_to_inq = {}
  900. poolimp._fileno_to_outq = {}
  901. from celery.concurrency.prefork import TaskPool as _TaskPool
  902. class MockTaskPool(_TaskPool):
  903. Pool = PoolImp
  904. @property
  905. def timers(self):
  906. return {Mock(): 30}
  907. w.pool_cls = MockTaskPool
  908. w.use_eventloop = True
  909. w.consumer.restart_count = -1
  910. pool = components.Pool(w)
  911. pool.create(w)
  912. pool.register_with_event_loop(w, w.hub)
  913. if sys.platform != 'win32':
  914. assert isinstance(w.semaphore, LaxBoundedSemaphore)
  915. P = w.pool
  916. P.start()