test_worker.py 35 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078
  1. from __future__ import absolute_import, print_function, unicode_literals
  2. import os
  3. import socket
  4. import sys
  5. from collections import deque
  6. from datetime import datetime, timedelta
  7. from functools import partial
  8. from threading import Event
  9. import pytest
  10. from amqp import ChannelError
  11. from case import Mock, patch, skip
  12. from celery.bootsteps import CLOSE, RUN, TERMINATE, StartStopStep
  13. from celery.concurrency.base import BasePool
  14. from celery.exceptions import (ImproperlyConfigured, InvalidTaskError,
  15. TaskRevokedError, WorkerShutdown,
  16. WorkerTerminate)
  17. from celery.five import Queue as FastQueue
  18. from celery.five import Empty, range
  19. from celery.platforms import EX_FAILURE
  20. from celery.utils.nodenames import worker_direct
  21. from celery.utils.serialization import pickle
  22. from celery.utils.timer2 import Timer
  23. from celery.worker import worker as worker_module
  24. from celery.worker import components, consumer, state
  25. from celery.worker.consumer import Consumer
  26. from celery.worker.pidbox import gPidbox
  27. from celery.worker.request import Request
  28. from kombu import Connection
  29. from kombu.common import QoS, ignore_errors
  30. from kombu.transport.base import Message
  31. from kombu.transport.memory import Transport
  32. from kombu.utils.uuid import uuid
  33. def MockStep(step=None):
  34. if step is None:
  35. step = Mock(name='step')
  36. else:
  37. step.blueprint = Mock(name='step.blueprint')
  38. step.blueprint.name = 'MockNS'
  39. step.name = 'MockStep(%s)' % (id(step),)
  40. return step
  41. def mock_event_dispatcher():
  42. evd = Mock(name='event_dispatcher')
  43. evd.groups = ['worker']
  44. evd._outbound_buffer = deque()
  45. return evd
  46. def find_step(obj, typ):
  47. return obj.blueprint.steps[typ.name]
  48. def create_message(channel, **data):
  49. data.setdefault('id', uuid())
  50. m = Message(body=pickle.dumps(dict(**data)),
  51. channel=channel,
  52. content_type='application/x-python-serialize',
  53. content_encoding='binary',
  54. delivery_info={'consumer_tag': 'mock'})
  55. m.accept = ['application/x-python-serialize']
  56. return m
  57. class ConsumerCase:
  58. def create_task_message(self, channel, *args, **kwargs):
  59. m = self.TaskMessage(*args, **kwargs)
  60. m.channel = channel
  61. m.delivery_info = {'consumer_tag': 'mock'}
  62. return m
  63. class test_Consumer(ConsumerCase):
  64. def setup(self):
  65. self.buffer = FastQueue()
  66. self.timer = Timer()
  67. @self.app.task(shared=False)
  68. def foo_task(x, y, z):
  69. return x * y * z
  70. self.foo_task = foo_task
  71. def teardown(self):
  72. self.timer.stop()
  73. def LoopConsumer(self, buffer=None, controller=None, timer=None, app=None,
  74. without_mingle=True, without_gossip=True,
  75. without_heartbeat=True, **kwargs):
  76. if controller is None:
  77. controller = Mock(name='.controller')
  78. buffer = buffer if buffer is not None else self.buffer.put
  79. timer = timer if timer is not None else self.timer
  80. app = app if app is not None else self.app
  81. c = Consumer(
  82. buffer,
  83. timer=timer,
  84. app=app,
  85. controller=controller,
  86. without_mingle=without_mingle,
  87. without_gossip=without_gossip,
  88. without_heartbeat=without_heartbeat,
  89. **kwargs
  90. )
  91. c.task_consumer = Mock(name='.task_consumer')
  92. c.qos = QoS(c.task_consumer.qos, 10)
  93. c.connection = Mock(name='.connection')
  94. c.controller = c.app.WorkController()
  95. c.heart = Mock(name='.heart')
  96. c.controller.consumer = c
  97. c.pool = c.controller.pool = Mock(name='.controller.pool')
  98. c.node = Mock(name='.node')
  99. c.event_dispatcher = mock_event_dispatcher()
  100. return c
  101. def NoopConsumer(self, *args, **kwargs):
  102. c = self.LoopConsumer(*args, **kwargs)
  103. c.loop = Mock(name='.loop')
  104. return c
  105. def test_info(self):
  106. c = self.NoopConsumer()
  107. c.connection.info.return_value = {'foo': 'bar'}
  108. c.controller.pool.info.return_value = [Mock(), Mock()]
  109. info = c.controller.stats()
  110. assert info['prefetch_count'] == 10
  111. assert info['broker']
  112. def test_start_when_closed(self):
  113. c = self.NoopConsumer()
  114. c.blueprint.state = CLOSE
  115. c.start()
  116. def test_connection(self):
  117. c = self.NoopConsumer()
  118. c.blueprint.start(c)
  119. assert isinstance(c.connection, Connection)
  120. c.blueprint.state = RUN
  121. c.event_dispatcher = None
  122. c.blueprint.restart(c)
  123. assert c.connection
  124. c.blueprint.state = RUN
  125. c.shutdown()
  126. assert c.connection is None
  127. assert c.task_consumer is None
  128. c.blueprint.start(c)
  129. assert isinstance(c.connection, Connection)
  130. c.blueprint.restart(c)
  131. c.stop()
  132. c.shutdown()
  133. assert c.connection is None
  134. assert c.task_consumer is None
  135. def test_close_connection(self):
  136. c = self.NoopConsumer()
  137. c.blueprint.state = RUN
  138. step = find_step(c, consumer.Connection)
  139. connection = c.connection
  140. step.shutdown(c)
  141. connection.close.assert_called()
  142. assert c.connection is None
  143. def test_close_connection__heart_shutdown(self):
  144. c = self.NoopConsumer()
  145. event_dispatcher = c.event_dispatcher
  146. heart = c.heart
  147. c.event_dispatcher.enabled = True
  148. c.blueprint.state = RUN
  149. Events = find_step(c, consumer.Events)
  150. Events.shutdown(c)
  151. Heart = find_step(c, consumer.Heart)
  152. Heart.shutdown(c)
  153. event_dispatcher.close.assert_called()
  154. heart.stop.assert_called_with()
  155. @patch('celery.worker.consumer.consumer.warn')
  156. def test_receive_message_unknown(self, warn):
  157. c = self.LoopConsumer()
  158. c.blueprint.state = RUN
  159. c.steps.pop()
  160. channel = Mock(name='.channeol')
  161. m = create_message(channel, unknown={'baz': '!!!'})
  162. callback = self._get_on_message(c)
  163. callback(m)
  164. warn.assert_called()
  165. @patch('celery.worker.strategy.to_timestamp')
  166. def test_receive_message_eta_OverflowError(self, to_timestamp):
  167. to_timestamp.side_effect = OverflowError()
  168. c = self.LoopConsumer()
  169. c.blueprint.state = RUN
  170. c.steps.pop()
  171. m = self.create_task_message(
  172. Mock(), self.foo_task.name,
  173. args=('2, 2'), kwargs={},
  174. eta=datetime.now().isoformat(),
  175. )
  176. c.update_strategies()
  177. callback = self._get_on_message(c)
  178. callback(m)
  179. assert m.acknowledged
  180. @patch('celery.worker.consumer.consumer.error')
  181. def test_receive_message_InvalidTaskError(self, error):
  182. c = self.LoopConsumer()
  183. c.blueprint.state = RUN
  184. c.steps.pop()
  185. m = self.create_task_message(
  186. Mock(), self.foo_task.name,
  187. args=(1, 2), kwargs='foobarbaz', id=1)
  188. c.update_strategies()
  189. strat = c.strategies[self.foo_task.name] = Mock(name='strategy')
  190. strat.side_effect = InvalidTaskError()
  191. callback = self._get_on_message(c)
  192. callback(m)
  193. error.assert_called()
  194. assert 'Received invalid task message' in error.call_args[0][0]
  195. @patch('celery.worker.consumer.consumer.crit')
  196. def test_on_decode_error(self, crit):
  197. c = self.LoopConsumer()
  198. class MockMessage(Mock):
  199. content_type = 'application/x-msgpack'
  200. content_encoding = 'binary'
  201. body = 'foobarbaz'
  202. message = MockMessage()
  203. c.on_decode_error(message, KeyError('foo'))
  204. assert message.ack.call_count
  205. assert "Can't decode message body" in crit.call_args[0][0]
  206. def _get_on_message(self, c):
  207. if c.qos is None:
  208. c.qos = Mock()
  209. c.task_consumer = Mock()
  210. c.event_dispatcher = mock_event_dispatcher()
  211. c.connection = Mock(name='.connection')
  212. c.connection.get_heartbeat_interval.return_value = 0
  213. c.connection.drain_events.side_effect = WorkerShutdown()
  214. with pytest.raises(WorkerShutdown):
  215. c.loop(*c.loop_args())
  216. assert c.task_consumer.on_message
  217. return c.task_consumer.on_message
  218. def test_receieve_message(self):
  219. c = self.LoopConsumer()
  220. c.blueprint.state = RUN
  221. m = self.create_task_message(
  222. Mock(), self.foo_task.name,
  223. args=[2, 4, 8], kwargs={},
  224. )
  225. c.update_strategies()
  226. callback = self._get_on_message(c)
  227. callback(m)
  228. in_bucket = self.buffer.get_nowait()
  229. assert isinstance(in_bucket, Request)
  230. assert in_bucket.name == self.foo_task.name
  231. assert in_bucket.execute() == 2 * 4 * 8
  232. assert self.timer.empty()
  233. def test_start_channel_error(self):
  234. c = self.NoopConsumer(task_events=False, pool=BasePool())
  235. c.loop.on_nth_call_do_raise(KeyError('foo'), SyntaxError('bar'))
  236. c.channel_errors = (KeyError,)
  237. try:
  238. with pytest.raises(KeyError):
  239. c.start()
  240. finally:
  241. c.timer and c.timer.stop()
  242. def test_start_connection_error(self):
  243. c = self.NoopConsumer(task_events=False, pool=BasePool())
  244. c.loop.on_nth_call_do_raise(KeyError('foo'), SyntaxError('bar'))
  245. c.connection_errors = (KeyError,)
  246. try:
  247. with pytest.raises(SyntaxError):
  248. c.start()
  249. finally:
  250. c.timer and c.timer.stop()
  251. def test_loop_ignores_socket_timeout(self):
  252. class Connection(self.app.connection_for_read().__class__):
  253. obj = None
  254. def drain_events(self, **kwargs):
  255. self.obj.connection = None
  256. raise socket.timeout(10)
  257. c = self.NoopConsumer()
  258. c.connection = Connection(self.app.conf.broker_url)
  259. c.connection.obj = c
  260. c.qos = QoS(c.task_consumer.qos, 10)
  261. c.loop(*c.loop_args())
  262. def test_loop_when_socket_error(self):
  263. class Connection(self.app.connection_for_read().__class__):
  264. obj = None
  265. def drain_events(self, **kwargs):
  266. self.obj.connection = None
  267. raise socket.error('foo')
  268. c = self.LoopConsumer()
  269. c.blueprint.state = RUN
  270. conn = c.connection = Connection(self.app.conf.broker_url)
  271. c.connection.obj = c
  272. c.qos = QoS(c.task_consumer.qos, 10)
  273. with pytest.raises(socket.error):
  274. c.loop(*c.loop_args())
  275. c.blueprint.state = CLOSE
  276. c.connection = conn
  277. c.loop(*c.loop_args())
  278. def test_loop(self):
  279. class Connection(self.app.connection_for_read().__class__):
  280. obj = None
  281. def drain_events(self, **kwargs):
  282. self.obj.connection = None
  283. @property
  284. def supports_heartbeats(self):
  285. return False
  286. c = self.LoopConsumer()
  287. c.blueprint.state = RUN
  288. c.connection = Connection(self.app.conf.broker_url)
  289. c.connection.obj = c
  290. c.connection.get_heartbeat_interval = Mock(return_value=None)
  291. c.qos = QoS(c.task_consumer.qos, 10)
  292. c.loop(*c.loop_args())
  293. c.loop(*c.loop_args())
  294. assert c.task_consumer.consume.call_count
  295. c.task_consumer.qos.assert_called_with(prefetch_count=10)
  296. assert c.qos.value == 10
  297. c.qos.decrement_eventually()
  298. assert c.qos.value == 9
  299. c.qos.update()
  300. assert c.qos.value == 9
  301. c.task_consumer.qos.assert_called_with(prefetch_count=9)
  302. def test_ignore_errors(self):
  303. c = self.NoopConsumer()
  304. c.connection_errors = (AttributeError, KeyError,)
  305. c.channel_errors = (SyntaxError,)
  306. ignore_errors(c, Mock(side_effect=AttributeError('foo')))
  307. ignore_errors(c, Mock(side_effect=KeyError('foo')))
  308. ignore_errors(c, Mock(side_effect=SyntaxError('foo')))
  309. with pytest.raises(IndexError):
  310. ignore_errors(c, Mock(side_effect=IndexError('foo')))
  311. def test_apply_eta_task(self):
  312. c = self.NoopConsumer()
  313. c.qos = QoS(None, 10)
  314. task = Mock(name='task', id='1234213')
  315. qos = c.qos.value
  316. c.apply_eta_task(task)
  317. assert task in state.reserved_requests
  318. assert c.qos.value == qos - 1
  319. assert self.buffer.get_nowait() is task
  320. def test_receieve_message_eta_isoformat(self):
  321. c = self.LoopConsumer()
  322. c.blueprint.state = RUN
  323. c.steps.pop()
  324. m = self.create_task_message(
  325. Mock(), self.foo_task.name,
  326. eta=(datetime.now() + timedelta(days=1)).isoformat(),
  327. args=[2, 4, 8], kwargs={},
  328. )
  329. c.qos = QoS(c.task_consumer.qos, 1)
  330. current_pcount = c.qos.value
  331. c.event_dispatcher.enabled = False
  332. c.update_strategies()
  333. callback = self._get_on_message(c)
  334. callback(m)
  335. c.timer.stop()
  336. c.timer.join(1)
  337. items = [entry[2] for entry in self.timer.queue]
  338. found = 0
  339. for item in items:
  340. if item.args[0].name == self.foo_task.name:
  341. found = True
  342. assert found
  343. assert c.qos.value > current_pcount
  344. c.timer.stop()
  345. def test_pidbox_callback(self):
  346. c = self.NoopConsumer()
  347. con = find_step(c, consumer.Control).box
  348. con.node = Mock()
  349. con.reset = Mock()
  350. con.on_message('foo', 'bar')
  351. con.node.handle_message.assert_called_with('foo', 'bar')
  352. con.node = Mock()
  353. con.node.handle_message.side_effect = KeyError('foo')
  354. con.on_message('foo', 'bar')
  355. con.node.handle_message.assert_called_with('foo', 'bar')
  356. con.node = Mock()
  357. con.node.handle_message.side_effect = ValueError('foo')
  358. con.on_message('foo', 'bar')
  359. con.node.handle_message.assert_called_with('foo', 'bar')
  360. con.reset.assert_called()
  361. def test_revoke(self):
  362. c = self.LoopConsumer()
  363. c.blueprint.state = RUN
  364. c.steps.pop()
  365. channel = Mock(name='channel')
  366. id = uuid()
  367. t = self.create_task_message(
  368. channel, self.foo_task.name,
  369. args=[2, 4, 8], kwargs={}, id=id,
  370. )
  371. state.revoked.add(id)
  372. callback = self._get_on_message(c)
  373. callback(t)
  374. assert self.buffer.empty()
  375. def test_receieve_message_not_registered(self):
  376. c = self.LoopConsumer()
  377. c.blueprint.state = RUN
  378. c.steps.pop()
  379. channel = Mock(name='channel')
  380. m = self.create_task_message(
  381. channel, 'x.X.31x', args=[2, 4, 8], kwargs={},
  382. )
  383. callback = self._get_on_message(c)
  384. assert not callback(m)
  385. with pytest.raises(Empty):
  386. self.buffer.get_nowait()
  387. assert self.timer.empty()
  388. @patch('celery.worker.consumer.consumer.warn')
  389. @patch('celery.worker.consumer.consumer.logger')
  390. def test_receieve_message_ack_raises(self, logger, warn):
  391. c = self.LoopConsumer()
  392. c.blueprint.state = RUN
  393. channel = Mock(name='channel')
  394. m = self.create_task_message(
  395. channel, self.foo_task.name,
  396. args=[2, 4, 8], kwargs={},
  397. )
  398. m.headers = None
  399. c.update_strategies()
  400. c.connection_errors = (socket.error,)
  401. m.reject = Mock()
  402. m.reject.side_effect = socket.error('foo')
  403. callback = self._get_on_message(c)
  404. assert not callback(m)
  405. warn.assert_called()
  406. with pytest.raises(Empty):
  407. self.buffer.get_nowait()
  408. assert self.timer.empty()
  409. m.reject_log_error.assert_called_with(logger, c.connection_errors)
  410. def test_receive_message_eta(self):
  411. if os.environ.get('C_DEBUG_TEST'):
  412. pp = partial(print, file=sys.__stderr__)
  413. else:
  414. def pp(*args, **kwargs):
  415. pass
  416. pp('TEST RECEIVE MESSAGE ETA')
  417. pp('+CREATE MYKOMBUCONSUMER')
  418. c = self.LoopConsumer()
  419. pp('-CREATE MYKOMBUCONSUMER')
  420. c.steps.pop()
  421. channel = Mock(name='channel')
  422. pp('+ CREATE MESSAGE')
  423. m = self.create_task_message(
  424. channel, self.foo_task.name,
  425. args=[2, 4, 8], kwargs={},
  426. eta=(datetime.now() + timedelta(days=1)).isoformat(),
  427. )
  428. pp('- CREATE MESSAGE')
  429. try:
  430. pp('+ BLUEPRINT START 1')
  431. c.blueprint.start(c)
  432. pp('- BLUEPRINT START 1')
  433. p = c.app.conf.broker_connection_retry
  434. c.app.conf.broker_connection_retry = False
  435. pp('+ BLUEPRINT START 2')
  436. c.blueprint.start(c)
  437. pp('- BLUEPRINT START 2')
  438. c.app.conf.broker_connection_retry = p
  439. pp('+ BLUEPRINT RESTART')
  440. c.blueprint.restart(c)
  441. pp('- BLUEPRINT RESTART')
  442. pp('+ GET ON MESSAGE')
  443. callback = self._get_on_message(c)
  444. pp('- GET ON MESSAGE')
  445. pp('+ CALLBACK')
  446. callback(m)
  447. pp('- CALLBACK')
  448. finally:
  449. pp('+ STOP TIMER')
  450. c.timer.stop()
  451. pp('- STOP TIMER')
  452. try:
  453. pp('+ JOIN TIMER')
  454. c.timer.join()
  455. pp('- JOIN TIMER')
  456. except RuntimeError:
  457. pass
  458. in_hold = c.timer.queue[0]
  459. assert len(in_hold) == 3
  460. eta, priority, entry = in_hold
  461. task = entry.args[0]
  462. assert isinstance(task, Request)
  463. assert task.name == self.foo_task.name
  464. assert task.execute() == 2 * 4 * 8
  465. with pytest.raises(Empty):
  466. self.buffer.get_nowait()
  467. def test_reset_pidbox_node(self):
  468. c = self.NoopConsumer()
  469. con = find_step(c, consumer.Control).box
  470. con.node = Mock()
  471. chan = con.node.channel = Mock()
  472. chan.close.side_effect = socket.error('foo')
  473. c.connection_errors = (socket.error,)
  474. con.reset()
  475. chan.close.assert_called_with()
  476. def test_reset_pidbox_node_green(self):
  477. c = self.NoopConsumer(pool=Mock(is_green=True))
  478. con = find_step(c, consumer.Control)
  479. assert isinstance(con.box, gPidbox)
  480. con.start(c)
  481. c.pool.spawn_n.assert_called_with(con.box.loop, c)
  482. def test_green_pidbox_node(self):
  483. pool = Mock()
  484. pool.is_green = True
  485. c = self.NoopConsumer(pool=Mock(is_green=True))
  486. controller = find_step(c, consumer.Control)
  487. class BConsumer(Mock):
  488. def __enter__(self):
  489. self.consume()
  490. return self
  491. def __exit__(self, *exc_info):
  492. self.cancel()
  493. controller.box.node.listen = BConsumer()
  494. connections = []
  495. class Connection(object):
  496. calls = 0
  497. def __init__(self, obj):
  498. connections.append(self)
  499. self.obj = obj
  500. self.default_channel = self.channel()
  501. self.closed = False
  502. def __enter__(self):
  503. return self
  504. def __exit__(self, *exc_info):
  505. self.close()
  506. def channel(self):
  507. return Mock()
  508. def as_uri(self):
  509. return 'dummy://'
  510. def drain_events(self, **kwargs):
  511. if not self.calls:
  512. self.calls += 1
  513. raise socket.timeout()
  514. self.obj.connection = None
  515. controller.box._node_shutdown.set()
  516. def close(self):
  517. self.closed = True
  518. c.connection_for_read = lambda: Connection(obj=c)
  519. controller = find_step(c, consumer.Control)
  520. controller.box.loop(c)
  521. controller.box.node.listen.assert_called()
  522. assert controller.box.consumer
  523. controller.box.consumer.consume.assert_called_with()
  524. assert c.connection is None
  525. assert connections[0].closed
  526. @patch('kombu.connection.Connection._establish_connection')
  527. @patch('kombu.utils.functional.sleep')
  528. def test_connect_errback(self, sleep, connect):
  529. c = self.NoopConsumer()
  530. Transport.connection_errors = (ChannelError,)
  531. connect.on_nth_call_do(ChannelError('error'), n=1)
  532. c.connect()
  533. connect.assert_called_with()
  534. def test_stop_pidbox_node(self):
  535. c = self.NoopConsumer()
  536. cont = find_step(c, consumer.Control)
  537. cont._node_stopped = Event()
  538. cont._node_shutdown = Event()
  539. cont._node_stopped.set()
  540. cont.stop(c)
  541. def test_start__loop(self):
  542. class _QoS(object):
  543. prev = 3
  544. value = 4
  545. def update(self):
  546. self.prev = self.value
  547. init_callback = Mock(name='init_callback')
  548. c = self.NoopConsumer(init_callback=init_callback)
  549. c.qos = _QoS()
  550. c.connection = Connection(self.app.conf.broker_url)
  551. c.connection.get_heartbeat_interval = Mock(return_value=None)
  552. c.iterations = 0
  553. def raises_KeyError(*args, **kwargs):
  554. c.iterations += 1
  555. if c.qos.prev != c.qos.value:
  556. c.qos.update()
  557. if c.iterations >= 2:
  558. raise KeyError('foo')
  559. c.loop = raises_KeyError
  560. with pytest.raises(KeyError):
  561. c.start()
  562. assert c.iterations == 2
  563. assert c.qos.prev == c.qos.value
  564. init_callback.reset_mock()
  565. c = self.NoopConsumer(task_events=False, init_callback=init_callback)
  566. c.qos = _QoS()
  567. c.connection = Connection(self.app.conf.broker_url)
  568. c.connection.get_heartbeat_interval = Mock(return_value=None)
  569. c.loop = Mock(side_effect=socket.error('foo'))
  570. with pytest.raises(socket.error):
  571. c.start()
  572. c.loop.assert_called()
  573. def test_reset_connection_with_no_node(self):
  574. c = self.NoopConsumer()
  575. c.steps.pop()
  576. c.blueprint.start(c)
  577. class test_WorkController(ConsumerCase):
  578. def setup(self):
  579. self.worker = self.create_worker()
  580. self._logger = worker_module.logger
  581. self._comp_logger = components.logger
  582. self.logger = worker_module.logger = Mock()
  583. self.comp_logger = components.logger = Mock()
  584. @self.app.task(shared=False)
  585. def foo_task(x, y, z):
  586. return x * y * z
  587. self.foo_task = foo_task
  588. def teardown(self):
  589. worker_module.logger = self._logger
  590. components.logger = self._comp_logger
  591. def create_worker(self, **kw):
  592. worker = self.app.WorkController(concurrency=1, loglevel=0, **kw)
  593. worker.blueprint.shutdown_complete.set()
  594. return worker
  595. def test_on_consumer_ready(self):
  596. self.worker.on_consumer_ready(Mock())
  597. def test_setup_queues_worker_direct(self):
  598. self.app.conf.worker_direct = True
  599. self.app.amqp.__dict__['queues'] = Mock()
  600. self.worker.setup_queues({})
  601. self.app.amqp.queues.select_add.assert_called_with(
  602. worker_direct(self.worker.hostname),
  603. )
  604. def test_setup_queues__missing_queue(self):
  605. self.app.amqp.queues.select = Mock(name='select')
  606. self.app.amqp.queues.deselect = Mock(name='deselect')
  607. self.app.amqp.queues.select.side_effect = KeyError()
  608. self.app.amqp.queues.deselect.side_effect = KeyError()
  609. with pytest.raises(ImproperlyConfigured):
  610. self.worker.setup_queues('x,y', exclude='foo,bar')
  611. self.app.amqp.queues.select = Mock(name='select')
  612. with pytest.raises(ImproperlyConfigured):
  613. self.worker.setup_queues('x,y', exclude='foo,bar')
  614. def test_send_worker_shutdown(self):
  615. with patch('celery.signals.worker_shutdown') as ws:
  616. self.worker._send_worker_shutdown()
  617. ws.send.assert_called_with(sender=self.worker)
  618. @skip.todo('unstable test')
  619. def test_process_shutdown_on_worker_shutdown(self):
  620. from celery.concurrency.prefork import process_destructor
  621. from celery.concurrency.asynpool import Worker
  622. with patch('celery.signals.worker_process_shutdown') as ws:
  623. with patch('os._exit') as _exit:
  624. worker = Worker(None, None, on_exit=process_destructor)
  625. worker._do_exit(22, 3.1415926)
  626. ws.send.assert_called_with(
  627. sender=None, pid=22, exitcode=3.1415926,
  628. )
  629. _exit.assert_called_with(3.1415926)
  630. def test_process_task_revoked_release_semaphore(self):
  631. self.worker._quick_release = Mock()
  632. req = Mock()
  633. req.execute_using_pool.side_effect = TaskRevokedError
  634. self.worker._process_task(req)
  635. self.worker._quick_release.assert_called_with()
  636. delattr(self.worker, '_quick_release')
  637. self.worker._process_task(req)
  638. def test_shutdown_no_blueprint(self):
  639. self.worker.blueprint = None
  640. self.worker._shutdown()
  641. @patch('celery.worker.worker.create_pidlock')
  642. def test_use_pidfile(self, create_pidlock):
  643. create_pidlock.return_value = Mock()
  644. worker = self.create_worker(pidfile='pidfilelockfilepid')
  645. worker.steps = []
  646. worker.start()
  647. create_pidlock.assert_called()
  648. worker.stop()
  649. worker.pidlock.release.assert_called()
  650. def test_attrs(self):
  651. worker = self.worker
  652. assert worker.timer is not None
  653. assert isinstance(worker.timer, Timer)
  654. assert worker.pool is not None
  655. assert worker.consumer is not None
  656. assert worker.steps
  657. def test_with_embedded_beat(self):
  658. worker = self.app.WorkController(concurrency=1, loglevel=0, beat=True)
  659. assert worker.beat
  660. assert worker.beat in [w.obj for w in worker.steps]
  661. def test_with_autoscaler(self):
  662. worker = self.create_worker(
  663. autoscale=[10, 3], send_events=False,
  664. timer_cls='celery.utils.timer2.Timer',
  665. )
  666. assert worker.autoscaler
  667. def test_dont_stop_or_terminate(self):
  668. worker = self.app.WorkController(concurrency=1, loglevel=0)
  669. worker.stop()
  670. assert worker.blueprint.state != CLOSE
  671. worker.terminate()
  672. assert worker.blueprint.state != CLOSE
  673. sigsafe, worker.pool.signal_safe = worker.pool.signal_safe, False
  674. try:
  675. worker.blueprint.state = RUN
  676. worker.stop(in_sighandler=True)
  677. assert worker.blueprint.state != CLOSE
  678. worker.terminate(in_sighandler=True)
  679. assert worker.blueprint.state != CLOSE
  680. finally:
  681. worker.pool.signal_safe = sigsafe
  682. def test_on_timer_error(self):
  683. worker = self.app.WorkController(concurrency=1, loglevel=0)
  684. try:
  685. raise KeyError('foo')
  686. except KeyError as exc:
  687. components.Timer(worker).on_timer_error(exc)
  688. msg, args = self.comp_logger.error.call_args[0]
  689. assert 'KeyError' in msg % args
  690. def test_on_timer_tick(self):
  691. worker = self.app.WorkController(concurrency=1, loglevel=10)
  692. components.Timer(worker).on_timer_tick(30.0)
  693. xargs = self.comp_logger.debug.call_args[0]
  694. fmt, arg = xargs[0], xargs[1]
  695. assert arg == 30.0
  696. assert 'Next ETA %s secs' in fmt
  697. def test_process_task(self):
  698. worker = self.worker
  699. worker.pool = Mock()
  700. channel = Mock()
  701. m = self.create_task_message(
  702. channel, self.foo_task.name,
  703. args=[4, 8, 10], kwargs={},
  704. )
  705. task = Request(m, app=self.app)
  706. worker._process_task(task)
  707. assert worker.pool.apply_async.call_count == 1
  708. worker.pool.stop()
  709. def test_process_task_raise_base(self):
  710. worker = self.worker
  711. worker.pool = Mock()
  712. worker.pool.apply_async.side_effect = KeyboardInterrupt('Ctrl+C')
  713. channel = Mock()
  714. m = self.create_task_message(
  715. channel, self.foo_task.name,
  716. args=[4, 8, 10], kwargs={},
  717. )
  718. task = Request(m, app=self.app)
  719. worker.steps = []
  720. worker.blueprint.state = RUN
  721. with pytest.raises(KeyboardInterrupt):
  722. worker._process_task(task)
  723. def test_process_task_raise_WorkerTerminate(self):
  724. worker = self.worker
  725. worker.pool = Mock()
  726. worker.pool.apply_async.side_effect = WorkerTerminate()
  727. channel = Mock()
  728. m = self.create_task_message(
  729. channel, self.foo_task.name,
  730. args=[4, 8, 10], kwargs={},
  731. )
  732. task = Request(m, app=self.app)
  733. worker.steps = []
  734. worker.blueprint.state = RUN
  735. with pytest.raises(SystemExit):
  736. worker._process_task(task)
  737. def test_process_task_raise_regular(self):
  738. worker = self.worker
  739. worker.pool = Mock()
  740. worker.pool.apply_async.side_effect = KeyError('some exception')
  741. channel = Mock()
  742. m = self.create_task_message(
  743. channel, self.foo_task.name,
  744. args=[4, 8, 10], kwargs={},
  745. )
  746. task = Request(m, app=self.app)
  747. with pytest.raises(KeyError):
  748. worker._process_task(task)
  749. worker.pool.stop()
  750. def test_start_catches_base_exceptions(self):
  751. worker1 = self.create_worker()
  752. worker1.blueprint.state = RUN
  753. stc = MockStep()
  754. stc.start.side_effect = WorkerTerminate()
  755. worker1.steps = [stc]
  756. worker1.start()
  757. stc.start.assert_called_with(worker1)
  758. assert stc.terminate.call_count
  759. worker2 = self.create_worker()
  760. worker2.blueprint.state = RUN
  761. sec = MockStep()
  762. sec.start.side_effect = WorkerShutdown()
  763. sec.terminate = None
  764. worker2.steps = [sec]
  765. worker2.start()
  766. assert sec.stop.call_count
  767. def test_statedb(self):
  768. from celery.worker import state
  769. Persistent = state.Persistent
  770. state.Persistent = Mock()
  771. try:
  772. worker = self.create_worker(statedb='statefilename')
  773. assert worker._persistence
  774. finally:
  775. state.Persistent = Persistent
  776. def test_process_task_sem(self):
  777. worker = self.worker
  778. worker._quick_acquire = Mock()
  779. req = Mock()
  780. worker._process_task_sem(req)
  781. worker._quick_acquire.assert_called_with(worker._process_task, req)
  782. def test_signal_consumer_close(self):
  783. worker = self.worker
  784. worker.consumer = Mock()
  785. worker.signal_consumer_close()
  786. worker.consumer.close.assert_called_with()
  787. worker.consumer.close.side_effect = AttributeError()
  788. worker.signal_consumer_close()
  789. def test_rusage__no_resource(self):
  790. from celery.worker import worker
  791. prev, worker.resource = worker.resource, None
  792. try:
  793. self.worker.pool = Mock(name='pool')
  794. with pytest.raises(NotImplementedError):
  795. self.worker.rusage()
  796. self.worker.stats()
  797. finally:
  798. worker.resource = prev
  799. def test_repr(self):
  800. assert repr(self.worker)
  801. def test_str(self):
  802. assert str(self.worker) == self.worker.hostname
  803. def test_start__stop(self):
  804. worker = self.worker
  805. worker.blueprint.shutdown_complete.set()
  806. worker.steps = [MockStep(StartStopStep(self)) for _ in range(4)]
  807. worker.blueprint.state = RUN
  808. worker.blueprint.started = 4
  809. for w in worker.steps:
  810. w.start = Mock()
  811. w.close = Mock()
  812. w.stop = Mock()
  813. worker.start()
  814. for w in worker.steps:
  815. w.start.assert_called()
  816. worker.consumer = Mock()
  817. worker.stop(exitcode=3)
  818. for stopstep in worker.steps:
  819. stopstep.close.assert_called()
  820. stopstep.stop.assert_called()
  821. # Doesn't close pool if no pool.
  822. worker.start()
  823. worker.pool = None
  824. worker.stop()
  825. # test that stop of None is not attempted
  826. worker.steps[-1] = None
  827. worker.start()
  828. worker.stop()
  829. def test_start__KeyboardInterrupt(self):
  830. worker = self.worker
  831. worker.blueprint = Mock(name='blueprint')
  832. worker.blueprint.start.side_effect = KeyboardInterrupt()
  833. worker.stop = Mock(name='stop')
  834. worker.start()
  835. worker.stop.assert_called_with(exitcode=EX_FAILURE)
  836. def test_register_with_event_loop(self):
  837. worker = self.worker
  838. hub = Mock(name='hub')
  839. worker.blueprint = Mock(name='blueprint')
  840. worker.register_with_event_loop(hub)
  841. worker.blueprint.send_all.assert_called_with(
  842. worker, 'register_with_event_loop', args=(hub,),
  843. description='hub.register',
  844. )
  845. def test_step_raises(self):
  846. worker = self.worker
  847. step = Mock()
  848. worker.steps = [step]
  849. step.start.side_effect = TypeError()
  850. worker.stop = Mock()
  851. worker.start()
  852. worker.stop.assert_called_with(exitcode=EX_FAILURE)
  853. def test_state(self):
  854. assert self.worker.state
  855. def test_start__terminate(self):
  856. worker = self.worker
  857. worker.blueprint.shutdown_complete.set()
  858. worker.blueprint.started = 5
  859. worker.blueprint.state = RUN
  860. worker.steps = [MockStep() for _ in range(5)]
  861. worker.start()
  862. for w in worker.steps[:3]:
  863. w.start.assert_called()
  864. assert worker.blueprint.started == len(worker.steps)
  865. assert worker.blueprint.state == RUN
  866. worker.terminate()
  867. for step in worker.steps:
  868. step.terminate.assert_called()
  869. worker.blueprint.state = TERMINATE
  870. worker.terminate()
  871. def test_Hub_create(self):
  872. w = Mock()
  873. x = components.Hub(w)
  874. x.create(w)
  875. assert w.timer.max_interval
  876. def test_Pool_create_threaded(self):
  877. w = Mock()
  878. w._conninfo.connection_errors = w._conninfo.channel_errors = ()
  879. w.pool_cls = Mock()
  880. w.use_eventloop = False
  881. pool = components.Pool(w)
  882. pool.create(w)
  883. def test_Pool_pool_no_sem(self):
  884. w = Mock()
  885. w.pool_cls.uses_semaphore = False
  886. components.Pool(w).create(w)
  887. assert w.process_task is w._process_task
  888. def test_Pool_create(self):
  889. from kombu.async.semaphore import LaxBoundedSemaphore
  890. w = Mock()
  891. w._conninfo.connection_errors = w._conninfo.channel_errors = ()
  892. w.hub = Mock()
  893. PoolImp = Mock()
  894. poolimp = PoolImp.return_value = Mock()
  895. poolimp._pool = [Mock(), Mock()]
  896. poolimp._cache = {}
  897. poolimp._fileno_to_inq = {}
  898. poolimp._fileno_to_outq = {}
  899. from celery.concurrency.prefork import TaskPool as _TaskPool
  900. class MockTaskPool(_TaskPool):
  901. Pool = PoolImp
  902. @property
  903. def timers(self):
  904. return {Mock(): 30}
  905. w.pool_cls = MockTaskPool
  906. w.use_eventloop = True
  907. w.consumer.restart_count = -1
  908. pool = components.Pool(w)
  909. pool.create(w)
  910. pool.register_with_event_loop(w, w.hub)
  911. if sys.platform != 'win32':
  912. assert isinstance(w.semaphore, LaxBoundedSemaphore)
  913. P = w.pool
  914. P.start()