test_worker.py 18 KB


  1. import socket
  2. import unittest2 as unittest
  3. from datetime import datetime, timedelta
  4. from multiprocessing import get_logger
  5. from Queue import Empty
  6. from carrot.backends.base import BaseMessage
  7. from carrot.connection import BrokerConnection
  8. from celery.utils.timer2 import Timer
  9. from celery.decorators import task as task_dec
  10. from celery.decorators import periodic_task as periodic_task_dec
  11. from celery.serialization import pickle
  12. from celery.utils import gen_unique_id
  13. from celery.worker import WorkController
  14. from celery.worker.buckets import FastQueue
  15. from celery.worker.job import TaskRequest
  16. from celery.worker.listener import CarrotListener, QoS, RUN
  17. from celery.tests.compat import catch_warnings
  18. from celery.tests.utils import execute_context
  19. class PlaceHolder(object):
  20. pass
  21. class MyCarrotListener(CarrotListener):
  22. def restart_heartbeat(self):
  23. self.heart = None
  24. class MockControlDispatch(object):
  25. commands = []
  26. def dispatch_from_message(self, message):
  27. self.commands.append(message.pop("command", None))
  28. class MockEventDispatcher(object):
  29. sent = []
  30. closed = False
  31. flushed = False
  32. def send(self, event, *args, **kwargs):
  33. self.sent.append(event)
  34. def close(self):
  35. self.closed = True
  36. def flush(self):
  37. self.flushed = True
  38. class MockHeart(object):
  39. closed = False
  40. def stop(self):
  41. self.closed = True
  42. @task_dec()
  43. def foo_task(x, y, z, **kwargs):
  44. return x * y * z
  45. @periodic_task_dec(run_every=60)
  46. def foo_periodic_task():
  47. return "foo"
  48. class MockLogger(object):
  49. def __init__(self):
  50. self.logged = []
  51. def critical(self, msg, *args, **kwargs):
  52. self.logged.append(msg)
  53. def info(self, msg, *args, **kwargs):
  54. self.logged.append(msg)
  55. def error(self, msg, *args, **kwargs):
  56. self.logged.append(msg)
  57. def debug(self, msg, *args, **kwargs):
  58. self.logged.append(msg)
  59. class MockBackend(object):
  60. _acked = False
  61. def ack(self, delivery_tag):
  62. self._acked = True
  63. class MockPool(object):
  64. _terminated = False
  65. _stopped = False
  66. def __init__(self, *args, **kwargs):
  67. self.raise_regular = kwargs.get("raise_regular", False)
  68. self.raise_base = kwargs.get("raise_base", False)
  69. def apply_async(self, *args, **kwargs):
  70. if self.raise_regular:
  71. raise KeyError("some exception")
  72. if self.raise_base:
  73. raise KeyboardInterrupt("Ctrl+c")
  74. def start(self):
  75. pass
  76. def stop(self):
  77. self._stopped = True
  78. return True
  79. def terminate(self):
  80. self._terminated = True
  81. self.stop()
  82. class MockController(object):
  83. def __init__(self, w, *args, **kwargs):
  84. self._w = w
  85. self._stopped = False
  86. def start(self):
  87. self._w["started"] = True
  88. self._stopped = False
  89. def stop(self):
  90. self._stopped = True
  91. def create_message(backend, **data):
  92. data.setdefault("id", gen_unique_id())
  93. return BaseMessage(backend, body=pickle.dumps(dict(**data)),
  94. content_type="application/x-python-serialize",
  95. content_encoding="binary")
  96. class test_QoS(unittest.TestCase):
  97. class MockConsumer(object):
  98. prefetch_count = 0
  99. def qos(self, prefetch_size=0, prefetch_count=0, apply_global=False):
  100. self.prefetch_count = prefetch_count
  101. def test_decrement(self):
  102. consumer = self.MockConsumer()
  103. qos = QoS(consumer, 10, get_logger())
  104. qos.update()
  105. self.assertEqual(int(qos.value), 10)
  106. self.assertEqual(consumer.prefetch_count, 10)
  107. qos.decrement()
  108. self.assertEqual(int(qos.value), 9)
  109. self.assertEqual(consumer.prefetch_count, 9)
  110. qos.decrement_eventually()
  111. self.assertEqual(int(qos.value), 8)
  112. self.assertEqual(consumer.prefetch_count, 9)
  113. class test_CarrotListener(unittest.TestCase):
  114. def setUp(self):
  115. self.ready_queue = FastQueue()
  116. self.eta_schedule = Timer()
  117. self.logger = get_logger()
  118. self.logger.setLevel(0)
  119. def tearDown(self):
  120. self.eta_schedule.stop()
  121. def test_mainloop(self):
  122. l = MyCarrotListener(self.ready_queue, self.eta_schedule, self.logger,
  123. send_events=False)
  124. class MockConnection(object):
  125. def drain_events(self):
  126. return "draining"
  127. l.connection = MockConnection()
  128. l.connection.connection = MockConnection()
  129. it = l._mainloop()
  130. self.assertTrue(it.next(), "draining")
  131. records = {}
  132. def create_recorder(key):
  133. def _recorder(*args, **kwargs):
  134. records[key] = True
  135. return _recorder
  136. l.task_consumer = PlaceHolder()
  137. l.task_consumer.iterconsume = create_recorder("consume_tasks")
  138. l.broadcast_consumer = PlaceHolder()
  139. l.broadcast_consumer.register_callback = create_recorder(
  140. "broadcast_callback")
  141. l.broadcast_consumer.iterconsume = create_recorder(
  142. "consume_broadcast")
  143. l.task_consumer.add_consumer = create_recorder("consumer_add")
  144. records.clear()
  145. self.assertEqual(l._detect_wait_method(), l._mainloop)
  146. for record in ("broadcast_callback", "consume_broadcast",
  147. "consume_tasks"):
  148. self.assertTrue(records.get(record))
  149. records.clear()
  150. l.connection.connection = PlaceHolder()
  151. self.assertIs(l._detect_wait_method(), l.task_consumer.iterconsume)
  152. self.assertTrue(records.get("consumer_add"))
  153. def test_connection(self):
  154. l = MyCarrotListener(self.ready_queue, self.eta_schedule, self.logger,
  155. send_events=False)
  156. l.reset_connection()
  157. self.assertIsInstance(l.connection, BrokerConnection)
  158. l.stop_consumers()
  159. self.assertIsNone(l.connection)
  160. self.assertIsNone(l.task_consumer)
  161. l.reset_connection()
  162. self.assertIsInstance(l.connection, BrokerConnection)
  163. l.stop_consumers()
  164. l.stop()
  165. l.close_connection()
  166. self.assertIsNone(l.connection)
  167. self.assertIsNone(l.task_consumer)
  168. def test_receive_message_control_command(self):
  169. l = MyCarrotListener(self.ready_queue, self.eta_schedule, self.logger,
  170. send_events=False)
  171. backend = MockBackend()
  172. m = create_message(backend, control={"command": "shutdown"})
  173. l.event_dispatcher = MockEventDispatcher()
  174. l.control_dispatch = MockControlDispatch()
  175. l.receive_message(m.decode(), m)
  176. self.assertIn("shutdown", l.control_dispatch.commands)
  177. def test_close_connection(self):
  178. l = MyCarrotListener(self.ready_queue, self.eta_schedule, self.logger,
  179. send_events=False)
  180. l._state = RUN
  181. l.close_connection()
  182. l = MyCarrotListener(self.ready_queue, self.eta_schedule, self.logger,
  183. send_events=False)
  184. eventer = l.event_dispatcher = MockEventDispatcher()
  185. heart = l.heart = MockHeart()
  186. l._state = RUN
  187. l.stop_consumers()
  188. self.assertTrue(eventer.closed)
  189. self.assertTrue(heart.closed)
  190. def test_receive_message_unknown(self):
  191. l = MyCarrotListener(self.ready_queue, self.eta_schedule, self.logger,
  192. send_events=False)
  193. backend = MockBackend()
  194. m = create_message(backend, unknown={"baz": "!!!"})
  195. l.event_dispatcher = MockEventDispatcher()
  196. l.control_dispatch = MockControlDispatch()
  197. def with_catch_warnings(log):
  198. l.receive_message(m.decode(), m)
  199. self.assertTrue(log)
  200. self.assertIn("unknown message", log[0].message.args[0])
  201. context = catch_warnings(record=True)
  202. execute_context(context, with_catch_warnings)
  203. def test_receive_message_InvalidTaskError(self):
  204. logger = MockLogger()
  205. l = MyCarrotListener(self.ready_queue, self.eta_schedule, logger,
  206. send_events=False)
  207. backend = MockBackend()
  208. m = create_message(backend, task=foo_task.name,
  209. args=(1, 2), kwargs="foobarbaz", id=1)
  210. l.event_dispatcher = MockEventDispatcher()
  211. l.control_dispatch = MockControlDispatch()
  212. l.receive_message(m.decode(), m)
  213. self.assertIn("Invalid task ignored", logger.logged[0])
  214. def test_on_decode_error(self):
  215. logger = MockLogger()
  216. l = MyCarrotListener(self.ready_queue, self.eta_schedule, logger,
  217. send_events=False)
  218. class MockMessage(object):
  219. content_type = "application/x-msgpack"
  220. content_encoding = "binary"
  221. body = "foobarbaz"
  222. acked = False
  223. def ack(self):
  224. self.acked = True
  225. message = MockMessage()
  226. l.on_decode_error(message, KeyError("foo"))
  227. self.assertTrue(message.acked)
  228. self.assertIn("Message decoding error", logger.logged[0])
  229. def test_receieve_message(self):
  230. l = MyCarrotListener(self.ready_queue, self.eta_schedule, self.logger,
  231. send_events=False)
  232. backend = MockBackend()
  233. m = create_message(backend, task=foo_task.name,
  234. args=[2, 4, 8], kwargs={})
  235. l.event_dispatcher = MockEventDispatcher()
  236. l.receive_message(m.decode(), m)
  237. in_bucket = self.ready_queue.get_nowait()
  238. self.assertIsInstance(in_bucket, TaskRequest)
  239. self.assertEqual(in_bucket.task_name, foo_task.name)
  240. self.assertEqual(in_bucket.execute(), 2 * 4 * 8)
  241. self.assertTrue(self.eta_schedule.empty())
  242. def test_receieve_message_eta_isoformat(self):
  243. class MockConsumer(object):
  244. prefetch_count_incremented = False
  245. def qos(self, **kwargs):
  246. self.prefetch_count_incremented = True
  247. l = MyCarrotListener(self.ready_queue, self.eta_schedule, self.logger,
  248. send_events=False)
  249. backend = MockBackend()
  250. m = create_message(backend, task=foo_task.name,
  251. eta=datetime.now().isoformat(),
  252. args=[2, 4, 8], kwargs={})
  253. l.task_consumer = MockConsumer()
  254. l.qos = QoS(l.task_consumer, l.initial_prefetch_count, l.logger)
  255. l.event_dispatcher = MockEventDispatcher()
  256. l.receive_message(m.decode(), m)
  257. items = [entry[2] for entry in self.eta_schedule.queue]
  258. found = 0
  259. for item in items:
  260. if item.args[0].task_name == foo_task.name:
  261. found = True
  262. self.assertTrue(found)
  263. self.assertTrue(l.task_consumer.prefetch_count_incremented)
  264. l.eta_schedule.stop()
  265. def test_revoke(self):
  266. ready_queue = FastQueue()
  267. l = MyCarrotListener(ready_queue, self.eta_schedule, self.logger,
  268. send_events=False)
  269. backend = MockBackend()
  270. id = gen_unique_id()
  271. c = create_message(backend, control={"command": "revoke",
  272. "task_id": id})
  273. t = create_message(backend, task=foo_task.name, args=[2, 4, 8],
  274. kwargs={}, id=id)
  275. l.event_dispatcher = MockEventDispatcher()
  276. l.receive_message(c.decode(), c)
  277. from celery.worker.state import revoked
  278. self.assertIn(id, revoked)
  279. l.receive_message(t.decode(), t)
  280. self.assertTrue(ready_queue.empty())
  281. def test_receieve_message_not_registered(self):
  282. l = MyCarrotListener(self.ready_queue, self.eta_schedule, self.logger,
  283. send_events=False)
  284. backend = MockBackend()
  285. m = create_message(backend, task="x.X.31x", args=[2, 4, 8], kwargs={})
  286. l.event_dispatcher = MockEventDispatcher()
  287. self.assertFalse(l.receive_message(m.decode(), m))
  288. self.assertRaises(Empty, self.ready_queue.get_nowait)
  289. self.assertTrue(self.eta_schedule.empty())
  290. def test_receieve_message_eta(self):
  291. l = MyCarrotListener(self.ready_queue, self.eta_schedule, self.logger,
  292. send_events=False)
  293. dispatcher = l.event_dispatcher = MockEventDispatcher()
  294. backend = MockBackend()
  295. m = create_message(backend, task=foo_task.name,
  296. args=[2, 4, 8], kwargs={},
  297. eta=(datetime.now() +
  298. timedelta(days=1)).isoformat())
  299. l.reset_connection()
  300. p = l.app.conf.BROKER_CONNECTION_RETRY
  301. l.app.conf.BROKER_CONNECTION_RETRY = False
  302. try:
  303. l.reset_connection()
  304. finally:
  305. l.app.conf.BROKER_CONNECTION_RETRY = p
  306. l.stop_consumers()
  307. self.assertTrue(dispatcher.flushed)
  308. l.event_dispatcher = MockEventDispatcher()
  309. l.receive_message(m.decode(), m)
  310. in_hold = self.eta_schedule.queue[0]
  311. self.assertEqual(len(in_hold), 3)
  312. eta, priority, entry = in_hold
  313. task = entry.args[0]
  314. self.assertIsInstance(task, TaskRequest)
  315. self.assertEqual(task.task_name, foo_task.name)
  316. self.assertEqual(task.execute(), 2 * 4 * 8)
  317. self.assertRaises(Empty, self.ready_queue.get_nowait)
  318. def test_start__consume_messages(self):
  319. class _QoS(object):
  320. prev = 3
  321. next = 4
  322. def update(self):
  323. self.prev = self.next
  324. class _Listener(MyCarrotListener):
  325. iterations = 0
  326. wait_method = None
  327. def reset_connection(self):
  328. if self.iterations >= 1:
  329. raise KeyError("foo")
  330. def _detect_wait_method(self):
  331. return self.wait_method
  332. called_back = [False]
  333. def init_callback(listener):
  334. called_back[0] = True
  335. l = _Listener(self.ready_queue, self.eta_schedule, self.logger,
  336. send_events=False, init_callback=init_callback)
  337. l.qos = _QoS()
  338. def raises_KeyError(limit=None):
  339. yield True
  340. l.iterations = 1
  341. raise KeyError("foo")
  342. l.wait_method = raises_KeyError
  343. self.assertRaises(KeyError, l.start)
  344. self.assertTrue(called_back[0])
  345. self.assertEqual(l.iterations, 1)
  346. self.assertEqual(l.qos.prev, l.qos.next)
  347. l = _Listener(self.ready_queue, self.eta_schedule, self.logger,
  348. send_events=False, init_callback=init_callback)
  349. l.qos = _QoS()
  350. def raises_socket_error(limit=None):
  351. yield True
  352. l.iterations = 1
  353. raise socket.error("foo")
  354. l.wait_method = raises_socket_error
  355. self.assertRaises(KeyError, l.start)
  356. self.assertTrue(called_back[0])
  357. self.assertEqual(l.iterations, 1)
  358. class test_WorkController(unittest.TestCase):
  359. def setUp(self):
  360. self.worker = WorkController(concurrency=1, loglevel=0)
  361. self.worker.logger = MockLogger()
  362. def test_with_rate_limits_disabled(self):
  363. worker = WorkController(concurrency=1, loglevel=0,
  364. disable_rate_limits=True)
  365. self.assertIsInstance(worker.ready_queue, FastQueue)
  366. def test_attrs(self):
  367. worker = self.worker
  368. self.assertIsInstance(worker.scheduler, Timer)
  369. self.assertTrue(worker.scheduler)
  370. self.assertTrue(worker.pool)
  371. self.assertTrue(worker.listener)
  372. self.assertTrue(worker.mediator)
  373. self.assertTrue(worker.components)
  374. def test_with_embedded_celerybeat(self):
  375. worker = WorkController(concurrency=1, loglevel=0,
  376. embed_clockservice=True)
  377. self.assertTrue(worker.beat)
  378. self.assertIn(worker.beat, worker.components)
  379. def test_process_task(self):
  380. worker = self.worker
  381. worker.pool = MockPool()
  382. backend = MockBackend()
  383. m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
  384. kwargs={})
  385. task = TaskRequest.from_message(m, m.decode())
  386. worker.process_task(task)
  387. worker.pool.stop()
  388. def test_process_task_raise_base(self):
  389. worker = self.worker
  390. worker.pool = MockPool(raise_base=True)
  391. backend = MockBackend()
  392. m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
  393. kwargs={})
  394. task = TaskRequest.from_message(m, m.decode())
  395. worker.process_task(task)
  396. worker.pool.stop()
  397. def test_process_task_raise_regular(self):
  398. worker = self.worker
  399. worker.pool = MockPool(raise_regular=True)
  400. backend = MockBackend()
  401. m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
  402. kwargs={})
  403. task = TaskRequest.from_message(m, m.decode())
  404. worker.process_task(task)
  405. worker.pool.stop()
  406. def test_start__stop(self):
  407. worker = self.worker
  408. w1 = {"started": False}
  409. w2 = {"started": False}
  410. w3 = {"started": False}
  411. w4 = {"started": False}
  412. worker.components = [MockController(w1), MockController(w2),
  413. MockController(w3), MockController(w4)]
  414. worker.start()
  415. for w in (w1, w2, w3, w4):
  416. self.assertTrue(w["started"])
  417. self.assertTrue(worker._running, len(worker.components))
  418. worker.stop()
  419. for component in worker.components:
  420. self.assertTrue(component._stopped)
  421. def test_start__terminate(self):
  422. worker = self.worker
  423. w1 = {"started": False}
  424. w2 = {"started": False}
  425. w3 = {"started": False}
  426. w4 = {"started": False}
  427. worker.components = [MockController(w1), MockController(w2),
  428. MockController(w3), MockController(w4),
  429. MockPool()]
  430. worker.start()
  431. for w in (w1, w2, w3, w4):
  432. self.assertTrue(w["started"])
  433. self.assertTrue(worker._running, len(worker.components))
  434. self.assertEqual(worker._state, RUN)
  435. worker.terminate()
  436. for component in worker.components:
  437. self.assertTrue(component._stopped)
  438. self.assertTrue(worker.components[4]._terminated)