test_worker.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550
  1. import socket
  2. import unittest2 as unittest
  3. from datetime import datetime, timedelta
  4. from multiprocessing import get_logger
  5. from Queue import Empty
  6. from carrot.backends.base import BaseMessage
  7. from carrot.connection import BrokerConnection
  8. from celery import conf
  9. from celery.decorators import task as task_dec
  10. from celery.decorators import periodic_task as periodic_task_dec
  11. from celery.serialization import pickle
  12. from celery.utils import gen_unique_id
  13. from celery.worker import WorkController
  14. from celery.worker.buckets import FastQueue
  15. from celery.worker.job import TaskRequest
  16. from celery.worker.listener import CarrotListener, QoS, RUN
  17. from celery.worker.scheduler import Scheduler
  18. from celery.tests.compat import catch_warnings
  19. from celery.tests.utils import execute_context
  20. class PlaceHolder(object):
  21. pass
  22. class MockControlDispatch(object):
  23. commands = []
  24. def dispatch_from_message(self, message):
  25. self.commands.append(message.pop("command", None))
  26. class MockEventDispatcher(object):
  27. sent = []
  28. closed = False
  29. def send(self, event, *args, **kwargs):
  30. self.sent.append(event)
  31. def close(self):
  32. self.closed = True
  33. class MockHeart(object):
  34. closed = False
  35. def stop(self):
  36. self.closed = True
  37. @task_dec()
  38. def foo_task(x, y, z, **kwargs):
  39. return x * y * z
  40. @periodic_task_dec(run_every=60)
  41. def foo_periodic_task():
  42. return "foo"
  43. class MockLogger(object):
  44. def __init__(self):
  45. self.logged = []
  46. def critical(self, msg, *args, **kwargs):
  47. self.logged.append(msg)
  48. def info(self, msg, *args, **kwargs):
  49. self.logged.append(msg)
  50. def error(self, msg, *args, **kwargs):
  51. self.logged.append(msg)
  52. def debug(self, msg, *args, **kwargs):
  53. self.logged.append(msg)
  54. class MockBackend(object):
  55. _acked = False
  56. def ack(self, delivery_tag):
  57. self._acked = True
  58. class MockPool(object):
  59. _terminated = False
  60. _stopped = False
  61. def __init__(self, *args, **kwargs):
  62. self.raise_regular = kwargs.get("raise_regular", False)
  63. self.raise_base = kwargs.get("raise_base", False)
  64. def apply_async(self, *args, **kwargs):
  65. if self.raise_regular:
  66. raise KeyError("some exception")
  67. if self.raise_base:
  68. raise KeyboardInterrupt("Ctrl+c")
  69. def start(self):
  70. pass
  71. def stop(self):
  72. self._stopped = True
  73. return True
  74. def terminate(self):
  75. self._terminated = True
  76. self.stop()
  77. class MockController(object):
  78. def __init__(self, w, *args, **kwargs):
  79. self._w = w
  80. self._stopped = False
  81. def start(self):
  82. self._w["started"] = True
  83. self._stopped = False
  84. def stop(self):
  85. self._stopped = True
  86. def create_message(backend, **data):
  87. data.setdefault("id", gen_unique_id())
  88. return BaseMessage(backend, body=pickle.dumps(dict(**data)),
  89. content_type="application/x-python-serialize",
  90. content_encoding="binary")
  91. class test_QoS(unittest.TestCase):
  92. class MockConsumer(object):
  93. prefetch_count = 0
  94. def qos(self, prefetch_size=0, prefetch_count=0, apply_global=False):
  95. self.prefetch_count = prefetch_count
  96. def test_decrement(self):
  97. consumer = self.MockConsumer()
  98. qos = QoS(consumer, 10, get_logger())
  99. qos.update()
  100. self.assertEqual(int(qos.value), 10)
  101. self.assertEqual(consumer.prefetch_count, 10)
  102. qos.decrement()
  103. self.assertEqual(int(qos.value), 9)
  104. self.assertEqual(consumer.prefetch_count, 9)
  105. qos.decrement_eventually()
  106. self.assertEqual(int(qos.value), 8)
  107. self.assertEqual(consumer.prefetch_count, 9)
  108. class test_CarrotListener(unittest.TestCase):
  109. def setUp(self):
  110. self.ready_queue = FastQueue()
  111. self.eta_schedule = Scheduler(self.ready_queue)
  112. self.logger = get_logger()
  113. self.logger.setLevel(0)
  114. def test_mainloop(self):
  115. l = CarrotListener(self.ready_queue, self.eta_schedule, self.logger,
  116. send_events=False)
  117. class MockConnection(object):
  118. def drain_events(self):
  119. return "draining"
  120. l.connection = MockConnection()
  121. l.connection.connection = MockConnection()
  122. it = l._mainloop()
  123. self.assertTrue(it.next(), "draining")
  124. records = {}
  125. def create_recorder(key):
  126. def _recorder(*args, **kwargs):
  127. records[key] = True
  128. return _recorder
  129. l.task_consumer = PlaceHolder()
  130. l.task_consumer.iterconsume = create_recorder("consume_tasks")
  131. l.broadcast_consumer = PlaceHolder()
  132. l.broadcast_consumer.register_callback = create_recorder(
  133. "broadcast_callback")
  134. l.broadcast_consumer.iterconsume = create_recorder(
  135. "consume_broadcast")
  136. l.task_consumer.add_consumer = create_recorder("consumer_add")
  137. records.clear()
  138. self.assertEqual(l._detect_wait_method(), l._mainloop)
  139. for record in ("broadcast_callback", "consume_broadcast",
  140. "consume_tasks"):
  141. self.assertTrue(records.get(record))
  142. records.clear()
  143. l.connection.connection = PlaceHolder()
  144. self.assertIs(l._detect_wait_method(), l.task_consumer.iterconsume)
  145. self.assertTrue(records.get("consumer_add"))
  146. def test_connection(self):
  147. l = CarrotListener(self.ready_queue, self.eta_schedule, self.logger,
  148. send_events=False)
  149. l.reset_connection()
  150. self.assertIsInstance(l.connection, BrokerConnection)
  151. l.stop_consumers()
  152. self.assertIsNone(l.connection)
  153. self.assertIsNone(l.task_consumer)
  154. l.reset_connection()
  155. self.assertIsInstance(l.connection, BrokerConnection)
  156. l.stop()
  157. l.close_connection()
  158. self.assertIsNone(l.connection)
  159. self.assertIsNone(l.task_consumer)
  160. def test_receive_message_control_command(self):
  161. l = CarrotListener(self.ready_queue, self.eta_schedule, self.logger,
  162. send_events=False)
  163. backend = MockBackend()
  164. m = create_message(backend, control={"command": "shutdown"})
  165. l.event_dispatcher = MockEventDispatcher()
  166. l.control_dispatch = MockControlDispatch()
  167. l.receive_message(m.decode(), m)
  168. self.assertIn("shutdown", l.control_dispatch.commands)
  169. def test_close_connection(self):
  170. l = CarrotListener(self.ready_queue, self.eta_schedule, self.logger,
  171. send_events=False)
  172. l._state = RUN
  173. l.close_connection()
  174. l = CarrotListener(self.ready_queue, self.eta_schedule, self.logger,
  175. send_events=False)
  176. eventer = l.event_dispatcher = MockEventDispatcher()
  177. heart = l.heart = MockHeart()
  178. l._state = RUN
  179. l.stop_consumers()
  180. self.assertTrue(eventer.closed)
  181. self.assertTrue(heart.closed)
  182. def test_receive_message_unknown(self):
  183. l = CarrotListener(self.ready_queue, self.eta_schedule, self.logger,
  184. send_events=False)
  185. backend = MockBackend()
  186. m = create_message(backend, unknown={"baz": "!!!"})
  187. l.event_dispatcher = MockEventDispatcher()
  188. l.control_dispatch = MockControlDispatch()
  189. def with_catch_warnings(log):
  190. l.receive_message(m.decode(), m)
  191. self.assertTrue(log)
  192. self.assertIn("unknown message", log[0].message.args[0])
  193. context = catch_warnings(record=True)
  194. execute_context(context, with_catch_warnings)
  195. def test_receive_message_InvalidTaskError(self):
  196. logger = MockLogger()
  197. l = CarrotListener(self.ready_queue, self.eta_schedule, logger,
  198. send_events=False)
  199. backend = MockBackend()
  200. m = create_message(backend, task=foo_task.name,
  201. args=(1, 2), kwargs="foobarbaz", id=1)
  202. l.event_dispatcher = MockEventDispatcher()
  203. l.control_dispatch = MockControlDispatch()
  204. l.receive_message(m.decode(), m)
  205. self.assertIn("Invalid task ignored", logger.logged[0])
  206. def test_on_decode_error(self):
  207. logger = MockLogger()
  208. l = CarrotListener(self.ready_queue, self.eta_schedule, logger,
  209. send_events=False)
  210. class MockMessage(object):
  211. content_type = "application/x-msgpack"
  212. content_encoding = "binary"
  213. body = "foobarbaz"
  214. acked = False
  215. def ack(self):
  216. self.acked = True
  217. message = MockMessage()
  218. l.on_decode_error(message, KeyError("foo"))
  219. self.assertTrue(message.acked)
  220. self.assertIn("Message decoding error", logger.logged[0])
  221. def test_receieve_message(self):
  222. l = CarrotListener(self.ready_queue, self.eta_schedule, self.logger,
  223. send_events=False)
  224. backend = MockBackend()
  225. m = create_message(backend, task=foo_task.name,
  226. args=[2, 4, 8], kwargs={})
  227. l.event_dispatcher = MockEventDispatcher()
  228. l.receive_message(m.decode(), m)
  229. in_bucket = self.ready_queue.get_nowait()
  230. self.assertIsInstance(in_bucket, TaskRequest)
  231. self.assertEqual(in_bucket.task_name, foo_task.name)
  232. self.assertEqual(in_bucket.execute(), 2 * 4 * 8)
  233. self.assertTrue(self.eta_schedule.empty())
  234. def test_receieve_message_eta_isoformat(self):
  235. class MockConsumer(object):
  236. prefetch_count_incremented = False
  237. def qos(self, **kwargs):
  238. self.prefetch_count_incremented = True
  239. l = CarrotListener(self.ready_queue, self.eta_schedule, self.logger,
  240. send_events=False)
  241. backend = MockBackend()
  242. m = create_message(backend, task=foo_task.name,
  243. eta=datetime.now().isoformat(),
  244. args=[2, 4, 8], kwargs={})
  245. l.event_dispatcher = MockEventDispatcher()
  246. l.task_consumer = MockConsumer()
  247. l.qos = QoS(l.task_consumer, l.initial_prefetch_count, l.logger)
  248. l.receive_message(m.decode(), m)
  249. items = [entry[2] for entry in self.eta_schedule.queue]
  250. found = 0
  251. for item in items:
  252. if item.task_name == foo_task.name:
  253. found = True
  254. self.assertTrue(found)
  255. self.assertTrue(l.task_consumer.prefetch_count_incremented)
  256. def test_revoke(self):
  257. ready_queue = FastQueue()
  258. l = CarrotListener(ready_queue, self.eta_schedule, self.logger,
  259. send_events=False)
  260. backend = MockBackend()
  261. id = gen_unique_id()
  262. c = create_message(backend, control={"command": "revoke",
  263. "task_id": id})
  264. t = create_message(backend, task=foo_task.name, args=[2, 4, 8],
  265. kwargs={}, id=id)
  266. l.event_dispatcher = MockEventDispatcher()
  267. l.receive_message(c.decode(), c)
  268. from celery.worker.revoke import revoked
  269. self.assertIn(id, revoked)
  270. l.receive_message(t.decode(), t)
  271. self.assertTrue(ready_queue.empty())
  272. def test_receieve_message_not_registered(self):
  273. l = CarrotListener(self.ready_queue, self.eta_schedule, self.logger,
  274. send_events=False)
  275. backend = MockBackend()
  276. m = create_message(backend, task="x.X.31x", args=[2, 4, 8], kwargs={})
  277. l.event_dispatcher = MockEventDispatcher()
  278. self.assertFalse(l.receive_message(m.decode(), m))
  279. self.assertRaises(Empty, self.ready_queue.get_nowait)
  280. self.assertTrue(self.eta_schedule.empty())
  281. def test_receieve_message_eta(self):
  282. l = CarrotListener(self.ready_queue, self.eta_schedule, self.logger,
  283. send_events=False)
  284. backend = MockBackend()
  285. m = create_message(backend, task=foo_task.name,
  286. args=[2, 4, 8], kwargs={},
  287. eta=(datetime.now() +
  288. timedelta(days=1)).isoformat())
  289. l.reset_connection()
  290. p, conf.BROKER_CONNECTION_RETRY = conf.BROKER_CONNECTION_RETRY, False
  291. try:
  292. l.reset_connection()
  293. finally:
  294. conf.BROKER_CONNECTION_RETRY = p
  295. l.receive_message(m.decode(), m)
  296. in_hold = self.eta_schedule.queue[0]
  297. self.assertEqual(len(in_hold), 4)
  298. eta, priority, task, on_accept = in_hold
  299. self.assertIsInstance(task, TaskRequest)
  300. self.assertTrue(callable(on_accept))
  301. self.assertEqual(task.task_name, foo_task.name)
  302. self.assertEqual(task.execute(), 2 * 4 * 8)
  303. self.assertRaises(Empty, self.ready_queue.get_nowait)
  304. def test_start__consume_messages(self):
  305. class _QoS(object):
  306. prev = 3
  307. next = 4
  308. def update(self):
  309. self.prev = self.next
  310. class _Listener(CarrotListener):
  311. iterations = 0
  312. wait_method = None
  313. def reset_connection(self):
  314. if self.iterations >= 1:
  315. raise KeyError("foo")
  316. def _detect_wait_method(self):
  317. return self.wait_method
  318. called_back = [False]
  319. def init_callback(listener):
  320. called_back[0] = True
  321. l = _Listener(self.ready_queue, self.eta_schedule, self.logger,
  322. send_events=False, init_callback=init_callback)
  323. l.qos = _QoS()
  324. def raises_KeyError(limit=None):
  325. yield True
  326. l.iterations = 1
  327. raise KeyError("foo")
  328. l.wait_method = raises_KeyError
  329. self.assertRaises(KeyError, l.start)
  330. self.assertTrue(called_back[0])
  331. self.assertEqual(l.iterations, 1)
  332. self.assertEqual(l.qos.prev, l.qos.next)
  333. l = _Listener(self.ready_queue, self.eta_schedule, self.logger,
  334. send_events=False, init_callback=init_callback)
  335. l.qos = _QoS()
  336. def raises_socket_error(limit=None):
  337. yield True
  338. l.iterations = 1
  339. raise socket.error("foo")
  340. l.wait_method = raises_socket_error
  341. self.assertRaises(KeyError, l.start)
  342. self.assertTrue(called_back[0])
  343. self.assertEqual(l.iterations, 1)
  344. class test_WorkController(unittest.TestCase):
  345. def setUp(self):
  346. self.worker = WorkController(concurrency=1, loglevel=0)
  347. self.worker.logger = MockLogger()
  348. def test_with_rate_limits_disabled(self):
  349. conf.DISABLE_RATE_LIMITS = True
  350. try:
  351. worker = WorkController(concurrency=1, loglevel=0)
  352. self.assertIsInstance(worker.ready_queue, FastQueue)
  353. finally:
  354. conf.DISABLE_RATE_LIMITS = False
  355. def test_attrs(self):
  356. worker = self.worker
  357. self.assertIsInstance(worker.eta_schedule, Scheduler)
  358. self.assertTrue(worker.scheduler)
  359. self.assertTrue(worker.pool)
  360. self.assertTrue(worker.listener)
  361. self.assertTrue(worker.mediator)
  362. self.assertTrue(worker.components)
  363. def test_with_embedded_clockservice(self):
  364. worker = WorkController(concurrency=1, loglevel=0,
  365. embed_clockservice=True)
  366. self.assertTrue(worker.clockservice)
  367. self.assertIn(worker.clockservice, worker.components)
  368. def test_process_task(self):
  369. worker = self.worker
  370. worker.pool = MockPool()
  371. backend = MockBackend()
  372. m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
  373. kwargs={})
  374. task = TaskRequest.from_message(m, m.decode())
  375. worker.process_task(task)
  376. worker.pool.stop()
  377. def test_process_task_raise_base(self):
  378. worker = self.worker
  379. worker.pool = MockPool(raise_base=True)
  380. backend = MockBackend()
  381. m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
  382. kwargs={})
  383. task = TaskRequest.from_message(m, m.decode())
  384. worker.process_task(task)
  385. worker.pool.stop()
  386. def test_process_task_raise_regular(self):
  387. worker = self.worker
  388. worker.pool = MockPool(raise_regular=True)
  389. backend = MockBackend()
  390. m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
  391. kwargs={})
  392. task = TaskRequest.from_message(m, m.decode())
  393. worker.process_task(task)
  394. worker.pool.stop()
  395. def test_start__stop(self):
  396. worker = self.worker
  397. w1 = {"started": False}
  398. w2 = {"started": False}
  399. w3 = {"started": False}
  400. w4 = {"started": False}
  401. worker.components = [MockController(w1), MockController(w2),
  402. MockController(w3), MockController(w4)]
  403. worker.start()
  404. for w in (w1, w2, w3, w4):
  405. self.assertTrue(w["started"])
  406. self.assertTrue(worker._running, len(worker.components))
  407. worker.stop()
  408. for component in worker.components:
  409. self.assertTrue(component._stopped)
  410. def test_start__terminate(self):
  411. worker = self.worker
  412. w1 = {"started": False}
  413. w2 = {"started": False}
  414. w3 = {"started": False}
  415. w4 = {"started": False}
  416. worker.components = [MockController(w1), MockController(w2),
  417. MockController(w3), MockController(w4),
  418. MockPool()]
  419. worker.start()
  420. for w in (w1, w2, w3, w4):
  421. self.assertTrue(w["started"])
  422. self.assertTrue(worker._running, len(worker.components))
  423. self.assertEqual(worker._state, RUN)
  424. worker.terminate()
  425. for component in worker.components:
  426. self.assertTrue(component._stopped)
  427. self.assertTrue(worker.components[4]._terminated)