test_worker.py 34 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063
  1. from __future__ import absolute_import, print_function, unicode_literals
  2. import os
  3. import socket
  4. import sys
  5. from collections import deque
  6. from datetime import datetime, timedelta
  7. from functools import partial
  8. from threading import Event
  9. from amqp import ChannelError
  10. from kombu import Connection
  11. from kombu.common import QoS, ignore_errors
  12. from kombu.transport.base import Message
  13. from kombu.transport.memory import Transport
  14. from celery.bootsteps import RUN, CLOSE, TERMINATE, StartStopStep
  15. from celery.concurrency.base import BasePool
  16. from celery.exceptions import (
  17. WorkerShutdown, WorkerTerminate, TaskRevokedError,
  18. InvalidTaskError, ImproperlyConfigured,
  19. )
  20. from celery.five import Empty, range, Queue as FastQueue
  21. from celery.platforms import EX_FAILURE
  22. from celery.utils import uuid
  23. from celery import worker as worker_module
  24. from celery.worker import components
  25. from celery.worker import consumer
  26. from celery.worker import state
  27. from celery.worker.consumer import Consumer
  28. from celery.worker.pidbox import gPidbox
  29. from celery.worker.request import Request
  30. from celery.utils import worker_direct
  31. from celery.utils.serialization import pickle
  32. from celery.utils.timer2 import Timer
  33. from celery.tests.case import AppCase, Mock, TaskMessage, patch, skip
  34. def MockStep(step=None):
  35. if step is None:
  36. step = Mock(name='step')
  37. else:
  38. step.blueprint = Mock(name='step.blueprint')
  39. step.blueprint.name = 'MockNS'
  40. step.name = 'MockStep(%s)' % (id(step),)
  41. return step
  42. def mock_event_dispatcher():
  43. evd = Mock(name='event_dispatcher')
  44. evd.groups = ['worker']
  45. evd._outbound_buffer = deque()
  46. return evd
  47. def find_step(obj, typ):
  48. return obj.blueprint.steps[typ.name]
  49. def create_message(channel, **data):
  50. data.setdefault('id', uuid())
  51. m = Message(channel, body=pickle.dumps(dict(**data)),
  52. content_type='application/x-python-serialize',
  53. content_encoding='binary',
  54. delivery_info={'consumer_tag': 'mock'})
  55. m.accept = ['application/x-python-serialize']
  56. return m
  57. def create_task_message(channel, *args, **kwargs):
  58. m = TaskMessage(*args, **kwargs)
  59. m.channel = channel
  60. m.delivery_info = {'consumer_tag': 'mock'}
  61. return m
  62. class test_Consumer(AppCase):
  63. def setup(self):
  64. self.buffer = FastQueue()
  65. self.timer = Timer()
  66. @self.app.task(shared=False)
  67. def foo_task(x, y, z):
  68. return x * y * z
  69. self.foo_task = foo_task
  70. def teardown(self):
  71. self.timer.stop()
  72. def LoopConsumer(self, buffer=None, controller=None, timer=None, app=None,
  73. without_mingle=True, without_gossip=True,
  74. without_heartbeat=True, **kwargs):
  75. if controller is None:
  76. controller = Mock(name='.controller')
  77. buffer = buffer if buffer is not None else self.buffer.put
  78. timer = timer if timer is not None else self.timer
  79. app = app if app is not None else self.app
  80. c = Consumer(
  81. buffer,
  82. timer=timer,
  83. app=app,
  84. controller=controller,
  85. without_mingle=without_mingle,
  86. without_gossip=without_gossip,
  87. without_heartbeat=without_heartbeat,
  88. **kwargs
  89. )
  90. c.task_consumer = Mock(name='.task_consumer')
  91. c.qos = QoS(c.task_consumer.qos, 10)
  92. c.connection = Mock(name='.connection')
  93. c.controller = c.app.WorkController()
  94. c.heart = Mock(name='.heart')
  95. c.controller.consumer = c
  96. c.pool = c.controller.pool = Mock(name='.controller.pool')
  97. c.node = Mock(name='.node')
  98. c.event_dispatcher = mock_event_dispatcher()
  99. return c
  100. def NoopConsumer(self, *args, **kwargs):
  101. c = self.LoopConsumer(*args, **kwargs)
  102. c.loop = Mock(name='.loop')
  103. return c
  104. def test_info(self):
  105. c = self.NoopConsumer()
  106. c.connection.info.return_value = {'foo': 'bar'}
  107. c.controller.pool.info.return_value = [Mock(), Mock()]
  108. info = c.controller.stats()
  109. self.assertEqual(info['prefetch_count'], 10)
  110. self.assertTrue(info['broker'])
  111. def test_start_when_closed(self):
  112. c = self.NoopConsumer()
  113. c.blueprint.state = CLOSE
  114. c.start()
  115. def test_connection(self):
  116. c = self.NoopConsumer()
  117. c.blueprint.start(c)
  118. self.assertIsInstance(c.connection, Connection)
  119. c.blueprint.state = RUN
  120. c.event_dispatcher = None
  121. c.blueprint.restart(c)
  122. self.assertTrue(c.connection)
  123. c.blueprint.state = RUN
  124. c.shutdown()
  125. self.assertIsNone(c.connection)
  126. self.assertIsNone(c.task_consumer)
  127. c.blueprint.start(c)
  128. self.assertIsInstance(c.connection, Connection)
  129. c.blueprint.restart(c)
  130. c.stop()
  131. c.shutdown()
  132. self.assertIsNone(c.connection)
  133. self.assertIsNone(c.task_consumer)
  134. def test_close_connection(self):
  135. c = self.NoopConsumer()
  136. c.blueprint.state = RUN
  137. step = find_step(c, consumer.Connection)
  138. connection = c.connection
  139. step.shutdown(c)
  140. connection.close.assert_called()
  141. self.assertIsNone(c.connection)
  142. def test_close_connection__heart_shutdown(self):
  143. c = self.NoopConsumer()
  144. event_dispatcher = c.event_dispatcher
  145. heart = c.heart
  146. c.event_dispatcher.enabled = True
  147. c.blueprint.state = RUN
  148. Events = find_step(c, consumer.Events)
  149. Events.shutdown(c)
  150. Heart = find_step(c, consumer.Heart)
  151. Heart.shutdown(c)
  152. event_dispatcher.close.assert_called()
  153. heart.stop.assert_called_with()
  154. @patch('celery.worker.consumer.consumer.warn')
  155. def test_receive_message_unknown(self, warn):
  156. c = self.LoopConsumer()
  157. c.blueprint.state = RUN
  158. c.steps.pop()
  159. channel = Mock(name='.channeol')
  160. m = create_message(channel, unknown={'baz': '!!!'})
  161. callback = self._get_on_message(c)
  162. callback(m)
  163. self.assertTrue(warn.call_count)
  164. @patch('celery.worker.strategy.to_timestamp')
  165. def test_receive_message_eta_OverflowError(self, to_timestamp):
  166. to_timestamp.side_effect = OverflowError()
  167. c = self.LoopConsumer()
  168. c.blueprint.state = RUN
  169. c.steps.pop()
  170. m = create_task_message(
  171. Mock(), self.foo_task.name,
  172. args=('2, 2'), kwargs={},
  173. eta=datetime.now().isoformat(),
  174. )
  175. c.update_strategies()
  176. callback = self._get_on_message(c)
  177. callback(m)
  178. self.assertTrue(m.acknowledged)
  179. @patch('celery.worker.consumer.consumer.error')
  180. def test_receive_message_InvalidTaskError(self, error):
  181. c = self.LoopConsumer()
  182. c.blueprint.state = RUN
  183. c.steps.pop()
  184. m = create_task_message(
  185. Mock(), self.foo_task.name,
  186. args=(1, 2), kwargs='foobarbaz', id=1)
  187. c.update_strategies()
  188. strat = c.strategies[self.foo_task.name] = Mock(name='strategy')
  189. strat.side_effect = InvalidTaskError()
  190. callback = self._get_on_message(c)
  191. callback(m)
  192. error.assert_called()
  193. self.assertIn('Received invalid task message', error.call_args[0][0])
  194. @patch('celery.worker.consumer.consumer.crit')
  195. def test_on_decode_error(self, crit):
  196. c = self.LoopConsumer()
  197. class MockMessage(Mock):
  198. content_type = 'application/x-msgpack'
  199. content_encoding = 'binary'
  200. body = 'foobarbaz'
  201. message = MockMessage()
  202. c.on_decode_error(message, KeyError('foo'))
  203. self.assertTrue(message.ack.call_count)
  204. self.assertIn("Can't decode message body", crit.call_args[0][0])
  205. def _get_on_message(self, c):
  206. if c.qos is None:
  207. c.qos = Mock()
  208. c.task_consumer = Mock()
  209. c.event_dispatcher = mock_event_dispatcher()
  210. c.connection = Mock(name='.connection')
  211. c.connection.drain_events.side_effect = WorkerShutdown()
  212. with self.assertRaises(WorkerShutdown):
  213. c.loop(*c.loop_args())
  214. self.assertTrue(c.task_consumer.on_message)
  215. return c.task_consumer.on_message
  216. def test_receieve_message(self):
  217. c = self.LoopConsumer()
  218. c.blueprint.state = RUN
  219. m = create_task_message(
  220. Mock(), self.foo_task.name,
  221. args=[2, 4, 8], kwargs={},
  222. )
  223. c.update_strategies()
  224. callback = self._get_on_message(c)
  225. callback(m)
  226. in_bucket = self.buffer.get_nowait()
  227. self.assertIsInstance(in_bucket, Request)
  228. self.assertEqual(in_bucket.name, self.foo_task.name)
  229. self.assertEqual(in_bucket.execute(), 2 * 4 * 8)
  230. self.assertTrue(self.timer.empty())
  231. def test_start_channel_error(self):
  232. c = self.NoopConsumer(send_events=False, pool=BasePool())
  233. c.loop.on_nth_call_do_raise(KeyError('foo'), SyntaxError('bar'))
  234. c.channel_errors = (KeyError,)
  235. try:
  236. with self.assertRaises(KeyError):
  237. c.start()
  238. finally:
  239. c.timer and c.timer.stop()
  240. def test_start_connection_error(self):
  241. c = self.NoopConsumer(send_events=False, pool=BasePool())
  242. c.loop.on_nth_call_do_raise(KeyError('foo'), SyntaxError('bar'))
  243. c.connection_errors = (KeyError,)
  244. try:
  245. with self.assertRaises(SyntaxError):
  246. c.start()
  247. finally:
  248. c.timer and c.timer.stop()
  249. def test_loop_ignores_socket_timeout(self):
  250. class Connection(self.app.connection_for_read().__class__):
  251. obj = None
  252. def drain_events(self, **kwargs):
  253. self.obj.connection = None
  254. raise socket.timeout(10)
  255. c = self.NoopConsumer()
  256. c.connection = Connection()
  257. c.connection.obj = c
  258. c.qos = QoS(c.task_consumer.qos, 10)
  259. c.loop(*c.loop_args())
  260. def test_loop_when_socket_error(self):
  261. class Connection(self.app.connection_for_read().__class__):
  262. obj = None
  263. def drain_events(self, **kwargs):
  264. self.obj.connection = None
  265. raise socket.error('foo')
  266. c = self.LoopConsumer()
  267. c.blueprint.state = RUN
  268. conn = c.connection = Connection()
  269. c.connection.obj = c
  270. c.qos = QoS(c.task_consumer.qos, 10)
  271. with self.assertRaises(socket.error):
  272. c.loop(*c.loop_args())
  273. c.blueprint.state = CLOSE
  274. c.connection = conn
  275. c.loop(*c.loop_args())
  276. def test_loop(self):
  277. class Connection(self.app.connection_for_read().__class__):
  278. obj = None
  279. def drain_events(self, **kwargs):
  280. self.obj.connection = None
  281. c = self.LoopConsumer()
  282. c.blueprint.state = RUN
  283. c.connection = Connection()
  284. c.connection.obj = c
  285. c.qos = QoS(c.task_consumer.qos, 10)
  286. c.loop(*c.loop_args())
  287. c.loop(*c.loop_args())
  288. self.assertTrue(c.task_consumer.consume.call_count)
  289. c.task_consumer.qos.assert_called_with(prefetch_count=10)
  290. self.assertEqual(c.qos.value, 10)
  291. c.qos.decrement_eventually()
  292. self.assertEqual(c.qos.value, 9)
  293. c.qos.update()
  294. self.assertEqual(c.qos.value, 9)
  295. c.task_consumer.qos.assert_called_with(prefetch_count=9)
  296. def test_ignore_errors(self):
  297. c = self.NoopConsumer()
  298. c.connection_errors = (AttributeError, KeyError,)
  299. c.channel_errors = (SyntaxError,)
  300. ignore_errors(c, Mock(side_effect=AttributeError('foo')))
  301. ignore_errors(c, Mock(side_effect=KeyError('foo')))
  302. ignore_errors(c, Mock(side_effect=SyntaxError('foo')))
  303. with self.assertRaises(IndexError):
  304. ignore_errors(c, Mock(side_effect=IndexError('foo')))
  305. def test_apply_eta_task(self):
  306. c = self.NoopConsumer()
  307. c.qos = QoS(None, 10)
  308. task = Mock(name='task', id='1234213')
  309. qos = c.qos.value
  310. c.apply_eta_task(task)
  311. self.assertIn(task, state.reserved_requests)
  312. self.assertEqual(c.qos.value, qos - 1)
  313. self.assertIs(self.buffer.get_nowait(), task)
  314. def test_receieve_message_eta_isoformat(self):
  315. c = self.LoopConsumer()
  316. c.blueprint.state = RUN
  317. c.steps.pop()
  318. m = create_task_message(
  319. Mock(), self.foo_task.name,
  320. eta=(datetime.now() + timedelta(days=1)).isoformat(),
  321. args=[2, 4, 8], kwargs={},
  322. )
  323. c.qos = QoS(c.task_consumer.qos, 1)
  324. current_pcount = c.qos.value
  325. c.event_dispatcher.enabled = False
  326. c.update_strategies()
  327. callback = self._get_on_message(c)
  328. callback(m)
  329. c.timer.stop()
  330. c.timer.join(1)
  331. items = [entry[2] for entry in self.timer.queue]
  332. found = 0
  333. for item in items:
  334. if item.args[0].name == self.foo_task.name:
  335. found = True
  336. self.assertTrue(found)
  337. self.assertGreater(c.qos.value, current_pcount)
  338. c.timer.stop()
  339. def test_pidbox_callback(self):
  340. c = self.NoopConsumer()
  341. con = find_step(c, consumer.Control).box
  342. con.node = Mock()
  343. con.reset = Mock()
  344. con.on_message('foo', 'bar')
  345. con.node.handle_message.assert_called_with('foo', 'bar')
  346. con.node = Mock()
  347. con.node.handle_message.side_effect = KeyError('foo')
  348. con.on_message('foo', 'bar')
  349. con.node.handle_message.assert_called_with('foo', 'bar')
  350. con.node = Mock()
  351. con.node.handle_message.side_effect = ValueError('foo')
  352. con.on_message('foo', 'bar')
  353. con.node.handle_message.assert_called_with('foo', 'bar')
  354. con.reset.assert_called()
  355. def test_revoke(self):
  356. c = self.LoopConsumer()
  357. c.blueprint.state = RUN
  358. c.steps.pop()
  359. channel = Mock(name='channel')
  360. id = uuid()
  361. t = create_task_message(
  362. channel, self.foo_task.name,
  363. args=[2, 4, 8], kwargs={}, id=id,
  364. )
  365. state.revoked.add(id)
  366. callback = self._get_on_message(c)
  367. callback(t)
  368. self.assertTrue(self.buffer.empty())
  369. def test_receieve_message_not_registered(self):
  370. c = self.LoopConsumer()
  371. c.blueprint.state = RUN
  372. c.steps.pop()
  373. channel = Mock(name='channel')
  374. m = create_task_message(
  375. channel, 'x.X.31x', args=[2, 4, 8], kwargs={},
  376. )
  377. callback = self._get_on_message(c)
  378. self.assertFalse(callback(m))
  379. with self.assertRaises(Empty):
  380. self.buffer.get_nowait()
  381. self.assertTrue(self.timer.empty())
  382. @patch('celery.worker.consumer.consumer.warn')
  383. @patch('celery.worker.consumer.consumer.logger')
  384. def test_receieve_message_ack_raises(self, logger, warn):
  385. c = self.LoopConsumer()
  386. c.blueprint.state = RUN
  387. channel = Mock(name='channel')
  388. m = create_task_message(
  389. channel, self.foo_task.name,
  390. args=[2, 4, 8], kwargs={},
  391. )
  392. m.headers = None
  393. c.update_strategies()
  394. c.connection_errors = (socket.error,)
  395. m.reject = Mock()
  396. m.reject.side_effect = socket.error('foo')
  397. callback = self._get_on_message(c)
  398. self.assertFalse(callback(m))
  399. self.assertTrue(warn.call_count)
  400. with self.assertRaises(Empty):
  401. self.buffer.get_nowait()
  402. self.assertTrue(self.timer.empty())
  403. m.reject_log_error.assert_called_with(logger, c.connection_errors)
  404. def test_receive_message_eta(self):
  405. if os.environ.get('C_DEBUG_TEST'):
  406. pp = partial(print, file=sys.__stderr__)
  407. else:
  408. def pp(*args, **kwargs):
  409. pass
  410. pp('TEST RECEIVE MESSAGE ETA')
  411. pp('+CREATE MYKOMBUCONSUMER')
  412. c = self.LoopConsumer()
  413. pp('-CREATE MYKOMBUCONSUMER')
  414. c.steps.pop()
  415. channel = Mock(name='channel')
  416. pp('+ CREATE MESSAGE')
  417. m = create_task_message(
  418. channel, self.foo_task.name,
  419. args=[2, 4, 8], kwargs={},
  420. eta=(datetime.now() + timedelta(days=1)).isoformat(),
  421. )
  422. pp('- CREATE MESSAGE')
  423. try:
  424. pp('+ BLUEPRINT START 1')
  425. c.blueprint.start(c)
  426. pp('- BLUEPRINT START 1')
  427. p = c.app.conf.broker_connection_retry
  428. c.app.conf.broker_connection_retry = False
  429. pp('+ BLUEPRINT START 2')
  430. c.blueprint.start(c)
  431. pp('- BLUEPRINT START 2')
  432. c.app.conf.broker_connection_retry = p
  433. pp('+ BLUEPRINT RESTART')
  434. c.blueprint.restart(c)
  435. pp('- BLUEPRINT RESTART')
  436. pp('+ GET ON MESSAGE')
  437. callback = self._get_on_message(c)
  438. pp('- GET ON MESSAGE')
  439. pp('+ CALLBACK')
  440. callback(m)
  441. pp('- CALLBACK')
  442. finally:
  443. pp('+ STOP TIMER')
  444. c.timer.stop()
  445. pp('- STOP TIMER')
  446. try:
  447. pp('+ JOIN TIMER')
  448. c.timer.join()
  449. pp('- JOIN TIMER')
  450. except RuntimeError:
  451. pass
  452. in_hold = c.timer.queue[0]
  453. self.assertEqual(len(in_hold), 3)
  454. eta, priority, entry = in_hold
  455. task = entry.args[0]
  456. self.assertIsInstance(task, Request)
  457. self.assertEqual(task.name, self.foo_task.name)
  458. self.assertEqual(task.execute(), 2 * 4 * 8)
  459. with self.assertRaises(Empty):
  460. self.buffer.get_nowait()
  461. def test_reset_pidbox_node(self):
  462. c = self.NoopConsumer()
  463. con = find_step(c, consumer.Control).box
  464. con.node = Mock()
  465. chan = con.node.channel = Mock()
  466. chan.close.side_effect = socket.error('foo')
  467. c.connection_errors = (socket.error,)
  468. con.reset()
  469. chan.close.assert_called_with()
  470. def test_reset_pidbox_node_green(self):
  471. c = self.NoopConsumer(pool=Mock(is_green=True))
  472. con = find_step(c, consumer.Control)
  473. self.assertIsInstance(con.box, gPidbox)
  474. con.start(c)
  475. c.pool.spawn_n.assert_called_with(con.box.loop, c)
  476. def test_green_pidbox_node(self):
  477. pool = Mock()
  478. pool.is_green = True
  479. c = self.NoopConsumer(pool=Mock(is_green=True))
  480. controller = find_step(c, consumer.Control)
  481. class BConsumer(Mock):
  482. def __enter__(self):
  483. self.consume()
  484. return self
  485. def __exit__(self, *exc_info):
  486. self.cancel()
  487. controller.box.node.listen = BConsumer()
  488. connections = []
  489. class Connection(object):
  490. calls = 0
  491. def __init__(self, obj):
  492. connections.append(self)
  493. self.obj = obj
  494. self.default_channel = self.channel()
  495. self.closed = False
  496. def __enter__(self):
  497. return self
  498. def __exit__(self, *exc_info):
  499. self.close()
  500. def channel(self):
  501. return Mock()
  502. def as_uri(self):
  503. return 'dummy://'
  504. def drain_events(self, **kwargs):
  505. if not self.calls:
  506. self.calls += 1
  507. raise socket.timeout()
  508. self.obj.connection = None
  509. controller.box._node_shutdown.set()
  510. def close(self):
  511. self.closed = True
  512. c.connect = lambda: Connection(obj=c)
  513. controller = find_step(c, consumer.Control)
  514. controller.box.loop(c)
  515. controller.box.node.listen.assert_called()
  516. self.assertTrue(controller.box.consumer)
  517. controller.box.consumer.consume.assert_called_with()
  518. self.assertIsNone(c.connection)
  519. self.assertTrue(connections[0].closed)
  520. @patch('kombu.connection.Connection._establish_connection')
  521. @patch('kombu.utils.sleep')
  522. def test_connect_errback(self, sleep, connect):
  523. c = self.NoopConsumer()
  524. Transport.connection_errors = (ChannelError,)
  525. connect.on_nth_call_do(ChannelError('error'), n=1)
  526. c.connect()
  527. connect.assert_called_with()
  528. def test_stop_pidbox_node(self):
  529. c = self.NoopConsumer()
  530. cont = find_step(c, consumer.Control)
  531. cont._node_stopped = Event()
  532. cont._node_shutdown = Event()
  533. cont._node_stopped.set()
  534. cont.stop(c)
  535. def test_start__loop(self):
  536. class _QoS(object):
  537. prev = 3
  538. value = 4
  539. def update(self):
  540. self.prev = self.value
  541. init_callback = Mock(name='init_callback')
  542. c = self.NoopConsumer(init_callback=init_callback)
  543. c.qos = _QoS()
  544. c.connection = Connection()
  545. c.iterations = 0
  546. def raises_KeyError(*args, **kwargs):
  547. c.iterations += 1
  548. if c.qos.prev != c.qos.value:
  549. c.qos.update()
  550. if c.iterations >= 2:
  551. raise KeyError('foo')
  552. c.loop = raises_KeyError
  553. with self.assertRaises(KeyError):
  554. c.start()
  555. self.assertEqual(c.iterations, 2)
  556. self.assertEqual(c.qos.prev, c.qos.value)
  557. init_callback.reset_mock()
  558. c = self.NoopConsumer(send_events=False, init_callback=init_callback)
  559. c.qos = _QoS()
  560. c.connection = Connection()
  561. c.loop = Mock(side_effect=socket.error('foo'))
  562. with self.assertRaises(socket.error):
  563. c.start()
  564. c.loop.assert_called()
  565. def test_reset_connection_with_no_node(self):
  566. c = self.NoopConsumer()
  567. c.steps.pop()
  568. c.blueprint.start(c)
  569. class test_WorkController(AppCase):
  570. def setup(self):
  571. self.worker = self.create_worker()
  572. self._logger = worker_module.logger
  573. self._comp_logger = components.logger
  574. self.logger = worker_module.logger = Mock()
  575. self.comp_logger = components.logger = Mock()
  576. @self.app.task(shared=False)
  577. def foo_task(x, y, z):
  578. return x * y * z
  579. self.foo_task = foo_task
  580. def teardown(self):
  581. worker_module.logger = self._logger
  582. components.logger = self._comp_logger
  583. def create_worker(self, **kw):
  584. worker = self.app.WorkController(concurrency=1, loglevel=0, **kw)
  585. worker.blueprint.shutdown_complete.set()
  586. return worker
  587. def test_on_consumer_ready(self):
  588. self.worker.on_consumer_ready(Mock())
  589. def test_setup_queues_worker_direct(self):
  590. self.app.conf.worker_direct = True
  591. self.app.amqp.__dict__['queues'] = Mock()
  592. self.worker.setup_queues({})
  593. self.app.amqp.queues.select_add.assert_called_with(
  594. worker_direct(self.worker.hostname),
  595. )
  596. def test_setup_queues__missing_queue(self):
  597. self.app.amqp.queues.select = Mock(name='select')
  598. self.app.amqp.queues.deselect = Mock(name='deselect')
  599. self.app.amqp.queues.select.side_effect = KeyError()
  600. self.app.amqp.queues.deselect.side_effect = KeyError()
  601. with self.assertRaises(ImproperlyConfigured):
  602. self.worker.setup_queues('x,y', exclude='foo,bar')
  603. self.app.amqp.queues.select = Mock(name='select')
  604. with self.assertRaises(ImproperlyConfigured):
  605. self.worker.setup_queues('x,y', exclude='foo,bar')
  606. def test_send_worker_shutdown(self):
  607. with patch('celery.signals.worker_shutdown') as ws:
  608. self.worker._send_worker_shutdown()
  609. ws.send.assert_called_with(sender=self.worker)
  610. @skip.todo('unstable test')
  611. def test_process_shutdown_on_worker_shutdown(self):
  612. from celery.concurrency.prefork import process_destructor
  613. from celery.concurrency.asynpool import Worker
  614. with patch('celery.signals.worker_process_shutdown') as ws:
  615. with patch('os._exit') as _exit:
  616. worker = Worker(None, None, on_exit=process_destructor)
  617. worker._do_exit(22, 3.1415926)
  618. ws.send.assert_called_with(
  619. sender=None, pid=22, exitcode=3.1415926,
  620. )
  621. _exit.assert_called_with(3.1415926)
  622. def test_process_task_revoked_release_semaphore(self):
  623. self.worker._quick_release = Mock()
  624. req = Mock()
  625. req.execute_using_pool.side_effect = TaskRevokedError
  626. self.worker._process_task(req)
  627. self.worker._quick_release.assert_called_with()
  628. delattr(self.worker, '_quick_release')
  629. self.worker._process_task(req)
  630. def test_shutdown_no_blueprint(self):
  631. self.worker.blueprint = None
  632. self.worker._shutdown()
  633. @patch('celery.worker.create_pidlock')
  634. def test_use_pidfile(self, create_pidlock):
  635. create_pidlock.return_value = Mock()
  636. worker = self.create_worker(pidfile='pidfilelockfilepid')
  637. worker.steps = []
  638. worker.start()
  639. create_pidlock.assert_called()
  640. worker.stop()
  641. worker.pidlock.release.assert_called()
  642. def test_attrs(self):
  643. worker = self.worker
  644. self.assertIsNotNone(worker.timer)
  645. self.assertIsInstance(worker.timer, Timer)
  646. self.assertIsNotNone(worker.pool)
  647. self.assertIsNotNone(worker.consumer)
  648. self.assertTrue(worker.steps)
  649. def test_with_embedded_beat(self):
  650. worker = self.app.WorkController(concurrency=1, loglevel=0, beat=True)
  651. self.assertTrue(worker.beat)
  652. self.assertIn(worker.beat, [w.obj for w in worker.steps])
  653. def test_dont_stop_or_terminate(self):
  654. worker = self.app.WorkController(concurrency=1, loglevel=0)
  655. worker.stop()
  656. self.assertNotEqual(worker.blueprint.state, CLOSE)
  657. worker.terminate()
  658. self.assertNotEqual(worker.blueprint.state, CLOSE)
  659. sigsafe, worker.pool.signal_safe = worker.pool.signal_safe, False
  660. try:
  661. worker.blueprint.state = RUN
  662. worker.stop(in_sighandler=True)
  663. self.assertNotEqual(worker.blueprint.state, CLOSE)
  664. worker.terminate(in_sighandler=True)
  665. self.assertNotEqual(worker.blueprint.state, CLOSE)
  666. finally:
  667. worker.pool.signal_safe = sigsafe
  668. def test_on_timer_error(self):
  669. worker = self.app.WorkController(concurrency=1, loglevel=0)
  670. try:
  671. raise KeyError('foo')
  672. except KeyError as exc:
  673. components.Timer(worker).on_timer_error(exc)
  674. msg, args = self.comp_logger.error.call_args[0]
  675. self.assertIn('KeyError', msg % args)
  676. def test_on_timer_tick(self):
  677. worker = self.app.WorkController(concurrency=1, loglevel=10)
  678. components.Timer(worker).on_timer_tick(30.0)
  679. xargs = self.comp_logger.debug.call_args[0]
  680. fmt, arg = xargs[0], xargs[1]
  681. self.assertEqual(30.0, arg)
  682. self.assertIn('Next eta %s secs', fmt)
  683. def test_process_task(self):
  684. worker = self.worker
  685. worker.pool = Mock()
  686. channel = Mock()
  687. m = create_task_message(
  688. channel, self.foo_task.name,
  689. args=[4, 8, 10], kwargs={},
  690. )
  691. task = Request(m, app=self.app)
  692. worker._process_task(task)
  693. self.assertEqual(worker.pool.apply_async.call_count, 1)
  694. worker.pool.stop()
  695. def test_process_task_raise_base(self):
  696. worker = self.worker
  697. worker.pool = Mock()
  698. worker.pool.apply_async.side_effect = KeyboardInterrupt('Ctrl+C')
  699. channel = Mock()
  700. m = create_task_message(
  701. channel, self.foo_task.name,
  702. args=[4, 8, 10], kwargs={},
  703. )
  704. task = Request(m, app=self.app)
  705. worker.steps = []
  706. worker.blueprint.state = RUN
  707. with self.assertRaises(KeyboardInterrupt):
  708. worker._process_task(task)
  709. def test_process_task_raise_WorkerTerminate(self):
  710. worker = self.worker
  711. worker.pool = Mock()
  712. worker.pool.apply_async.side_effect = WorkerTerminate()
  713. channel = Mock()
  714. m = create_task_message(
  715. channel, self.foo_task.name,
  716. args=[4, 8, 10], kwargs={},
  717. )
  718. task = Request(m, app=self.app)
  719. worker.steps = []
  720. worker.blueprint.state = RUN
  721. with self.assertRaises(SystemExit):
  722. worker._process_task(task)
  723. def test_process_task_raise_regular(self):
  724. worker = self.worker
  725. worker.pool = Mock()
  726. worker.pool.apply_async.side_effect = KeyError('some exception')
  727. channel = Mock()
  728. m = create_task_message(
  729. channel, self.foo_task.name,
  730. args=[4, 8, 10], kwargs={},
  731. )
  732. task = Request(m, app=self.app)
  733. worker._process_task(task)
  734. worker.pool.stop()
  735. def test_start_catches_base_exceptions(self):
  736. worker1 = self.create_worker()
  737. worker1.blueprint.state = RUN
  738. stc = MockStep()
  739. stc.start.side_effect = WorkerTerminate()
  740. worker1.steps = [stc]
  741. worker1.start()
  742. stc.start.assert_called_with(worker1)
  743. self.assertTrue(stc.terminate.call_count)
  744. worker2 = self.create_worker()
  745. worker2.blueprint.state = RUN
  746. sec = MockStep()
  747. sec.start.side_effect = WorkerShutdown()
  748. sec.terminate = None
  749. worker2.steps = [sec]
  750. worker2.start()
  751. self.assertTrue(sec.stop.call_count)
  752. def test_state_db(self):
  753. from celery.worker import state
  754. Persistent = state.Persistent
  755. state.Persistent = Mock()
  756. try:
  757. worker = self.create_worker(state_db='statefilename')
  758. self.assertTrue(worker._persistence)
  759. finally:
  760. state.Persistent = Persistent
  761. def test_process_task_sem(self):
  762. worker = self.worker
  763. worker._quick_acquire = Mock()
  764. req = Mock()
  765. worker._process_task_sem(req)
  766. worker._quick_acquire.assert_called_with(worker._process_task, req)
  767. def test_signal_consumer_close(self):
  768. worker = self.worker
  769. worker.consumer = Mock()
  770. worker.signal_consumer_close()
  771. worker.consumer.close.assert_called_with()
  772. worker.consumer.close.side_effect = AttributeError()
  773. worker.signal_consumer_close()
  774. def test_rusage__no_resource(self):
  775. from celery import worker
  776. prev, worker.resource = worker.resource, None
  777. try:
  778. self.worker.pool = Mock(name='pool')
  779. with self.assertRaises(NotImplementedError):
  780. self.worker.rusage()
  781. self.worker.stats()
  782. finally:
  783. worker.resource = prev
  784. def test_repr(self):
  785. self.assertTrue(repr(self.worker))
  786. def test_str(self):
  787. self.assertEqual(str(self.worker), self.worker.hostname)
  788. def test_start__stop(self):
  789. worker = self.worker
  790. worker.blueprint.shutdown_complete.set()
  791. worker.steps = [MockStep(StartStopStep(self)) for _ in range(4)]
  792. worker.blueprint.state = RUN
  793. worker.blueprint.started = 4
  794. for w in worker.steps:
  795. w.start = Mock()
  796. w.close = Mock()
  797. w.stop = Mock()
  798. worker.start()
  799. for w in worker.steps:
  800. self.assertTrue(w.start.call_count)
  801. worker.consumer = Mock()
  802. worker.stop(exitcode=3)
  803. for stopstep in worker.steps:
  804. self.assertTrue(stopstep.close.call_count)
  805. self.assertTrue(stopstep.stop.call_count)
  806. # Doesn't close pool if no pool.
  807. worker.start()
  808. worker.pool = None
  809. worker.stop()
  810. # test that stop of None is not attempted
  811. worker.steps[-1] = None
  812. worker.start()
  813. worker.stop()
  814. def test_start__KeyboardInterrupt(self):
  815. worker = self.worker
  816. worker.blueprint = Mock(name='blueprint')
  817. worker.blueprint.start.side_effect = KeyboardInterrupt()
  818. worker.stop = Mock(name='stop')
  819. worker.start()
  820. worker.stop.assert_called_with(exitcode=EX_FAILURE)
  821. def test_register_with_event_loop(self):
  822. worker = self.worker
  823. hub = Mock(name='hub')
  824. worker.blueprint = Mock(name='blueprint')
  825. worker.register_with_event_loop(hub)
  826. worker.blueprint.send_all.assert_called_with(
  827. worker, 'register_with_event_loop', args=(hub,),
  828. description='hub.register',
  829. )
  830. def test_step_raises(self):
  831. worker = self.worker
  832. step = Mock()
  833. worker.steps = [step]
  834. step.start.side_effect = TypeError()
  835. worker.stop = Mock()
  836. worker.start()
  837. worker.stop.assert_called_with(exitcode=EX_FAILURE)
  838. def test_state(self):
  839. self.assertTrue(self.worker.state)
  840. def test_start__terminate(self):
  841. worker = self.worker
  842. worker.blueprint.shutdown_complete.set()
  843. worker.blueprint.started = 5
  844. worker.blueprint.state = RUN
  845. worker.steps = [MockStep() for _ in range(5)]
  846. worker.start()
  847. for w in worker.steps[:3]:
  848. self.assertTrue(w.start.call_count)
  849. self.assertTrue(worker.blueprint.started, len(worker.steps))
  850. self.assertEqual(worker.blueprint.state, RUN)
  851. worker.terminate()
  852. for step in worker.steps:
  853. self.assertTrue(step.terminate.call_count)
  854. worker.blueprint.state = TERMINATE
  855. worker.terminate()
  856. def test_Hub_crate(self):
  857. w = Mock()
  858. x = components.Hub(w)
  859. x.create(w)
  860. self.assertTrue(w.timer.max_interval)
  861. def test_Pool_crate_threaded(self):
  862. w = Mock()
  863. w._conninfo.connection_errors = w._conninfo.channel_errors = ()
  864. w.pool_cls = Mock()
  865. w.use_eventloop = False
  866. pool = components.Pool(w)
  867. pool.create(w)
  868. def test_Pool_pool_no_sem(self):
  869. w = Mock()
  870. w.pool_cls.uses_semaphore = False
  871. components.Pool(w).create(w)
  872. self.assertIs(w.process_task, w._process_task)
  873. def test_Pool_create(self):
  874. from kombu.async.semaphore import LaxBoundedSemaphore
  875. w = Mock()
  876. w._conninfo.connection_errors = w._conninfo.channel_errors = ()
  877. w.hub = Mock()
  878. PoolImp = Mock()
  879. poolimp = PoolImp.return_value = Mock()
  880. poolimp._pool = [Mock(), Mock()]
  881. poolimp._cache = {}
  882. poolimp._fileno_to_inq = {}
  883. poolimp._fileno_to_outq = {}
  884. from celery.concurrency.prefork import TaskPool as _TaskPool
  885. class MockTaskPool(_TaskPool):
  886. Pool = PoolImp
  887. @property
  888. def timers(self):
  889. return {Mock(): 30}
  890. w.pool_cls = MockTaskPool
  891. w.use_eventloop = True
  892. w.consumer.restart_count = -1
  893. pool = components.Pool(w)
  894. pool.create(w)
  895. pool.register_with_event_loop(w, w.hub)
  896. if sys.platform != 'win32':
  897. self.assertIsInstance(w.semaphore, LaxBoundedSemaphore)
  898. P = w.pool
  899. P.start()