test_worker.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565
  1. import socket
  2. from celery.tests.utils import unittest
  3. from datetime import datetime, timedelta
  4. from Queue import Empty
  5. from kombu.transport.base import Message
  6. from kombu.connection import BrokerConnection
  7. from celery.utils.timer2 import Timer
  8. from celery.app import app_or_default
  9. from celery.concurrency.base import BasePool
  10. from celery.task import task as task_dec
  11. from celery.task import periodic_task as periodic_task_dec
  12. from celery.utils import gen_unique_id
  13. from celery.worker import WorkController
  14. from celery.worker.buckets import FastQueue
  15. from celery.worker.job import TaskRequest
  16. from celery.worker.consumer import Consumer as MainConsumer
  17. from celery.worker.consumer import QoS, RUN
  18. from celery.utils.serialization import pickle
  19. from celery.tests.compat import catch_warnings
  20. from celery.tests.utils import execute_context
  21. class MockConsumer(object):
  22. class Channel(object):
  23. def close(self):
  24. pass
  25. def register_callback(self, cb):
  26. pass
  27. def consume(self):
  28. pass
  29. @property
  30. def channel(self):
  31. return self.Channel()
  32. class PlaceHolder(object):
  33. pass
  34. class MyKombuConsumer(MainConsumer):
  35. broadcast_consumer = MockConsumer()
  36. task_consumer = MockConsumer()
  37. def restart_heartbeat(self):
  38. self.heart = None
  39. class MockNode(object):
  40. commands = []
  41. def handle_message(self, message_data, message):
  42. self.commands.append(message.pop("command", None))
  43. class MockEventDispatcher(object):
  44. sent = []
  45. closed = False
  46. flushed = False
  47. def send(self, event, *args, **kwargs):
  48. self.sent.append(event)
  49. def close(self):
  50. self.closed = True
  51. def flush(self):
  52. self.flushed = True
  53. class MockHeart(object):
  54. closed = False
  55. def stop(self):
  56. self.closed = True
  57. @task_dec()
  58. def foo_task(x, y, z, **kwargs):
  59. return x * y * z
  60. @periodic_task_dec(run_every=60)
  61. def foo_periodic_task():
  62. return "foo"
  63. class MockLogger(object):
  64. def __init__(self):
  65. self.logged = []
  66. def critical(self, msg, *args, **kwargs):
  67. self.logged.append(msg)
  68. def info(self, msg, *args, **kwargs):
  69. self.logged.append(msg)
  70. def error(self, msg, *args, **kwargs):
  71. self.logged.append(msg)
  72. def debug(self, msg, *args, **kwargs):
  73. self.logged.append(msg)
  74. class MockBackend(object):
  75. _acked = False
  76. def basic_ack(self, delivery_tag):
  77. self._acked = True
  78. class MockPool(BasePool):
  79. _terminated = False
  80. _stopped = False
  81. def __init__(self, *args, **kwargs):
  82. self.raise_regular = kwargs.get("raise_regular", False)
  83. self.raise_base = kwargs.get("raise_base", False)
  84. def apply_async(self, *args, **kwargs):
  85. if self.raise_regular:
  86. raise KeyError("some exception")
  87. if self.raise_base:
  88. raise KeyboardInterrupt("Ctrl+c")
  89. def start(self):
  90. pass
  91. def stop(self):
  92. self._stopped = True
  93. return True
  94. def terminate(self):
  95. self._terminated = True
  96. self.stop()
  97. class MockController(object):
  98. def __init__(self, w, *args, **kwargs):
  99. self._w = w
  100. self._stopped = False
  101. def start(self):
  102. self._w["started"] = True
  103. self._stopped = False
  104. def stop(self):
  105. self._stopped = True
  106. def create_message(backend, **data):
  107. data.setdefault("id", gen_unique_id())
  108. return Message(backend, body=pickle.dumps(dict(**data)),
  109. content_type="application/x-python-serialize",
  110. content_encoding="binary")
  111. class test_QoS(unittest.TestCase):
  112. class MockConsumer(object):
  113. prefetch_count = 0
  114. def qos(self, prefetch_size=0, prefetch_count=0, apply_global=False):
  115. self.prefetch_count = prefetch_count
  116. def test_decrement(self):
  117. consumer = self.MockConsumer()
  118. qos = QoS(consumer, 10, app_or_default().log.get_default_logger())
  119. qos.update()
  120. self.assertEqual(int(qos.value), 10)
  121. self.assertEqual(consumer.prefetch_count, 10)
  122. qos.decrement()
  123. self.assertEqual(int(qos.value), 9)
  124. self.assertEqual(consumer.prefetch_count, 9)
  125. qos.decrement_eventually()
  126. self.assertEqual(int(qos.value), 8)
  127. self.assertEqual(consumer.prefetch_count, 9)
  128. class test_Consumer(unittest.TestCase):
  129. def setUp(self):
  130. self.ready_queue = FastQueue()
  131. self.eta_schedule = Timer()
  132. self.logger = app_or_default().log.get_default_logger()
  133. self.logger.setLevel(0)
  134. def tearDown(self):
  135. self.eta_schedule.stop()
  136. def test_connection(self):
  137. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  138. send_events=False)
  139. l.reset_connection()
  140. self.assertIsInstance(l.connection, BrokerConnection)
  141. l.stop_consumers()
  142. self.assertIsNone(l.connection)
  143. self.assertIsNone(l.task_consumer)
  144. l.reset_connection()
  145. self.assertIsInstance(l.connection, BrokerConnection)
  146. l.stop_consumers()
  147. l.stop()
  148. l.close_connection()
  149. self.assertIsNone(l.connection)
  150. self.assertIsNone(l.task_consumer)
  151. def test_close_connection(self):
  152. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  153. send_events=False)
  154. l._state = RUN
  155. l.close_connection()
  156. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  157. send_events=False)
  158. eventer = l.event_dispatcher = MockEventDispatcher()
  159. heart = l.heart = MockHeart()
  160. l._state = RUN
  161. l.stop_consumers()
  162. self.assertTrue(eventer.closed)
  163. self.assertTrue(heart.closed)
  164. def test_receive_message_unknown(self):
  165. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  166. send_events=False)
  167. backend = MockBackend()
  168. m = create_message(backend, unknown={"baz": "!!!"})
  169. l.event_dispatcher = MockEventDispatcher()
  170. l.pidbox_node = MockNode()
  171. def with_catch_warnings(log):
  172. l.receive_message(m.decode(), m)
  173. self.assertTrue(log)
  174. self.assertIn("unknown message", log[0].message.args[0])
  175. context = catch_warnings(record=True)
  176. execute_context(context, with_catch_warnings)
  177. def test_receive_message_eta_OverflowError(self):
  178. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  179. send_events=False)
  180. backend = MockBackend()
  181. called = [False]
  182. def to_timestamp(d):
  183. called[0] = True
  184. raise OverflowError()
  185. m = create_message(backend, task=foo_task.name,
  186. args=("2, 2"),
  187. kwargs={},
  188. eta=datetime.now().isoformat())
  189. l.event_dispatcher = MockEventDispatcher()
  190. l.pidbox_node = MockNode()
  191. from celery.worker import consumer
  192. prev, consumer.to_timestamp = consumer.to_timestamp, to_timestamp
  193. try:
  194. l.receive_message(m.decode(), m)
  195. self.assertTrue(m.acknowledged)
  196. self.assertTrue(called[0])
  197. finally:
  198. consumer.to_timestamp = prev
  199. def test_receive_message_InvalidTaskError(self):
  200. logger = MockLogger()
  201. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, logger,
  202. send_events=False)
  203. backend = MockBackend()
  204. m = create_message(backend, task=foo_task.name,
  205. args=(1, 2), kwargs="foobarbaz", id=1)
  206. l.event_dispatcher = MockEventDispatcher()
  207. l.pidbox_node = MockNode()
  208. l.receive_message(m.decode(), m)
  209. self.assertIn("Invalid task ignored", logger.logged[0])
  210. def test_on_decode_error(self):
  211. logger = MockLogger()
  212. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, logger,
  213. send_events=False)
  214. class MockMessage(object):
  215. content_type = "application/x-msgpack"
  216. content_encoding = "binary"
  217. body = "foobarbaz"
  218. acked = False
  219. def ack(self):
  220. self.acked = True
  221. message = MockMessage()
  222. l.on_decode_error(message, KeyError("foo"))
  223. self.assertTrue(message.acked)
  224. self.assertIn("Message decoding error", logger.logged[0])
  225. def test_receieve_message(self):
  226. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  227. send_events=False)
  228. backend = MockBackend()
  229. m = create_message(backend, task=foo_task.name,
  230. args=[2, 4, 8], kwargs={})
  231. l.event_dispatcher = MockEventDispatcher()
  232. l.receive_message(m.decode(), m)
  233. in_bucket = self.ready_queue.get_nowait()
  234. self.assertIsInstance(in_bucket, TaskRequest)
  235. self.assertEqual(in_bucket.task_name, foo_task.name)
  236. self.assertEqual(in_bucket.execute(), 2 * 4 * 8)
  237. self.assertTrue(self.eta_schedule.empty())
  238. def test_receieve_message_eta_isoformat(self):
  239. class MockConsumer(object):
  240. prefetch_count_incremented = False
  241. def qos(self, **kwargs):
  242. self.prefetch_count_incremented = True
  243. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  244. send_events=False)
  245. backend = MockBackend()
  246. m = create_message(backend, task=foo_task.name,
  247. eta=datetime.now().isoformat(),
  248. args=[2, 4, 8], kwargs={})
  249. l.task_consumer = MockConsumer()
  250. l.qos = QoS(l.task_consumer, l.initial_prefetch_count, l.logger)
  251. l.event_dispatcher = MockEventDispatcher()
  252. l.receive_message(m.decode(), m)
  253. l.eta_schedule.stop()
  254. items = [entry[2] for entry in self.eta_schedule.queue]
  255. found = 0
  256. for item in items:
  257. if item.args[0].task_name == foo_task.name:
  258. found = True
  259. self.assertTrue(found)
  260. self.assertTrue(l.task_consumer.prefetch_count_incremented)
  261. l.eta_schedule.stop()
  262. def test_revoke(self):
  263. ready_queue = FastQueue()
  264. l = MyKombuConsumer(ready_queue, self.eta_schedule, self.logger,
  265. send_events=False)
  266. backend = MockBackend()
  267. id = gen_unique_id()
  268. t = create_message(backend, task=foo_task.name, args=[2, 4, 8],
  269. kwargs={}, id=id)
  270. from celery.worker.state import revoked
  271. revoked.add(id)
  272. l.receive_message(t.decode(), t)
  273. self.assertTrue(ready_queue.empty())
  274. def test_receieve_message_not_registered(self):
  275. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  276. send_events=False)
  277. backend = MockBackend()
  278. m = create_message(backend, task="x.X.31x", args=[2, 4, 8], kwargs={})
  279. l.event_dispatcher = MockEventDispatcher()
  280. self.assertFalse(l.receive_message(m.decode(), m))
  281. self.assertRaises(Empty, self.ready_queue.get_nowait)
  282. self.assertTrue(self.eta_schedule.empty())
  283. def test_receieve_message_eta(self):
  284. l = MyKombuConsumer(self.ready_queue, self.eta_schedule, self.logger,
  285. send_events=False)
  286. dispatcher = l.event_dispatcher = MockEventDispatcher()
  287. backend = MockBackend()
  288. m = create_message(backend, task=foo_task.name,
  289. args=[2, 4, 8], kwargs={},
  290. eta=(datetime.now() +
  291. timedelta(days=1)).isoformat())
  292. l.reset_connection()
  293. p = l.app.conf.BROKER_CONNECTION_RETRY
  294. l.app.conf.BROKER_CONNECTION_RETRY = False
  295. try:
  296. l.reset_connection()
  297. finally:
  298. l.app.conf.BROKER_CONNECTION_RETRY = p
  299. l.stop_consumers()
  300. self.assertTrue(dispatcher.flushed)
  301. l.event_dispatcher = MockEventDispatcher()
  302. l.receive_message(m.decode(), m)
  303. l.eta_schedule.stop()
  304. in_hold = self.eta_schedule.queue[0]
  305. self.assertEqual(len(in_hold), 3)
  306. eta, priority, entry = in_hold
  307. task = entry.args[0]
  308. self.assertIsInstance(task, TaskRequest)
  309. self.assertEqual(task.task_name, foo_task.name)
  310. self.assertEqual(task.execute(), 2 * 4 * 8)
  311. self.assertRaises(Empty, self.ready_queue.get_nowait)
  312. def test_start__consume_messages(self):
  313. class _QoS(object):
  314. prev = 3
  315. next = 4
  316. def update(self):
  317. self.prev = self.next
  318. class _Consumer(MyKombuConsumer):
  319. iterations = 0
  320. wait_method = None
  321. def reset_connection(self):
  322. if self.iterations >= 1:
  323. raise KeyError("foo")
  324. called_back = [False]
  325. def init_callback(consumer):
  326. called_back[0] = True
  327. l = _Consumer(self.ready_queue, self.eta_schedule, self.logger,
  328. send_events=False, init_callback=init_callback)
  329. l.task_consumer = MockConsumer()
  330. l.broadcast_consumer = MockConsumer()
  331. l.qos = _QoS()
  332. l.connection = BrokerConnection()
  333. l.iterations = 0
  334. def raises_KeyError(limit=None):
  335. l.iterations += 1
  336. if l.qos.prev != l.qos.next:
  337. l.qos.update()
  338. if l.iterations >= 2:
  339. raise KeyError("foo")
  340. l.consume_messages = raises_KeyError
  341. self.assertRaises(KeyError, l.start)
  342. self.assertTrue(called_back[0])
  343. self.assertEqual(l.iterations, 1)
  344. self.assertEqual(l.qos.prev, l.qos.next)
  345. l = _Consumer(self.ready_queue, self.eta_schedule, self.logger,
  346. send_events=False, init_callback=init_callback)
  347. l.qos = _QoS()
  348. l.task_consumer = MockConsumer()
  349. l.broadcast_consumer = MockConsumer()
  350. l.connection = BrokerConnection()
  351. def raises_socket_error(limit=None):
  352. l.iterations = 1
  353. raise socket.error("foo")
  354. l.consume_messages = raises_socket_error
  355. self.assertRaises(socket.error, l.start)
  356. self.assertTrue(called_back[0])
  357. self.assertEqual(l.iterations, 1)
  358. class test_WorkController(unittest.TestCase):
  359. def setUp(self):
  360. self.worker = WorkController(concurrency=1, loglevel=0)
  361. self.worker.logger = MockLogger()
  362. def test_with_rate_limits_disabled(self):
  363. worker = WorkController(concurrency=1, loglevel=0,
  364. disable_rate_limits=True)
  365. self.assertTrue(hasattr(worker.ready_queue, "put"))
  366. def test_attrs(self):
  367. worker = self.worker
  368. self.assertIsInstance(worker.scheduler, Timer)
  369. self.assertTrue(worker.scheduler)
  370. self.assertTrue(worker.pool)
  371. self.assertTrue(worker.consumer)
  372. self.assertTrue(worker.mediator)
  373. self.assertTrue(worker.components)
  374. def test_with_embedded_celerybeat(self):
  375. worker = WorkController(concurrency=1, loglevel=0,
  376. embed_clockservice=True)
  377. self.assertTrue(worker.beat)
  378. self.assertIn(worker.beat, worker.components)
  379. def test_process_task(self):
  380. worker = self.worker
  381. worker.pool = MockPool()
  382. backend = MockBackend()
  383. m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
  384. kwargs={})
  385. task = TaskRequest.from_message(m, m.decode())
  386. worker.process_task(task)
  387. worker.pool.stop()
  388. def test_process_task_raise_base(self):
  389. worker = self.worker
  390. worker.pool = MockPool(raise_base=True)
  391. backend = MockBackend()
  392. m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
  393. kwargs={})
  394. task = TaskRequest.from_message(m, m.decode())
  395. worker.components = []
  396. worker._state = worker.RUN
  397. self.assertRaises(KeyboardInterrupt, worker.process_task, task)
  398. self.assertEqual(worker._state, worker.TERMINATE)
  399. def test_process_task_raise_regular(self):
  400. worker = self.worker
  401. worker.pool = MockPool(raise_regular=True)
  402. backend = MockBackend()
  403. m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
  404. kwargs={})
  405. task = TaskRequest.from_message(m, m.decode())
  406. worker.process_task(task)
  407. worker.pool.stop()
  408. def test_start__stop(self):
  409. worker = self.worker
  410. w1 = {"started": False}
  411. w2 = {"started": False}
  412. w3 = {"started": False}
  413. w4 = {"started": False}
  414. worker.components = [MockController(w1), MockController(w2),
  415. MockController(w3), MockController(w4)]
  416. worker.start()
  417. for w in (w1, w2, w3, w4):
  418. self.assertTrue(w["started"])
  419. self.assertTrue(worker._running, len(worker.components))
  420. worker.stop()
  421. for component in worker.components:
  422. self.assertTrue(component._stopped)
  423. def test_start__terminate(self):
  424. worker = self.worker
  425. w1 = {"started": False}
  426. w2 = {"started": False}
  427. w3 = {"started": False}
  428. w4 = {"started": False}
  429. worker.components = [MockController(w1), MockController(w2),
  430. MockController(w3), MockController(w4),
  431. MockPool()]
  432. worker.start()
  433. for w in (w1, w2, w3, w4):
  434. self.assertTrue(w["started"])
  435. self.assertTrue(worker._running, len(worker.components))
  436. self.assertEqual(worker._state, RUN)
  437. worker.terminate()
  438. for component in worker.components:
  439. self.assertTrue(component._stopped)
  440. self.assertTrue(worker.components[4]._terminated)