test_worker.py 29 KB


  1. import socket
  2. import sys
  3. from datetime import datetime, timedelta
  4. from Queue import Empty
  5. from kombu.transport.base import Message
  6. from kombu.connection import BrokerConnection
  7. from celery.utils.timer2 import Timer
  8. from celery.app import app_or_default
  9. from celery.concurrency.base import BasePool
  10. from celery.exceptions import SystemTerminate
  11. from celery.task import task as task_dec
  12. from celery.task import periodic_task as periodic_task_dec
  13. from celery.utils import gen_unique_id
  14. from celery.worker import WorkController
  15. from celery.worker.buckets import FastQueue
  16. from celery.worker.job import TaskRequest
  17. from celery.worker.consumer import Consumer as MainConsumer
  18. from celery.worker.consumer import QoS, RUN
  19. from celery.utils.serialization import pickle
  20. from celery.tests.compat import catch_warnings
  21. from celery.tests.utils import unittest
  22. from celery.tests.utils import AppCase, execute_context, skip
  23. class MockConsumer(object):
  24. class Channel(object):
  25. def close(self):
  26. pass
  27. def register_callback(self, cb):
  28. pass
  29. def consume(self):
  30. pass
  31. @property
  32. def channel(self):
  33. return self.Channel()
  34. class PlaceHolder(object):
  35. pass
  36. class MyKombuConsumer(MainConsumer):
  37. broadcast_consumer = MockConsumer()
  38. task_consumer = MockConsumer()
  39. def restart_heartbeat(self):
  40. self.heart = None
  41. class MockNode(object):
  42. commands = []
  43. def handle_message(self, body, message):
  44. self.commands.append(body.pop("command", None))
  45. class MockEventDispatcher(object):
  46. sent = []
  47. closed = False
  48. flushed = False
  49. _outbound_buffer = []
  50. def send(self, event, *args, **kwargs):
  51. self.sent.append(event)
  52. def close(self):
  53. self.closed = True
  54. def flush(self):
  55. self.flushed = True
  56. class MockHeart(object):
  57. closed = False
  58. def stop(self):
  59. self.closed = True
  60. @task_dec()
  61. def foo_task(x, y, z, **kwargs):
  62. return x * y * z
  63. @periodic_task_dec(run_every=60)
  64. def foo_periodic_task():
  65. return "foo"
  66. class MockLogger(object):
  67. def __init__(self):
  68. self.logged = []
  69. def critical(self, msg, *args, **kwargs):
  70. self.logged.append(msg)
  71. def info(self, msg, *args, **kwargs):
  72. self.logged.append(msg)
  73. def error(self, msg, *args, **kwargs):
  74. self.logged.append(msg)
  75. def debug(self, msg, *args, **kwargs):
  76. self.logged.append(msg)
  77. class MockBackend(object):
  78. _acked = False
  79. def basic_ack(self, delivery_tag):
  80. self._acked = True
  81. class MockPool(BasePool):
  82. _terminated = False
  83. _stopped = False
  84. def __init__(self, *args, **kwargs):
  85. self.raise_regular = kwargs.get("raise_regular", False)
  86. self.raise_base = kwargs.get("raise_base", False)
  87. self.raise_SystemTerminate = kwargs.get("raise_SystemTerminate",
  88. False)
  89. def apply_async(self, *args, **kwargs):
  90. if self.raise_regular:
  91. raise KeyError("some exception")
  92. if self.raise_base:
  93. raise KeyboardInterrupt("Ctrl+c")
  94. if self.raise_SystemTerminate:
  95. raise SystemTerminate()
  96. def start(self):
  97. pass
  98. def stop(self):
  99. self._stopped = True
  100. return True
  101. def terminate(self):
  102. self._terminated = True
  103. self.stop()
  104. class MockController(object):
  105. def __init__(self, w, *args, **kwargs):
  106. self._w = w
  107. self._stopped = False
  108. def start(self):
  109. self._w["started"] = True
  110. self._stopped = False
  111. def stop(self):
  112. self._stopped = True
  113. def create_message(backend, **data):
  114. data.setdefault("id", gen_unique_id())
  115. return Message(backend, body=pickle.dumps(dict(**data)),
  116. content_type="application/x-python-serialize",
  117. content_encoding="binary")
  118. class test_QoS(unittest.TestCase):
  119. class _QoS(QoS):
  120. def __init__(self, value):
  121. self.value = value
  122. QoS.__init__(self, None, value, None)
  123. def set(self, value):
  124. return value
  125. def test_qos_increment_decrement(self):
  126. qos = self._QoS(10)
  127. self.assertEqual(qos.increment(), 11)
  128. self.assertEqual(qos.increment(3), 14)
  129. self.assertEqual(qos.increment(-30), 14)
  130. self.assertEqual(qos.decrement(7), 7)
  131. self.assertEqual(qos.decrement(), 6)
  132. self.assertRaises(AssertionError, qos.decrement, 10)
  133. def test_qos_disabled_increment_decrement(self):
  134. qos = self._QoS(0)
  135. self.assertEqual(qos.increment(), 0)
  136. self.assertEqual(qos.increment(3), 0)
  137. self.assertEqual(qos.increment(-30), 0)
  138. self.assertEqual(qos.decrement(7), 0)
  139. self.assertEqual(qos.decrement(), 0)
  140. self.assertEqual(qos.decrement(10), 0)
  141. def test_qos_thread_safe(self):
  142. qos = self._QoS(10)
  143. def add():
  144. for i in xrange(1000):
  145. qos.increment()
  146. def sub():
  147. for i in xrange(1000):
  148. qos.decrement_eventually()
  149. def threaded(funs):
  150. from threading import Thread
  151. threads = [Thread(target=fun) for fun in funs]
  152. for thread in threads:
  153. thread.start()
  154. for thread in threads:
  155. thread.join()
  156. threaded([add, add])
  157. self.assertEqual(qos.value, 2010)
  158. qos.value = 1000
  159. threaded([add, sub]) # n = 2
  160. self.assertEqual(qos.value, 1000)
  161. class MockConsumer(object):
  162. prefetch_count = 0
  163. def qos(self, prefetch_size=0, prefetch_count=0, apply_global=False):
  164. self.prefetch_count = prefetch_count
  165. def test_consumer_increment_decrement(self):
  166. consumer = self.MockConsumer()
  167. qos = QoS(consumer, 10, app_or_default().log.get_default_logger())
  168. qos.update()
  169. self.assertEqual(qos.value, 10)
  170. self.assertEqual(consumer.prefetch_count, 10)
  171. qos.decrement()
  172. self.assertEqual(qos.value, 9)
  173. self.assertEqual(consumer.prefetch_count, 9)
  174. qos.decrement_eventually()
  175. self.assertEqual(qos.value, 8)
  176. self.assertEqual(consumer.prefetch_count, 9)
  177. # Does not decrement 0 value
  178. qos.value = 0
  179. qos.decrement()
  180. self.assertEqual(qos.value, 0)
  181. qos.increment()
  182. self.assertEqual(qos.value, 0)
  183. class test_Consumer(unittest.TestCase):
  184. def setUp(self):
  185. self.ready_queue = FastQueue()
  186. self.eta_schedule = Timer()
  187. self.logger = app_or_default().log.get_default_logger()
  188. self.logger.setLevel(0)
  189. def tearDown(self):
  190. self.eta_schedule.stop()
  191. def test_info(self):
  192. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  193. send_events=False)
  194. l.qos = QoS(l.task_consumer, 10, l.logger)
  195. info = l.info
  196. self.assertEqual(info["prefetch_count"], 10)
  197. self.assertFalse(info["broker"])
  198. l.connection = app_or_default().broker_connection()
  199. info = l.info
  200. self.assertTrue(info["broker"])
  201. def test_connection(self):
  202. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  203. send_events=False)
  204. l.reset_connection()
  205. self.assertIsInstance(l.connection, BrokerConnection)
  206. l._state = RUN
  207. l.event_dispatcher = None
  208. l.stop_consumers(close=False)
  209. self.assertTrue(l.connection)
  210. l._state = RUN
  211. l.stop_consumers()
  212. self.assertIsNone(l.connection)
  213. self.assertIsNone(l.task_consumer)
  214. l.reset_connection()
  215. self.assertIsInstance(l.connection, BrokerConnection)
  216. l.stop_consumers()
  217. l.stop()
  218. l.close_connection()
  219. self.assertIsNone(l.connection)
  220. self.assertIsNone(l.task_consumer)
  221. def test_close_connection(self):
  222. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  223. send_events=False)
  224. l._state = RUN
  225. l.close_connection()
  226. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  227. send_events=False)
  228. eventer = l.event_dispatcher = MockEventDispatcher()
  229. heart = l.heart = MockHeart()
  230. l._state = RUN
  231. l.stop_consumers()
  232. self.assertTrue(eventer.closed)
  233. self.assertTrue(heart.closed)
  234. def test_receive_message_unknown(self):
  235. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  236. send_events=False)
  237. backend = MockBackend()
  238. m = create_message(backend, unknown={"baz": "!!!"})
  239. l.event_dispatcher = MockEventDispatcher()
  240. l.pidbox_node = MockNode()
  241. def with_catch_warnings(log):
  242. l.receive_message(m.decode(), m)
  243. self.assertTrue(log)
  244. self.assertIn("unknown message", log[0].message.args[0])
  245. context = catch_warnings(record=True)
  246. execute_context(context, with_catch_warnings)
  247. def test_receive_message_eta_OverflowError(self):
  248. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  249. send_events=False)
  250. backend = MockBackend()
  251. called = [False]
  252. def to_timestamp(d):
  253. called[0] = True
  254. raise OverflowError()
  255. m = create_message(backend, task=foo_task.name,
  256. args=("2, 2"),
  257. kwargs={},
  258. eta=datetime.now().isoformat())
  259. l.event_dispatcher = MockEventDispatcher()
  260. l.pidbox_node = MockNode()
  261. from celery.worker import consumer
  262. prev, consumer.to_timestamp = consumer.to_timestamp, to_timestamp
  263. try:
  264. l.receive_message(m.decode(), m)
  265. self.assertTrue(m.acknowledged)
  266. self.assertTrue(called[0])
  267. finally:
  268. consumer.to_timestamp = prev
  269. def test_receive_message_InvalidTaskError(self):
  270. logger = MockLogger()
  271. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, logger,
  272. send_events=False)
  273. backend = MockBackend()
  274. m = create_message(backend, task=foo_task.name,
  275. args=(1, 2), kwargs="foobarbaz", id=1)
  276. l.event_dispatcher = MockEventDispatcher()
  277. l.pidbox_node = MockNode()
  278. l.receive_message(m.decode(), m)
  279. self.assertIn("Invalid task ignored", logger.logged[0])
  280. def test_on_decode_error(self):
  281. logger = MockLogger()
  282. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, logger,
  283. send_events=False)
  284. class MockMessage(object):
  285. content_type = "application/x-msgpack"
  286. content_encoding = "binary"
  287. body = "foobarbaz"
  288. acked = False
  289. def ack(self):
  290. self.acked = True
  291. message = MockMessage()
  292. l.on_decode_error(message, KeyError("foo"))
  293. self.assertTrue(message.acked)
  294. self.assertIn("Can't decode message body", logger.logged[0])
  295. def test_receieve_message(self):
  296. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  297. send_events=False)
  298. backend = MockBackend()
  299. m = create_message(backend, task=foo_task.name,
  300. args=[2, 4, 8], kwargs={})
  301. l.event_dispatcher = MockEventDispatcher()
  302. l.receive_message(m.decode(), m)
  303. in_bucket = self.ready_queue.get_nowait()
  304. self.assertIsInstance(in_bucket, TaskRequest)
  305. self.assertEqual(in_bucket.task_name, foo_task.name)
  306. self.assertEqual(in_bucket.execute(), 2 * 4 * 8)
  307. self.assertTrue(self.eta_schedule.empty())
  308. def test_start_connection_error(self):
  309. class MockConsumer(MainConsumer):
  310. iterations = 0
  311. def consume_messages(self):
  312. if not self.iterations:
  313. self.iterations = 1
  314. raise KeyError("foo")
  315. raise SyntaxError("bar")
  316. l = MockConsumer(self.ready_queue, self.eta_schedule, self.logger,
  317. send_events=False)
  318. l.connection_errors = (KeyError, )
  319. self.assertRaises(SyntaxError, l.start)
  320. l.heart.stop()
  321. def test_consume_messages(self):
  322. app = app_or_default()
  323. class Connection(app.broker_connection().__class__):
  324. obj = None
  325. def drain_events(self, **kwargs):
  326. self.obj.connection = None
  327. class Consumer(object):
  328. consuming = False
  329. prefetch_count = 0
  330. def consume(self):
  331. self.consuming = True
  332. def qos(self, prefetch_size=0, prefetch_count=0,
  333. apply_global=False):
  334. self.prefetch_count = prefetch_count
  335. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  336. send_events=False)
  337. l.connection = Connection()
  338. l.connection.obj = l
  339. l.task_consumer = Consumer()
  340. l.broadcast_consumer = Consumer()
  341. l.qos = QoS(l.task_consumer, 10, l.logger)
  342. l.consume_messages()
  343. l.consume_messages()
  344. self.assertTrue(l.task_consumer.consuming)
  345. self.assertTrue(l.broadcast_consumer.consuming)
  346. self.assertEqual(l.task_consumer.prefetch_count, 10)
  347. l.qos.decrement()
  348. l.consume_messages()
  349. self.assertEqual(l.task_consumer.prefetch_count, 9)
  350. def test_maybe_conn_error(self):
  351. def raises(error):
  352. def fun():
  353. raise error
  354. return fun
  355. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  356. send_events=False)
  357. l.connection_errors = (KeyError, )
  358. l.channel_errors = (SyntaxError, )
  359. l.maybe_conn_error(raises(AttributeError("foo")))
  360. l.maybe_conn_error(raises(KeyError("foo")))
  361. l.maybe_conn_error(raises(SyntaxError("foo")))
  362. self.assertRaises(IndexError, l.maybe_conn_error,
  363. raises(IndexError("foo")))
  364. def test_apply_eta_task(self):
  365. from celery.worker import state
  366. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  367. send_events=False)
  368. l.qos = QoS(None, 10, l.logger)
  369. task = object()
  370. qos = l.qos.value
  371. l.apply_eta_task(task)
  372. self.assertIn(task, state.reserved_requests)
  373. self.assertEqual(l.qos.value, qos - 1)
  374. self.assertIs(self.ready_queue.get_nowait(), task)
  375. def test_receieve_message_eta_isoformat(self):
  376. class MockConsumer(object):
  377. prefetch_count_incremented = False
  378. def qos(self, **kwargs):
  379. self.prefetch_count_incremented = True
  380. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  381. send_events=False)
  382. backend = MockBackend()
  383. m = create_message(backend, task=foo_task.name,
  384. eta=datetime.now().isoformat(),
  385. args=[2, 4, 8], kwargs={})
  386. l.task_consumer = MockConsumer()
  387. l.qos = QoS(l.task_consumer, l.initial_prefetch_count, l.logger)
  388. l.event_dispatcher = MockEventDispatcher()
  389. l.receive_message(m.decode(), m)
  390. l.eta_schedule.stop()
  391. items = [entry[2] for entry in self.eta_schedule.queue]
  392. found = 0
  393. for item in items:
  394. print("ITEM: %r" % (item, ) )
  395. if item.args[0].task_name == foo_task.name:
  396. found = True
  397. self.assertTrue(found)
  398. self.assertTrue(l.task_consumer.prefetch_count_incremented)
  399. l.eta_schedule.stop()
  400. def test_revoke(self):
  401. ready_queue = FastQueue()
  402. l = MyKombuConsumer(ready_queue, self.eta_schedule, self.logger,
  403. send_events=False)
  404. backend = MockBackend()
  405. id = gen_unique_id()
  406. t = create_message(backend, task=foo_task.name, args=[2, 4, 8],
  407. kwargs={}, id=id)
  408. from celery.worker.state import revoked
  409. revoked.add(id)
  410. l.receive_message(t.decode(), t)
  411. self.assertTrue(ready_queue.empty())
  412. def test_receieve_message_not_registered(self):
  413. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  414. send_events=False)
  415. backend = MockBackend()
  416. m = create_message(backend, task="x.X.31x", args=[2, 4, 8], kwargs={})
  417. l.event_dispatcher = MockEventDispatcher()
  418. self.assertFalse(l.receive_message(m.decode(), m))
  419. self.assertRaises(Empty, self.ready_queue.get_nowait)
  420. self.assertTrue(self.eta_schedule.empty())
  421. def test_receieve_message_eta(self):
  422. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  423. send_events=False)
  424. l.event_dispatcher = MockEventDispatcher()
  425. backend = MockBackend()
  426. m = create_message(backend, task=foo_task.name,
  427. args=[2, 4, 8], kwargs={},
  428. eta=(datetime.now() +
  429. timedelta(days=1)).isoformat())
  430. l.reset_connection()
  431. p = l.app.conf.BROKER_CONNECTION_RETRY
  432. l.app.conf.BROKER_CONNECTION_RETRY = False
  433. try:
  434. l.reset_connection()
  435. finally:
  436. l.app.conf.BROKER_CONNECTION_RETRY = p
  437. l.stop_consumers()
  438. l.event_dispatcher = MockEventDispatcher()
  439. l.receive_message(m.decode(), m)
  440. l.eta_schedule.stop()
  441. in_hold = self.eta_schedule.queue[0]
  442. self.assertEqual(len(in_hold), 3)
  443. eta, priority, entry = in_hold
  444. task = entry.args[0]
  445. self.assertIsInstance(task, TaskRequest)
  446. self.assertEqual(task.task_name, foo_task.name)
  447. self.assertEqual(task.execute(), 2 * 4 * 8)
  448. self.assertRaises(Empty, self.ready_queue.get_nowait)
  449. def test_start__consume_messages(self):
  450. class _QoS(object):
  451. prev = 3
  452. value = 4
  453. def update(self):
  454. self.prev = self.value
  455. class _Consumer(MyKombuConsumer):
  456. iterations = 0
  457. wait_method = None
  458. def reset_connection(self):
  459. if self.iterations >= 1:
  460. raise KeyError("foo")
  461. called_back = [False]
  462. def init_callback(consumer):
  463. called_back[0] = True
  464. l = _Consumer(self.ready_queue, self.eta_schedule, self.logger,
  465. send_events=False, init_callback=init_callback)
  466. l.task_consumer = MockConsumer()
  467. l.broadcast_consumer = MockConsumer()
  468. l.qos = _QoS()
  469. l.connection = BrokerConnection()
  470. l.iterations = 0
  471. def raises_KeyError(limit=None):
  472. l.iterations += 1
  473. if l.qos.prev != l.qos.value:
  474. l.qos.update()
  475. if l.iterations >= 2:
  476. raise KeyError("foo")
  477. l.consume_messages = raises_KeyError
  478. self.assertRaises(KeyError, l.start)
  479. self.assertTrue(called_back[0])
  480. self.assertEqual(l.iterations, 1)
  481. self.assertEqual(l.qos.prev, l.qos.value)
  482. l = _Consumer(self.ready_queue, self.eta_schedule, self.logger,
  483. send_events=False, init_callback=init_callback)
  484. l.qos = _QoS()
  485. l.task_consumer = MockConsumer()
  486. l.broadcast_consumer = MockConsumer()
  487. l.connection = BrokerConnection()
  488. def raises_socket_error(limit=None):
  489. l.iterations = 1
  490. raise socket.error("foo")
  491. l.consume_messages = raises_socket_error
  492. self.assertRaises(socket.error, l.start)
  493. self.assertTrue(called_back[0])
  494. self.assertEqual(l.iterations, 1)
  495. class test_WorkController(AppCase):
  496. def setup(self):
  497. self.worker = self.create_worker()
  498. def create_worker(self, **kw):
  499. worker = WorkController(concurrency=1, loglevel=0, **kw)
  500. worker.logger = MockLogger()
  501. return worker
  502. def test_process_initializer(self):
  503. from celery import Celery
  504. from celery import platforms
  505. from celery import signals
  506. from celery.app import _tls
  507. from celery.worker import process_initializer
  508. from celery.worker import WORKER_SIGRESET, WORKER_SIGIGNORE
  509. ignored_signals = []
  510. reset_signals = []
  511. worker_init = [False]
  512. default_app = app_or_default()
  513. app = Celery(loader="default", set_as_current=False)
  514. class Loader(object):
  515. def init_worker(self):
  516. worker_init[0] = True
  517. app.loader = Loader()
  518. def on_worker_process_init(**kwargs):
  519. on_worker_process_init.called = True
  520. on_worker_process_init.called = False
  521. signals.worker_process_init.connect(on_worker_process_init)
  522. def set_mp_process_title(title, hostname=None):
  523. set_mp_process_title.called = (title, hostname)
  524. set_mp_process_title.called = ()
  525. pignore_signal = platforms.ignore_signal
  526. preset_signal = platforms.reset_signal
  527. psetproctitle = platforms.set_mp_process_title
  528. platforms.ignore_signal = lambda sig: ignored_signals.append(sig)
  529. platforms.reset_signal = lambda sig: reset_signals.append(sig)
  530. platforms.set_mp_process_title = set_mp_process_title
  531. try:
  532. process_initializer(app, "awesome.worker.com")
  533. self.assertItemsEqual(ignored_signals, WORKER_SIGIGNORE)
  534. self.assertItemsEqual(reset_signals, WORKER_SIGRESET)
  535. self.assertTrue(worker_init[0])
  536. self.assertTrue(on_worker_process_init.called)
  537. self.assertIs(_tls.current_app, app)
  538. self.assertTupleEqual(set_mp_process_title.called,
  539. ("celeryd", "awesome.worker.com"))
  540. finally:
  541. platforms.ignore_signal = pignore_signal
  542. platforms.reset_signal = preset_signal
  543. platforms.set_mp_process_title = psetproctitle
  544. default_app.set_current()
  545. def test_with_rate_limits_disabled(self):
  546. worker = WorkController(concurrency=1, loglevel=0,
  547. disable_rate_limits=True)
  548. self.assertTrue(hasattr(worker.ready_queue, "put"))
  549. def test_attrs(self):
  550. worker = self.worker
  551. self.assertIsInstance(worker.scheduler, Timer)
  552. self.assertTrue(worker.scheduler)
  553. self.assertTrue(worker.pool)
  554. self.assertTrue(worker.consumer)
  555. self.assertTrue(worker.mediator)
  556. self.assertTrue(worker.components)
  557. def test_with_embedded_celerybeat(self):
  558. worker = WorkController(concurrency=1, loglevel=0,
  559. embed_clockservice=True)
  560. self.assertTrue(worker.beat)
  561. self.assertIn(worker.beat, worker.components)
  562. def test_with_autoscaler(self):
  563. worker = self.create_worker(autoscale=[10, 3], send_events=False,
  564. eta_scheduler_cls="celery.utils.timer2.Timer")
  565. self.assertTrue(worker.autoscaler)
  566. def test_dont_stop_or_terminate(self):
  567. worker = WorkController(concurrency=1, loglevel=0)
  568. worker.stop()
  569. self.assertNotEqual(worker._state, worker.CLOSE)
  570. worker.terminate()
  571. self.assertNotEqual(worker._state, worker.CLOSE)
  572. sigsafe, worker.pool.signal_safe = worker.pool.signal_safe, False
  573. try:
  574. worker._state = worker.RUN
  575. worker.stop(in_sighandler=True)
  576. self.assertNotEqual(worker._state, worker.CLOSE)
  577. worker.terminate(in_sighandler=True)
  578. self.assertNotEqual(worker._state, worker.CLOSE)
  579. finally:
  580. worker.pool.signal_safe = sigsafe
  581. def test_on_timer_error(self):
  582. worker = WorkController(concurrency=1, loglevel=0)
  583. worker.logger = MockLogger()
  584. try:
  585. raise KeyError("foo")
  586. except KeyError:
  587. exc_info = sys.exc_info()
  588. worker.on_timer_error(exc_info)
  589. logged = worker.logger.logged[0]
  590. self.assertIn("KeyError", logged)
  591. def test_on_timer_tick(self):
  592. worker = WorkController(concurrency=1, loglevel=10)
  593. worker.logger = MockLogger()
  594. worker.timer_debug = worker.logger.debug
  595. worker.on_timer_tick(30.0)
  596. logged = worker.logger.logged[0]
  597. self.assertIn("30.0", logged)
  598. def test_process_task(self):
  599. worker = self.worker
  600. worker.pool = MockPool()
  601. backend = MockBackend()
  602. m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
  603. kwargs={})
  604. task = TaskRequest.from_message(m, m.decode())
  605. worker.process_task(task)
  606. worker.pool.stop()
  607. def test_process_task_raise_base(self):
  608. worker = self.worker
  609. worker.pool = MockPool(raise_base=True)
  610. backend = MockBackend()
  611. m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
  612. kwargs={})
  613. task = TaskRequest.from_message(m, m.decode())
  614. worker.components = []
  615. worker._state = worker.RUN
  616. self.assertRaises(KeyboardInterrupt, worker.process_task, task)
  617. self.assertEqual(worker._state, worker.TERMINATE)
  618. def test_process_task_raise_SystemTerminate(self):
  619. worker = self.worker
  620. worker.pool = MockPool(raise_SystemTerminate=True)
  621. backend = MockBackend()
  622. m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
  623. kwargs={})
  624. task = TaskRequest.from_message(m, m.decode())
  625. worker.components = []
  626. worker._state = worker.RUN
  627. self.assertRaises(SystemExit, worker.process_task, task)
  628. self.assertEqual(worker._state, worker.TERMINATE)
  629. def test_process_task_raise_regular(self):
  630. worker = self.worker
  631. worker.pool = MockPool(raise_regular=True)
  632. backend = MockBackend()
  633. m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
  634. kwargs={})
  635. task = TaskRequest.from_message(m, m.decode())
  636. worker.process_task(task)
  637. worker.pool.stop()
  638. def test_start_catches_base_exceptions(self):
  639. class Component(object):
  640. stopped = False
  641. terminated = False
  642. def __init__(self, exc):
  643. self.exc = exc
  644. def start(self):
  645. raise self.exc
  646. def terminate(self):
  647. self.terminated = True
  648. def stop(self):
  649. self.stopped = True
  650. worker1 = self.create_worker()
  651. worker1.components = [Component(SystemTerminate())]
  652. self.assertRaises(SystemExit, worker1.start)
  653. self.assertTrue(worker1.components[0].terminated)
  654. worker2 = self.create_worker()
  655. worker2.components = [Component(SystemExit())]
  656. self.assertRaises(SystemExit, worker2.start)
  657. self.assertTrue(worker2.components[0].stopped)
  658. def test_state_db(self):
  659. from celery.worker import state
  660. Persistent = state.Persistent
  661. class MockPersistent(Persistent):
  662. def _load(self):
  663. return {}
  664. state.Persistent = MockPersistent
  665. try:
  666. worker = self.create_worker(db="statefilename")
  667. self.assertTrue(worker._finalize_db)
  668. worker._finalize_db.cancel()
  669. finally:
  670. state.Persistent = Persistent
  671. @skip("Issue #264")
  672. def test_disable_rate_limits(self):
  673. from celery.worker.buckets import FastQueue
  674. worker = self.create_worker(disable_rate_limits=True)
  675. self.assertIsInstance(worker.ready_queue, FastQueue)
  676. self.assertIsNone(worker.mediator)
  677. self.assertEqual(worker.ready_queue.put, worker.process_task)
  678. def test_start__stop(self):
  679. worker = self.worker
  680. w1 = {"started": False}
  681. w2 = {"started": False}
  682. w3 = {"started": False}
  683. w4 = {"started": False}
  684. worker.components = [MockController(w1), MockController(w2),
  685. MockController(w3), MockController(w4)]
  686. worker.start()
  687. for w in (w1, w2, w3, w4):
  688. self.assertTrue(w["started"])
  689. self.assertTrue(worker._running, len(worker.components))
  690. worker.stop()
  691. for component in worker.components:
  692. self.assertTrue(component._stopped)
  693. def test_start__terminate(self):
  694. worker = self.worker
  695. w1 = {"started": False}
  696. w2 = {"started": False}
  697. w3 = {"started": False}
  698. w4 = {"started": False}
  699. worker.components = [MockController(w1), MockController(w2),
  700. MockController(w3), MockController(w4),
  701. MockPool()]
  702. worker.start()
  703. for w in (w1, w2, w3, w4):
  704. self.assertTrue(w["started"])
  705. self.assertTrue(worker._running, len(worker.components))
  706. self.assertEqual(worker._state, RUN)
  707. worker.terminate()
  708. for component in worker.components:
  709. self.assertTrue(component._stopped)
  710. self.assertTrue(worker.components[4]._terminated)