test_worker.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558
  1. import socket
  2. import unittest2 as unittest
  3. from datetime import datetime, timedelta
  4. from Queue import Empty
  5. from kombu.transport.base import Message
  6. from kombu.connection import BrokerConnection
  7. from celery.utils.timer2 import Timer
  8. from celery.app import app_or_default
  9. from celery.decorators import task as task_dec
  10. from celery.decorators import periodic_task as periodic_task_dec
  11. from celery.serialization import pickle
  12. from celery.utils import gen_unique_id
  13. from celery.worker import WorkController
  14. from celery.worker.buckets import FastQueue
  15. from celery.worker.job import TaskRequest
  16. from celery.worker import consumer
  17. from celery.worker.consumer import Consumer as MainConsumer
  18. from celery.worker.consumer import QoS, RUN
  19. from celery.tests.compat import catch_warnings
  20. from celery.tests.utils import execute_context
  21. class MockConsumer(object):
  22. class Channel(object):
  23. def close(self):
  24. pass
  25. def register_callback(self, cb):
  26. pass
  27. def consume(self):
  28. pass
  29. @property
  30. def channel(self):
  31. return self.Channel()
  32. class PlaceHolder(object):
  33. pass
  34. class MyKombuConsumer(MainConsumer):
  35. broadcast_consumer = MockConsumer()
  36. task_consumer = MockConsumer()
  37. def restart_heartbeat(self):
  38. self.heart = None
  39. class MockNode(object):
  40. commands = []
  41. def handle_message(self, message_data, message):
  42. self.commands.append(message.pop("command", None))
  43. class MockEventDispatcher(object):
  44. sent = []
  45. closed = False
  46. flushed = False
  47. def send(self, event, *args, **kwargs):
  48. self.sent.append(event)
  49. def close(self):
  50. self.closed = True
  51. def flush(self):
  52. self.flushed = True
  53. class MockHeart(object):
  54. closed = False
  55. def stop(self):
  56. self.closed = True
  57. @task_dec()
  58. def foo_task(x, y, z, **kwargs):
  59. return x * y * z
  60. @periodic_task_dec(run_every=60)
  61. def foo_periodic_task():
  62. return "foo"
  63. class MockLogger(object):
  64. def __init__(self):
  65. self.logged = []
  66. def critical(self, msg, *args, **kwargs):
  67. self.logged.append(msg)
  68. def info(self, msg, *args, **kwargs):
  69. self.logged.append(msg)
  70. def error(self, msg, *args, **kwargs):
  71. self.logged.append(msg)
  72. def debug(self, msg, *args, **kwargs):
  73. self.logged.append(msg)
  74. class MockBackend(object):
  75. _acked = False
  76. def basic_ack(self, delivery_tag):
  77. self._acked = True
  78. class MockPool(object):
  79. _terminated = False
  80. _stopped = False
  81. def __init__(self, *args, **kwargs):
  82. self.raise_regular = kwargs.get("raise_regular", False)
  83. self.raise_base = kwargs.get("raise_base", False)
  84. def apply_async(self, *args, **kwargs):
  85. if self.raise_regular:
  86. raise KeyError("some exception")
  87. if self.raise_base:
  88. raise KeyboardInterrupt("Ctrl+c")
  89. def start(self):
  90. pass
  91. def stop(self):
  92. self._stopped = True
  93. return True
  94. def terminate(self):
  95. self._terminated = True
  96. self.stop()
  97. class MockController(object):
  98. def __init__(self, w, *args, **kwargs):
  99. self._w = w
  100. self._stopped = False
  101. def start(self):
  102. self._w["started"] = True
  103. self._stopped = False
  104. def stop(self):
  105. self._stopped = True
  106. def create_message(backend, **data):
  107. data.setdefault("id", gen_unique_id())
  108. return Message(backend, body=pickle.dumps(dict(**data)),
  109. content_type="application/x-python-serialize",
  110. content_encoding="binary")
  111. class test_QoS(unittest.TestCase):
  112. class MockConsumer(object):
  113. prefetch_count = 0
  114. def qos(self, prefetch_size=0, prefetch_count=0, apply_global=False):
  115. self.prefetch_count = prefetch_count
  116. def test_decrement(self):
  117. consumer = self.MockConsumer()
  118. qos = QoS(consumer, 10, app_or_default().log.get_default_logger())
  119. qos.update()
  120. self.assertEqual(int(qos.value), 10)
  121. self.assertEqual(consumer.prefetch_count, 10)
  122. qos.decrement()
  123. self.assertEqual(int(qos.value), 9)
  124. self.assertEqual(consumer.prefetch_count, 9)
  125. qos.decrement_eventually()
  126. self.assertEqual(int(qos.value), 8)
  127. self.assertEqual(consumer.prefetch_count, 9)
  128. class test_Consumer(unittest.TestCase):
  129. def setUp(self):
  130. self.ready_queue = FastQueue()
  131. self.eta_schedule = Timer()
  132. self.logger = app_or_default().log.get_default_logger()
  133. self.logger.setLevel(0)
  134. def tearDown(self):
  135. self.eta_schedule.stop()
  136. def test_connection(self):
  137. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  138. send_events=False)
  139. l.reset_connection()
  140. self.assertIsInstance(l.connection, BrokerConnection)
  141. l.stop_consumers()
  142. self.assertIsNone(l.connection)
  143. self.assertIsNone(l.task_consumer)
  144. l.reset_connection()
  145. self.assertIsInstance(l.connection, BrokerConnection)
  146. l.stop_consumers()
  147. l.stop()
  148. l.close_connection()
  149. self.assertIsNone(l.connection)
  150. self.assertIsNone(l.task_consumer)
  151. def test_close_connection(self):
  152. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  153. send_events=False)
  154. l._state = RUN
  155. l.close_connection()
  156. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  157. send_events=False)
  158. eventer = l.event_dispatcher = MockEventDispatcher()
  159. heart = l.heart = MockHeart()
  160. l._state = RUN
  161. l.stop_consumers()
  162. self.assertTrue(eventer.closed)
  163. self.assertTrue(heart.closed)
  164. def test_receive_message_unknown(self):
  165. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  166. send_events=False)
  167. backend = MockBackend()
  168. m = create_message(backend, unknown={"baz": "!!!"})
  169. l.event_dispatcher = MockEventDispatcher()
  170. l.pidbox_node = MockNode()
  171. def with_catch_warnings(log):
  172. l.receive_message(m.decode(), m)
  173. self.assertTrue(log)
  174. self.assertIn("unknown message", log[0].message.args[0])
  175. context = catch_warnings(record=True)
  176. execute_context(context, with_catch_warnings)
  177. def test_receive_message_eta_OverflowError(self):
  178. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  179. send_events=False)
  180. backend = MockBackend()
  181. called = [False]
  182. def to_timestamp(d):
  183. called[0] = True
  184. raise OverflowError()
  185. m = create_message(backend, task=foo_task.name,
  186. args=("2, 2"),
  187. kwargs={},
  188. eta=datetime.now().isoformat())
  189. l.event_dispatcher = MockEventDispatcher()
  190. l.pidbox_node = MockNode()
  191. prev, consumer.to_timestamp = consumer.to_timestamp, to_timestamp
  192. try:
  193. l.receive_message(m.decode(), m)
  194. self.assertTrue(m.acknowledged)
  195. self.assertTrue(called[0])
  196. finally:
  197. consumer.to_timestamp = prev
  198. def test_receive_message_InvalidTaskError(self):
  199. logger = MockLogger()
  200. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, logger,
  201. send_events=False)
  202. backend = MockBackend()
  203. m = create_message(backend, task=foo_task.name,
  204. args=(1, 2), kwargs="foobarbaz", id=1)
  205. l.event_dispatcher = MockEventDispatcher()
  206. l.pidbox_node = MockNode()
  207. l.receive_message(m.decode(), m)
  208. self.assertIn("Invalid task ignored", logger.logged[0])
  209. def test_on_decode_error(self):
  210. logger = MockLogger()
  211. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, logger,
  212. send_events=False)
  213. class MockMessage(object):
  214. content_type = "application/x-msgpack"
  215. content_encoding = "binary"
  216. body = "foobarbaz"
  217. acked = False
  218. def ack(self):
  219. self.acked = True
  220. message = MockMessage()
  221. l.on_decode_error(message, KeyError("foo"))
  222. self.assertTrue(message.acked)
  223. self.assertIn("Message decoding error", logger.logged[0])
  224. def test_receieve_message(self):
  225. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  226. send_events=False)
  227. backend = MockBackend()
  228. m = create_message(backend, task=foo_task.name,
  229. args=[2, 4, 8], kwargs={})
  230. l.event_dispatcher = MockEventDispatcher()
  231. l.receive_message(m.decode(), m)
  232. in_bucket = self.ready_queue.get_nowait()
  233. self.assertIsInstance(in_bucket, TaskRequest)
  234. self.assertEqual(in_bucket.task_name, foo_task.name)
  235. self.assertEqual(in_bucket.execute(), 2 * 4 * 8)
  236. self.assertTrue(self.eta_schedule.empty())
  237. def test_receieve_message_eta_isoformat(self):
  238. class MockConsumer(object):
  239. prefetch_count_incremented = False
  240. def qos(self, **kwargs):
  241. self.prefetch_count_incremented = True
  242. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  243. send_events=False)
  244. backend = MockBackend()
  245. m = create_message(backend, task=foo_task.name,
  246. eta=datetime.now().isoformat(),
  247. args=[2, 4, 8], kwargs={})
  248. l.task_consumer = MockConsumer()
  249. l.qos = QoS(l.task_consumer, l.initial_prefetch_count, l.logger)
  250. l.event_dispatcher = MockEventDispatcher()
  251. l.receive_message(m.decode(), m)
  252. l.eta_schedule.stop()
  253. items = [entry[2] for entry in self.eta_schedule.queue]
  254. found = 0
  255. for item in items:
  256. if item.args[0].task_name == foo_task.name:
  257. found = True
  258. self.assertTrue(found)
  259. self.assertTrue(l.task_consumer.prefetch_count_incremented)
  260. l.eta_schedule.stop()
  261. def test_revoke(self):
  262. ready_queue = FastQueue()
  263. l = MyKombuConsumer(ready_queue, self.eta_schedule, self.logger,
  264. send_events=False)
  265. backend = MockBackend()
  266. id = gen_unique_id()
  267. t = create_message(backend, task=foo_task.name, args=[2, 4, 8],
  268. kwargs={}, id=id)
  269. from celery.worker.state import revoked
  270. revoked.add(id)
  271. l.receive_message(t.decode(), t)
  272. self.assertTrue(ready_queue.empty())
  273. def test_receieve_message_not_registered(self):
  274. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  275. send_events=False)
  276. backend = MockBackend()
  277. m = create_message(backend, task="x.X.31x", args=[2, 4, 8], kwargs={})
  278. l.event_dispatcher = MockEventDispatcher()
  279. self.assertFalse(l.receive_message(m.decode(), m))
  280. self.assertRaises(Empty, self.ready_queue.get_nowait)
  281. self.assertTrue(self.eta_schedule.empty())
  282. def test_receieve_message_eta(self):
  283. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  284. send_events=False)
  285. dispatcher = l.event_dispatcher = MockEventDispatcher()
  286. backend = MockBackend()
  287. m = create_message(backend, task=foo_task.name,
  288. args=[2, 4, 8], kwargs={},
  289. eta=(datetime.now() +
  290. timedelta(days=1)).isoformat())
  291. l.reset_connection()
  292. p = l.app.conf.BROKER_CONNECTION_RETRY
  293. l.app.conf.BROKER_CONNECTION_RETRY = False
  294. try:
  295. l.reset_connection()
  296. finally:
  297. l.app.conf.BROKER_CONNECTION_RETRY = p
  298. l.stop_consumers()
  299. self.assertTrue(dispatcher.flushed)
  300. l.event_dispatcher = MockEventDispatcher()
  301. l.receive_message(m.decode(), m)
  302. l.eta_schedule.stop()
  303. in_hold = self.eta_schedule.queue[0]
  304. self.assertEqual(len(in_hold), 3)
  305. eta, priority, entry = in_hold
  306. task = entry.args[0]
  307. self.assertIsInstance(task, TaskRequest)
  308. self.assertEqual(task.task_name, foo_task.name)
  309. self.assertEqual(task.execute(), 2 * 4 * 8)
  310. self.assertRaises(Empty, self.ready_queue.get_nowait)
  311. def test_start__consume_messages(self):
  312. class _QoS(object):
  313. prev = 3
  314. next = 4
  315. def update(self):
  316. self.prev = self.next
  317. class _Consumer(MyKombuConsumer):
  318. iterations = 0
  319. wait_method = None
  320. def reset_connection(self):
  321. if self.iterations >= 1:
  322. raise KeyError("foo")
  323. called_back = [False]
  324. def init_callback(consumer):
  325. called_back[0] = True
  326. l = _Consumer(self.ready_queue, self.eta_schedule, self.logger,
  327. send_events=False, init_callback=init_callback)
  328. l.task_consumer = MockConsumer()
  329. l.qos = _QoS()
  330. l.connection = BrokerConnection()
  331. def raises_KeyError(limit=None):
  332. yield True
  333. l.iterations = 1
  334. raise KeyError("foo")
  335. l._mainloop = raises_KeyError
  336. self.assertRaises(KeyError, l.start)
  337. self.assertTrue(called_back[0])
  338. self.assertEqual(l.iterations, 1)
  339. self.assertEqual(l.qos.prev, l.qos.next)
  340. l = _Consumer(self.ready_queue, self.eta_schedule, self.logger,
  341. send_events=False, init_callback=init_callback)
  342. l.qos = _QoS()
  343. l.task_consumer = MockConsumer()
  344. l.connection = BrokerConnection()
  345. def raises_socket_error(limit=None):
  346. yield True
  347. l.iterations = 1
  348. raise socket.error("foo")
  349. l._mainloop = raises_socket_error
  350. self.assertRaises(socket.error, l.start)
  351. self.assertTrue(called_back[0])
  352. self.assertEqual(l.iterations, 1)
  353. class test_WorkController(unittest.TestCase):
  354. def setUp(self):
  355. self.worker = WorkController(concurrency=1, loglevel=0)
  356. self.worker.logger = MockLogger()
  357. def test_with_rate_limits_disabled(self):
  358. worker = WorkController(concurrency=1, loglevel=0,
  359. disable_rate_limits=True)
  360. self.assertTrue(hasattr(worker.ready_queue, "put"))
  361. def test_attrs(self):
  362. worker = self.worker
  363. self.assertIsInstance(worker.scheduler, Timer)
  364. self.assertTrue(worker.scheduler)
  365. self.assertTrue(worker.pool)
  366. self.assertTrue(worker.consumer)
  367. self.assertTrue(worker.mediator)
  368. self.assertTrue(worker.components)
  369. def test_with_embedded_celerybeat(self):
  370. worker = WorkController(concurrency=1, loglevel=0,
  371. embed_clockservice=True)
  372. self.assertTrue(worker.beat)
  373. self.assertIn(worker.beat, worker.components)
  374. def test_process_task(self):
  375. worker = self.worker
  376. worker.pool = MockPool()
  377. backend = MockBackend()
  378. m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
  379. kwargs={})
  380. task = TaskRequest.from_message(m, m.decode())
  381. worker.process_task(task)
  382. worker.pool.stop()
  383. def test_process_task_raise_base(self):
  384. worker = self.worker
  385. worker.pool = MockPool(raise_base=True)
  386. backend = MockBackend()
  387. m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
  388. kwargs={})
  389. task = TaskRequest.from_message(m, m.decode())
  390. worker.process_task(task)
  391. worker.pool.stop()
  392. def test_process_task_raise_regular(self):
  393. worker = self.worker
  394. worker.pool = MockPool(raise_regular=True)
  395. backend = MockBackend()
  396. m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
  397. kwargs={})
  398. task = TaskRequest.from_message(m, m.decode())
  399. worker.process_task(task)
  400. worker.pool.stop()
  401. def test_start__stop(self):
  402. worker = self.worker
  403. w1 = {"started": False}
  404. w2 = {"started": False}
  405. w3 = {"started": False}
  406. w4 = {"started": False}
  407. worker.components = [MockController(w1), MockController(w2),
  408. MockController(w3), MockController(w4)]
  409. worker.start()
  410. for w in (w1, w2, w3, w4):
  411. self.assertTrue(w["started"])
  412. self.assertTrue(worker._running, len(worker.components))
  413. worker.stop()
  414. for component in worker.components:
  415. self.assertTrue(component._stopped)
  416. def test_start__terminate(self):
  417. worker = self.worker
  418. w1 = {"started": False}
  419. w2 = {"started": False}
  420. w3 = {"started": False}
  421. w4 = {"started": False}
  422. worker.components = [MockController(w1), MockController(w2),
  423. MockController(w3), MockController(w4),
  424. MockPool()]
  425. worker.start()
  426. for w in (w1, w2, w3, w4):
  427. self.assertTrue(w["started"])
  428. self.assertTrue(worker._running, len(worker.components))
  429. self.assertEqual(worker._state, RUN)
  430. worker.terminate()
  431. for component in worker.components:
  432. self.assertTrue(component._stopped)
  433. self.assertTrue(worker.components[4]._terminated)