test_worker.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727
  1. from __future__ import with_statement
  2. import socket
  3. import sys
  4. from collections import deque
  5. from datetime import datetime, timedelta
  6. from Queue import Empty
  7. from kombu.transport.base import Message
  8. from kombu.connection import BrokerConnection
  9. from mock import Mock, patch
  10. from celery import current_app
  11. from celery.concurrency.base import BasePool
  12. from celery.exceptions import SystemTerminate
  13. from celery.task import task as task_dec
  14. from celery.task import periodic_task as periodic_task_dec
  15. from celery.utils import gen_unique_id
  16. from celery.worker import WorkController
  17. from celery.worker.buckets import FastQueue
  18. from celery.worker.job import TaskRequest
  19. from celery.worker.consumer import Consumer as MainConsumer
  20. from celery.worker.consumer import QoS, RUN, PREFETCH_COUNT_MAX
  21. from celery.utils.serialization import pickle
  22. from celery.utils.timer2 import Timer
  23. from celery.tests.compat import catch_warnings
  24. from celery.tests.utils import unittest
  25. from celery.tests.utils import AppCase, skip
  26. class PlaceHolder(object):
  27. pass
  28. class MyKombuConsumer(MainConsumer):
  29. broadcast_consumer = Mock()
  30. task_consumer = Mock()
  31. def __init__(self, *args, **kwargs):
  32. kwargs.setdefault("pool", BasePool(2))
  33. super(MyKombuConsumer, self).__init__(*args, **kwargs)
  34. def restart_heartbeat(self):
  35. self.heart = None
  36. class MockNode(object):
  37. commands = []
  38. def handle_message(self, body, message):
  39. self.commands.append(body.pop("command", None))
  40. class MockEventDispatcher(object):
  41. sent = []
  42. closed = False
  43. flushed = False
  44. _outbound_buffer = []
  45. def send(self, event, *args, **kwargs):
  46. self.sent.append(event)
  47. def close(self):
  48. self.closed = True
  49. def flush(self):
  50. self.flushed = True
  51. class MockHeart(object):
  52. closed = False
  53. def stop(self):
  54. self.closed = True
  55. @task_dec()
  56. def foo_task(x, y, z, **kwargs):
  57. return x * y * z
  58. @periodic_task_dec(run_every=60)
  59. def foo_periodic_task():
  60. return "foo"
  61. def create_message(backend, **data):
  62. data.setdefault("id", gen_unique_id())
  63. return Message(backend, body=pickle.dumps(dict(**data)),
  64. content_type="application/x-python-serialize",
  65. content_encoding="binary")
  66. class test_QoS(unittest.TestCase):
  67. class _QoS(QoS):
  68. def __init__(self, value):
  69. self.value = value
  70. QoS.__init__(self, None, value, None)
  71. def set(self, value):
  72. return value
  73. def test_qos_increment_decrement(self):
  74. qos = self._QoS(10)
  75. self.assertEqual(qos.increment(), 11)
  76. self.assertEqual(qos.increment(3), 14)
  77. self.assertEqual(qos.increment(-30), 14)
  78. self.assertEqual(qos.decrement(7), 7)
  79. self.assertEqual(qos.decrement(), 6)
  80. self.assertRaises(AssertionError, qos.decrement, 10)
  81. def test_qos_disabled_increment_decrement(self):
  82. qos = self._QoS(0)
  83. self.assertEqual(qos.increment(), 0)
  84. self.assertEqual(qos.increment(3), 0)
  85. self.assertEqual(qos.increment(-30), 0)
  86. self.assertEqual(qos.decrement(7), 0)
  87. self.assertEqual(qos.decrement(), 0)
  88. self.assertEqual(qos.decrement(10), 0)
  89. def test_qos_thread_safe(self):
  90. qos = self._QoS(10)
  91. def add():
  92. for i in xrange(1000):
  93. qos.increment()
  94. def sub():
  95. for i in xrange(1000):
  96. qos.decrement_eventually()
  97. def threaded(funs):
  98. from threading import Thread
  99. threads = [Thread(target=fun) for fun in funs]
  100. for thread in threads:
  101. thread.start()
  102. for thread in threads:
  103. thread.join()
  104. threaded([add, add])
  105. self.assertEqual(qos.value, 2010)
  106. qos.value = 1000
  107. threaded([add, sub]) # n = 2
  108. self.assertEqual(qos.value, 1000)
  109. def test_exceeds_short(self):
  110. qos = QoS(Mock(), PREFETCH_COUNT_MAX - 1,
  111. current_app.log.get_default_logger())
  112. qos.update()
  113. self.assertEqual(qos.value, PREFETCH_COUNT_MAX - 1)
  114. qos.increment()
  115. self.assertEqual(qos.value, PREFETCH_COUNT_MAX)
  116. qos.increment()
  117. self.assertEqual(qos.value, PREFETCH_COUNT_MAX + 1)
  118. qos.decrement()
  119. self.assertEqual(qos.value, PREFETCH_COUNT_MAX)
  120. qos.decrement()
  121. self.assertEqual(qos.value, PREFETCH_COUNT_MAX - 1)
  122. def test_consumer_increment_decrement(self):
  123. consumer = Mock()
  124. qos = QoS(consumer, 10, current_app.log.get_default_logger())
  125. qos.update()
  126. self.assertEqual(qos.value, 10)
  127. self.assertIn({"prefetch_count": 10}, consumer.qos.call_args)
  128. qos.decrement()
  129. self.assertEqual(qos.value, 9)
  130. self.assertIn({"prefetch_count": 9}, consumer.qos.call_args)
  131. qos.decrement_eventually()
  132. self.assertEqual(qos.value, 8)
  133. self.assertIn({"prefetch_count": 9}, consumer.qos.call_args)
  134. # Does not decrement 0 value
  135. qos.value = 0
  136. qos.decrement()
  137. self.assertEqual(qos.value, 0)
  138. qos.increment()
  139. self.assertEqual(qos.value, 0)
  140. class test_Consumer(unittest.TestCase):
  141. def setUp(self):
  142. self.ready_queue = FastQueue()
  143. self.eta_schedule = Timer()
  144. self.logger = current_app.log.get_default_logger()
  145. self.logger.setLevel(0)
  146. def tearDown(self):
  147. self.eta_schedule.stop()
  148. def test_info(self):
  149. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  150. send_events=False)
  151. l.qos = QoS(l.task_consumer, 10, l.logger)
  152. info = l.info
  153. self.assertEqual(info["prefetch_count"], 10)
  154. self.assertFalse(info["broker"])
  155. l.connection = current_app.broker_connection()
  156. info = l.info
  157. self.assertTrue(info["broker"])
  158. def test_connection(self):
  159. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  160. send_events=False)
  161. l.reset_connection()
  162. self.assertIsInstance(l.connection, BrokerConnection)
  163. l._state = RUN
  164. l.event_dispatcher = None
  165. l.stop_consumers(close_connection=False)
  166. self.assertTrue(l.connection)
  167. l._state = RUN
  168. l.stop_consumers()
  169. self.assertIsNone(l.connection)
  170. self.assertIsNone(l.task_consumer)
  171. l.reset_connection()
  172. self.assertIsInstance(l.connection, BrokerConnection)
  173. l.stop_consumers()
  174. l.stop()
  175. l.close_connection()
  176. self.assertIsNone(l.connection)
  177. self.assertIsNone(l.task_consumer)
  178. def test_close_connection(self):
  179. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  180. send_events=False)
  181. l._state = RUN
  182. l.close_connection()
  183. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  184. send_events=False)
  185. eventer = l.event_dispatcher = Mock()
  186. eventer.enabled = True
  187. heart = l.heart = MockHeart()
  188. l._state = RUN
  189. l.stop_consumers()
  190. self.assertTrue(eventer.close.call_count)
  191. self.assertTrue(heart.closed)
  192. def test_receive_message_unknown(self):
  193. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  194. send_events=False)
  195. backend = Mock()
  196. m = create_message(backend, unknown={"baz": "!!!"})
  197. l.event_dispatcher = Mock()
  198. l.pidbox_node = MockNode()
  199. with catch_warnings(record=True) as log:
  200. l.receive_message(m.decode(), m)
  201. self.assertTrue(log)
  202. self.assertIn("unknown message", log[0].message.args[0])
  203. @patch("celery.utils.timer2.to_timestamp")
  204. def test_receive_message_eta_OverflowError(self, to_timestamp):
  205. to_timestamp.side_effect = OverflowError()
  206. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  207. send_events=False)
  208. m = create_message(Mock(), task=foo_task.name,
  209. args=("2, 2"),
  210. kwargs={},
  211. eta=datetime.now().isoformat())
  212. l.event_dispatcher = Mock()
  213. l.pidbox_node = MockNode()
  214. l.receive_message(m.decode(), m)
  215. self.assertTrue(m.acknowledged)
  216. self.assertTrue(to_timestamp.call_count)
  217. def test_receive_message_InvalidTaskError(self):
  218. logger = Mock()
  219. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, logger,
  220. send_events=False)
  221. m = create_message(Mock(), task=foo_task.name,
  222. args=(1, 2), kwargs="foobarbaz", id=1)
  223. l.event_dispatcher = Mock()
  224. l.pidbox_node = MockNode()
  225. l.receive_message(m.decode(), m)
  226. self.assertIn("Received invalid task message",
  227. logger.error.call_args[0][0])
  228. def test_on_decode_error(self):
  229. logger = Mock()
  230. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, logger,
  231. send_events=False)
  232. class MockMessage(Mock):
  233. content_type = "application/x-msgpack"
  234. content_encoding = "binary"
  235. body = "foobarbaz"
  236. message = MockMessage()
  237. l.on_decode_error(message, KeyError("foo"))
  238. self.assertTrue(message.ack.call_count)
  239. self.assertIn("Can't decode message body",
  240. logger.critical.call_args[0][0])
  241. def test_receieve_message(self):
  242. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  243. send_events=False)
  244. m = create_message(Mock(), task=foo_task.name,
  245. args=[2, 4, 8], kwargs={})
  246. l.event_dispatcher = Mock()
  247. l.receive_message(m.decode(), m)
  248. in_bucket = self.ready_queue.get_nowait()
  249. self.assertIsInstance(in_bucket, TaskRequest)
  250. self.assertEqual(in_bucket.task_name, foo_task.name)
  251. self.assertEqual(in_bucket.execute(), 2 * 4 * 8)
  252. self.assertTrue(self.eta_schedule.empty())
  253. def test_start_connection_error(self):
  254. class MockConsumer(MainConsumer):
  255. iterations = 0
  256. def consume_messages(self):
  257. if not self.iterations:
  258. self.iterations = 1
  259. raise KeyError("foo")
  260. raise SyntaxError("bar")
  261. l = MockConsumer(self.ready_queue, self.eta_schedule, self.logger,
  262. send_events=False, pool=BasePool())
  263. l.connection_errors = (KeyError, )
  264. self.assertRaises(SyntaxError, l.start)
  265. l.heart.stop()
  266. l.priority_timer.stop()
  267. def test_consume_messages(self):
  268. class Connection(current_app.broker_connection().__class__):
  269. obj = None
  270. def drain_events(self, **kwargs):
  271. self.obj.connection = None
  272. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  273. send_events=False)
  274. l.connection = Connection()
  275. l.connection.obj = l
  276. l.task_consumer = Mock()
  277. l.qos = QoS(l.task_consumer, 10, l.logger)
  278. l.consume_messages()
  279. l.consume_messages()
  280. self.assertTrue(l.task_consumer.consume.call_count)
  281. l.task_consumer.qos.assert_called_with(prefetch_count=10)
  282. l.qos.decrement()
  283. l.consume_messages()
  284. l.task_consumer.qos.assert_called_with(prefetch_count=9)
  285. def test_maybe_conn_error(self):
  286. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  287. send_events=False)
  288. l.connection_errors = (KeyError, )
  289. l.channel_errors = (SyntaxError, )
  290. l.maybe_conn_error(Mock(side_effect=AttributeError("foo")))
  291. l.maybe_conn_error(Mock(side_effect=KeyError("foo")))
  292. l.maybe_conn_error(Mock(side_effect=SyntaxError("foo")))
  293. self.assertRaises(IndexError, l.maybe_conn_error,
  294. Mock(side_effect=IndexError("foo")))
  295. def test_apply_eta_task(self):
  296. from celery.worker import state
  297. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  298. send_events=False)
  299. l.qos = QoS(None, 10, l.logger)
  300. task = object()
  301. qos = l.qos.value
  302. l.apply_eta_task(task)
  303. self.assertIn(task, state.reserved_requests)
  304. self.assertEqual(l.qos.value, qos - 1)
  305. self.assertIs(self.ready_queue.get_nowait(), task)
  306. def test_receieve_message_eta_isoformat(self):
  307. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  308. send_events=False)
  309. m = create_message(Mock(), task=foo_task.name,
  310. eta=datetime.now().isoformat(),
  311. args=[2, 4, 8], kwargs={})
  312. l.task_consumer = Mock()
  313. l.qos = QoS(l.task_consumer, l.initial_prefetch_count, l.logger)
  314. l.event_dispatcher = Mock()
  315. l.receive_message(m.decode(), m)
  316. l.eta_schedule.stop()
  317. items = [entry[2] for entry in self.eta_schedule.queue]
  318. found = 0
  319. for item in items:
  320. if item.args[0].task_name == foo_task.name:
  321. found = True
  322. self.assertTrue(found)
  323. self.assertTrue(l.task_consumer.qos.call_count)
  324. l.eta_schedule.stop()
  325. def test_revoke(self):
  326. ready_queue = FastQueue()
  327. l = MyKombuConsumer(ready_queue, self.eta_schedule, self.logger,
  328. send_events=False)
  329. backend = Mock()
  330. id = gen_unique_id()
  331. t = create_message(backend, task=foo_task.name, args=[2, 4, 8],
  332. kwargs={}, id=id)
  333. from celery.worker.state import revoked
  334. revoked.add(id)
  335. l.receive_message(t.decode(), t)
  336. self.assertTrue(ready_queue.empty())
  337. def test_receieve_message_not_registered(self):
  338. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  339. send_events=False)
  340. backend = Mock()
  341. m = create_message(backend, task="x.X.31x", args=[2, 4, 8], kwargs={})
  342. l.event_dispatcher = Mock()
  343. self.assertFalse(l.receive_message(m.decode(), m))
  344. self.assertRaises(Empty, self.ready_queue.get_nowait)
  345. self.assertTrue(self.eta_schedule.empty())
  346. def test_receieve_message_eta(self):
  347. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  348. send_events=False)
  349. l.event_dispatcher = Mock()
  350. l.event_dispatcher._outbound_buffer = deque()
  351. backend = Mock()
  352. m = create_message(backend, task=foo_task.name,
  353. args=[2, 4, 8], kwargs={},
  354. eta=(datetime.now() +
  355. timedelta(days=1)).isoformat())
  356. l.reset_connection()
  357. p = l.app.conf.BROKER_CONNECTION_RETRY
  358. l.app.conf.BROKER_CONNECTION_RETRY = False
  359. try:
  360. l.reset_connection()
  361. finally:
  362. l.app.conf.BROKER_CONNECTION_RETRY = p
  363. l.stop_consumers()
  364. l.event_dispatcher = Mock()
  365. l.receive_message(m.decode(), m)
  366. l.eta_schedule.stop()
  367. in_hold = self.eta_schedule.queue[0]
  368. self.assertEqual(len(in_hold), 3)
  369. eta, priority, entry = in_hold
  370. task = entry.args[0]
  371. self.assertIsInstance(task, TaskRequest)
  372. self.assertEqual(task.task_name, foo_task.name)
  373. self.assertEqual(task.execute(), 2 * 4 * 8)
  374. self.assertRaises(Empty, self.ready_queue.get_nowait)
  375. def test_start__consume_messages(self):
  376. class _QoS(object):
  377. prev = 3
  378. value = 4
  379. def update(self):
  380. self.prev = self.value
  381. class _Consumer(MyKombuConsumer):
  382. iterations = 0
  383. def reset_connection(self):
  384. if self.iterations >= 1:
  385. raise KeyError("foo")
  386. init_callback = Mock()
  387. l = _Consumer(self.ready_queue, self.eta_schedule, self.logger,
  388. send_events=False, init_callback=init_callback)
  389. l.task_consumer = Mock()
  390. l.broadcast_consumer = Mock()
  391. l.qos = _QoS()
  392. l.connection = BrokerConnection()
  393. l.iterations = 0
  394. def raises_KeyError(limit=None):
  395. l.iterations += 1
  396. if l.qos.prev != l.qos.value:
  397. l.qos.update()
  398. if l.iterations >= 2:
  399. raise KeyError("foo")
  400. l.consume_messages = raises_KeyError
  401. self.assertRaises(KeyError, l.start)
  402. self.assertTrue(init_callback.call_count)
  403. self.assertEqual(l.iterations, 1)
  404. self.assertEqual(l.qos.prev, l.qos.value)
  405. init_callback.reset_mock()
  406. l = _Consumer(self.ready_queue, self.eta_schedule, self.logger,
  407. send_events=False, init_callback=init_callback)
  408. l.qos = _QoS()
  409. l.task_consumer = Mock()
  410. l.broadcast_consumer = Mock()
  411. l.connection = BrokerConnection()
  412. l.consume_messages = Mock(side_effect=socket.error("foo"))
  413. self.assertRaises(socket.error, l.start)
  414. self.assertTrue(init_callback.call_count)
  415. self.assertTrue(l.consume_messages.call_count)
  416. class test_WorkController(AppCase):
  417. def setup(self):
  418. self.worker = self.create_worker()
  419. def create_worker(self, **kw):
  420. worker = WorkController(concurrency=1, loglevel=0, **kw)
  421. worker.logger = Mock()
  422. return worker
  423. @patch("celery.platforms.signals")
  424. @patch("celery.platforms.set_mp_process_title")
  425. def test_process_initializer(self, set_mp_process_title, _signals):
  426. from celery import Celery
  427. from celery import signals
  428. from celery.app import _tls
  429. from celery.worker import process_initializer
  430. from celery.worker import WORKER_SIGRESET, WORKER_SIGIGNORE
  431. def on_worker_process_init(**kwargs):
  432. on_worker_process_init.called = True
  433. on_worker_process_init.called = False
  434. signals.worker_process_init.connect(on_worker_process_init)
  435. app = Celery(loader=Mock(), set_as_current=False)
  436. process_initializer(app, "awesome.worker.com")
  437. self.assertIn((tuple(WORKER_SIGIGNORE), {}),
  438. _signals.ignore.call_args_list)
  439. self.assertIn((tuple(WORKER_SIGRESET), {}),
  440. _signals.reset.call_args_list)
  441. self.assertTrue(app.loader.init_worker.call_count)
  442. self.assertTrue(on_worker_process_init.called)
  443. self.assertIs(_tls.current_app, app)
  444. set_mp_process_title.assert_called_with("celeryd",
  445. hostname="awesome.worker.com")
  446. def test_with_rate_limits_disabled(self):
  447. worker = WorkController(concurrency=1, loglevel=0,
  448. disable_rate_limits=True)
  449. self.assertTrue(hasattr(worker.ready_queue, "put"))
  450. def test_attrs(self):
  451. worker = self.worker
  452. self.assertIsInstance(worker.scheduler, Timer)
  453. self.assertTrue(worker.scheduler)
  454. self.assertTrue(worker.pool)
  455. self.assertTrue(worker.consumer)
  456. self.assertTrue(worker.mediator)
  457. self.assertTrue(worker.components)
  458. def test_with_embedded_celerybeat(self):
  459. worker = WorkController(concurrency=1, loglevel=0,
  460. embed_clockservice=True)
  461. self.assertTrue(worker.beat)
  462. self.assertIn(worker.beat, worker.components)
  463. def test_with_autoscaler(self):
  464. worker = self.create_worker(autoscale=[10, 3], send_events=False,
  465. eta_scheduler_cls="celery.utils.timer2.Timer")
  466. self.assertTrue(worker.autoscaler)
  467. def test_dont_stop_or_terminate(self):
  468. worker = WorkController(concurrency=1, loglevel=0)
  469. worker.stop()
  470. self.assertNotEqual(worker._state, worker.CLOSE)
  471. worker.terminate()
  472. self.assertNotEqual(worker._state, worker.CLOSE)
  473. sigsafe, worker.pool.signal_safe = worker.pool.signal_safe, False
  474. try:
  475. worker._state = worker.RUN
  476. worker.stop(in_sighandler=True)
  477. self.assertNotEqual(worker._state, worker.CLOSE)
  478. worker.terminate(in_sighandler=True)
  479. self.assertNotEqual(worker._state, worker.CLOSE)
  480. finally:
  481. worker.pool.signal_safe = sigsafe
  482. def test_on_timer_error(self):
  483. worker = WorkController(concurrency=1, loglevel=0)
  484. worker.logger = Mock()
  485. try:
  486. raise KeyError("foo")
  487. except KeyError:
  488. exc_info = sys.exc_info()
  489. worker.on_timer_error(exc_info)
  490. logged = worker.logger.error.call_args[0][0]
  491. self.assertIn("KeyError", logged)
  492. def test_on_timer_tick(self):
  493. worker = WorkController(concurrency=1, loglevel=10)
  494. worker.logger = Mock()
  495. worker.timer_debug = worker.logger.debug
  496. worker.on_timer_tick(30.0)
  497. logged = worker.logger.debug.call_args[0][0]
  498. self.assertIn("30.0", logged)
  499. def test_process_task(self):
  500. worker = self.worker
  501. worker.pool = Mock()
  502. backend = Mock()
  503. m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
  504. kwargs={})
  505. task = TaskRequest.from_message(m, m.decode())
  506. worker.process_task(task)
  507. self.assertEqual(worker.pool.apply_async.call_count, 1)
  508. worker.pool.stop()
  509. def test_process_task_raise_base(self):
  510. worker = self.worker
  511. worker.pool = Mock()
  512. worker.pool.apply_async.side_effect = KeyboardInterrupt("Ctrl+C")
  513. backend = Mock()
  514. m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
  515. kwargs={})
  516. task = TaskRequest.from_message(m, m.decode())
  517. worker.components = []
  518. worker._state = worker.RUN
  519. self.assertRaises(KeyboardInterrupt, worker.process_task, task)
  520. self.assertEqual(worker._state, worker.TERMINATE)
  521. def test_process_task_raise_SystemTerminate(self):
  522. worker = self.worker
  523. worker.pool = Mock()
  524. worker.pool.apply_async.side_effect = SystemTerminate()
  525. backend = Mock()
  526. m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
  527. kwargs={})
  528. task = TaskRequest.from_message(m, m.decode())
  529. worker.components = []
  530. worker._state = worker.RUN
  531. self.assertRaises(SystemExit, worker.process_task, task)
  532. self.assertEqual(worker._state, worker.TERMINATE)
  533. def test_process_task_raise_regular(self):
  534. worker = self.worker
  535. worker.pool = Mock()
  536. worker.pool.apply_async.side_effect = KeyError("some exception")
  537. backend = Mock()
  538. m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
  539. kwargs={})
  540. task = TaskRequest.from_message(m, m.decode())
  541. worker.process_task(task)
  542. worker.pool.stop()
  543. def test_start_catches_base_exceptions(self):
  544. worker1 = self.create_worker()
  545. stc = Mock()
  546. stc.start.side_effect = SystemTerminate()
  547. worker1.components = [stc]
  548. self.assertRaises(SystemExit, worker1.start)
  549. self.assertTrue(stc.terminate.call_count)
  550. worker2 = self.create_worker()
  551. sec = Mock()
  552. sec.start.side_effect = SystemExit()
  553. sec.terminate = None
  554. worker2.components = [sec]
  555. self.assertRaises(SystemExit, worker2.start)
  556. self.assertTrue(sec.stop.call_count)
  557. def test_state_db(self):
  558. from celery.worker import state
  559. Persistent = state.Persistent
  560. state.Persistent = Mock()
  561. try:
  562. worker = self.create_worker(db="statefilename")
  563. self.assertTrue(worker._finalize_db)
  564. worker._finalize_db.cancel()
  565. finally:
  566. state.Persistent = Persistent
  567. @skip("Issue #264")
  568. def test_disable_rate_limits(self):
  569. from celery.worker.buckets import FastQueue
  570. worker = self.create_worker(disable_rate_limits=True)
  571. self.assertIsInstance(worker.ready_queue, FastQueue)
  572. self.assertIsNone(worker.mediator)
  573. self.assertEqual(worker.ready_queue.put, worker.process_task)
  574. def test_start__stop(self):
  575. worker = self.worker
  576. worker.components = [Mock(), Mock(), Mock(), Mock()]
  577. worker.start()
  578. for w in worker.components:
  579. self.assertTrue(w.start.call_count)
  580. worker.stop()
  581. for component in worker.components:
  582. self.assertTrue(w.stop.call_count)
  583. def test_start__terminate(self):
  584. worker = self.worker
  585. worker.components = [Mock(), Mock(), Mock(), Mock(), Mock()]
  586. for component in worker.components[:3]:
  587. component.terminate = None
  588. worker.start()
  589. for w in worker.components[:3]:
  590. self.assertTrue(w.start.call_count)
  591. self.assertTrue(worker._running, len(worker.components))
  592. self.assertEqual(worker._state, RUN)
  593. worker.terminate()
  594. for component in worker.components[:3]:
  595. self.assertTrue(component.stop.call_count)
  596. self.assertTrue(worker.components[4].terminate.call_count)