test_worker.py 36 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095
  1. from __future__ import absolute_import
  2. from __future__ import with_statement
  3. import socket
  4. from collections import deque
  5. from datetime import datetime, timedelta
  6. from Queue import Empty
  7. from billiard.exceptions import WorkerLostError
  8. from kombu.exceptions import StdChannelError
  9. from kombu.transport.base import Message
  10. from kombu.connection import BrokerConnection
  11. from mock import Mock, patch
  12. from nose import SkipTest
  13. from celery import current_app
  14. from celery.app.defaults import DEFAULTS
  15. from celery.concurrency.base import BasePool
  16. from celery.datastructures import AttributeDict
  17. from celery.exceptions import SystemTerminate
  18. from celery.task import task as task_dec
  19. from celery.task import periodic_task as periodic_task_dec
  20. from celery.utils import uuid
  21. from celery.worker import WorkController, Queues, Timers, EvLoop, Pool
  22. from celery.worker.buckets import FastQueue
  23. from celery.worker.job import Request
  24. from celery.worker.consumer import BlockingConsumer
  25. from celery.worker.consumer import QoS, RUN, PREFETCH_COUNT_MAX, CLOSE
  26. from celery.utils.serialization import pickle
  27. from celery.utils.timer2 import Timer
  28. from celery.utils.threads import Event
  29. from celery.tests.utils import AppCase, Case
  30. class PlaceHolder(object):
  31. pass
  32. class MyKombuConsumer(BlockingConsumer):
  33. broadcast_consumer = Mock()
  34. task_consumer = Mock()
  35. def __init__(self, *args, **kwargs):
  36. kwargs.setdefault("pool", BasePool(2))
  37. super(MyKombuConsumer, self).__init__(*args, **kwargs)
  38. def restart_heartbeat(self):
  39. self.heart = None
  40. class MockNode(object):
  41. commands = []
  42. def handle_message(self, body, message):
  43. self.commands.append(body.pop("command", None))
  44. class MockEventDispatcher(object):
  45. sent = []
  46. closed = False
  47. flushed = False
  48. _outbound_buffer = []
  49. def send(self, event, *args, **kwargs):
  50. self.sent.append(event)
  51. def close(self):
  52. self.closed = True
  53. def flush(self):
  54. self.flushed = True
  55. class MockHeart(object):
  56. closed = False
  57. def stop(self):
  58. self.closed = True
  59. @task_dec()
  60. def foo_task(x, y, z, **kwargs):
  61. return x * y * z
  62. @periodic_task_dec(run_every=60)
  63. def foo_periodic_task():
  64. return "foo"
  65. def create_message(channel, **data):
  66. data.setdefault("id", uuid())
  67. channel.no_ack_consumers = set()
  68. return Message(channel, body=pickle.dumps(dict(**data)),
  69. content_type="application/x-python-serialize",
  70. content_encoding="binary",
  71. delivery_info={"consumer_tag": "mock"})
  72. class test_QoS(Case):
  73. class _QoS(QoS):
  74. def __init__(self, value):
  75. self.value = value
  76. QoS.__init__(self, None, value)
  77. def set(self, value):
  78. return value
  79. def test_qos_increment_decrement(self):
  80. qos = self._QoS(10)
  81. self.assertEqual(qos.increment(), 11)
  82. self.assertEqual(qos.increment(3), 14)
  83. self.assertEqual(qos.increment(-30), 14)
  84. self.assertEqual(qos.decrement(7), 7)
  85. self.assertEqual(qos.decrement(), 6)
  86. with self.assertRaises(AssertionError):
  87. qos.decrement(10)
  88. def test_qos_disabled_increment_decrement(self):
  89. qos = self._QoS(0)
  90. self.assertEqual(qos.increment(), 0)
  91. self.assertEqual(qos.increment(3), 0)
  92. self.assertEqual(qos.increment(-30), 0)
  93. self.assertEqual(qos.decrement(7), 0)
  94. self.assertEqual(qos.decrement(), 0)
  95. self.assertEqual(qos.decrement(10), 0)
  96. def test_qos_thread_safe(self):
  97. qos = self._QoS(10)
  98. def add():
  99. for i in xrange(1000):
  100. qos.increment()
  101. def sub():
  102. for i in xrange(1000):
  103. qos.decrement_eventually()
  104. def threaded(funs):
  105. from threading import Thread
  106. threads = [Thread(target=fun) for fun in funs]
  107. for thread in threads:
  108. thread.start()
  109. for thread in threads:
  110. thread.join()
  111. threaded([add, add])
  112. self.assertEqual(qos.value, 2010)
  113. qos.value = 1000
  114. threaded([add, sub]) # n = 2
  115. self.assertEqual(qos.value, 1000)
  116. def test_exceeds_short(self):
  117. qos = QoS(Mock(), PREFETCH_COUNT_MAX - 1)
  118. qos.update()
  119. self.assertEqual(qos.value, PREFETCH_COUNT_MAX - 1)
  120. qos.increment()
  121. self.assertEqual(qos.value, PREFETCH_COUNT_MAX)
  122. qos.increment()
  123. self.assertEqual(qos.value, PREFETCH_COUNT_MAX + 1)
  124. qos.decrement()
  125. self.assertEqual(qos.value, PREFETCH_COUNT_MAX)
  126. qos.decrement()
  127. self.assertEqual(qos.value, PREFETCH_COUNT_MAX - 1)
  128. def test_consumer_increment_decrement(self):
  129. consumer = Mock()
  130. qos = QoS(consumer, 10)
  131. qos.update()
  132. self.assertEqual(qos.value, 10)
  133. consumer.qos.assert_called_with(prefetch_count=10)
  134. qos.decrement()
  135. self.assertEqual(qos.value, 9)
  136. consumer.qos.assert_called_with(prefetch_count=9)
  137. qos.decrement_eventually()
  138. self.assertEqual(qos.value, 8)
  139. consumer.qos.assert_called_with(prefetch_count=9)
  140. self.assertIn({"prefetch_count": 9}, consumer.qos.call_args)
  141. # Does not decrement 0 value
  142. qos.value = 0
  143. qos.decrement()
  144. self.assertEqual(qos.value, 0)
  145. qos.increment()
  146. self.assertEqual(qos.value, 0)
  147. def test_consumer_decrement_eventually(self):
  148. consumer = Mock()
  149. qos = QoS(consumer, 10)
  150. qos.decrement_eventually()
  151. self.assertEqual(qos.value, 9)
  152. qos.value = 0
  153. qos.decrement_eventually()
  154. self.assertEqual(qos.value, 0)
  155. def test_set(self):
  156. consumer = Mock()
  157. qos = QoS(consumer, 10)
  158. qos.set(12)
  159. self.assertEqual(qos.prev, 12)
  160. qos.set(qos.prev)
  161. class test_Consumer(Case):
  162. def setUp(self):
  163. self.ready_queue = FastQueue()
  164. self.timer = Timer()
  165. def tearDown(self):
  166. self.timer.stop()
  167. def test_info(self):
  168. l = MyKombuConsumer(self.ready_queue, timer=self.timer)
  169. l.qos = QoS(l.task_consumer, 10)
  170. info = l.info
  171. self.assertEqual(info["prefetch_count"], 10)
  172. self.assertFalse(info["broker"])
  173. l.connection = current_app.broker_connection()
  174. info = l.info
  175. self.assertTrue(info["broker"])
  176. def test_start_when_closed(self):
  177. l = MyKombuConsumer(self.ready_queue, timer=self.timer)
  178. l._state = CLOSE
  179. l.start()
  180. def test_connection(self):
  181. l = MyKombuConsumer(self.ready_queue, timer=self.timer)
  182. l.reset_connection()
  183. self.assertIsInstance(l.connection, BrokerConnection)
  184. l._state = RUN
  185. l.event_dispatcher = None
  186. l.stop_consumers(close_connection=False)
  187. self.assertTrue(l.connection)
  188. l._state = RUN
  189. l.stop_consumers()
  190. self.assertIsNone(l.connection)
  191. self.assertIsNone(l.task_consumer)
  192. l.reset_connection()
  193. self.assertIsInstance(l.connection, BrokerConnection)
  194. l.stop_consumers()
  195. l.stop()
  196. l.close_connection()
  197. self.assertIsNone(l.connection)
  198. self.assertIsNone(l.task_consumer)
  199. def test_close_connection(self):
  200. l = MyKombuConsumer(self.ready_queue, timer=self.timer)
  201. l._state = RUN
  202. l.close_connection()
  203. l = MyKombuConsumer(self.ready_queue, timer=self.timer)
  204. eventer = l.event_dispatcher = Mock()
  205. eventer.enabled = True
  206. heart = l.heart = MockHeart()
  207. l._state = RUN
  208. l.stop_consumers()
  209. self.assertTrue(eventer.close.call_count)
  210. self.assertTrue(heart.closed)
  211. @patch("celery.worker.consumer.warn")
  212. def test_receive_message_unknown(self, warn):
  213. l = MyKombuConsumer(self.ready_queue, timer=self.timer)
  214. backend = Mock()
  215. m = create_message(backend, unknown={"baz": "!!!"})
  216. l.event_dispatcher = Mock()
  217. l.pidbox_node = MockNode()
  218. l.receive_message(m.decode(), m)
  219. self.assertTrue(warn.call_count)
  220. @patch("celery.utils.timer2.to_timestamp")
  221. def test_receive_message_eta_OverflowError(self, to_timestamp):
  222. to_timestamp.side_effect = OverflowError()
  223. l = MyKombuConsumer(self.ready_queue, timer=self.timer)
  224. m = create_message(Mock(), task=foo_task.name,
  225. args=("2, 2"),
  226. kwargs={},
  227. eta=datetime.now().isoformat())
  228. l.event_dispatcher = Mock()
  229. l.pidbox_node = MockNode()
  230. l.update_strategies()
  231. l.receive_message(m.decode(), m)
  232. self.assertTrue(m.acknowledged)
  233. self.assertTrue(to_timestamp.call_count)
  234. @patch("celery.worker.consumer.error")
  235. def test_receive_message_InvalidTaskError(self, error):
  236. l = MyKombuConsumer(self.ready_queue, timer=self.timer)
  237. m = create_message(Mock(), task=foo_task.name,
  238. args=(1, 2), kwargs="foobarbaz", id=1)
  239. l.update_strategies()
  240. l.event_dispatcher = Mock()
  241. l.pidbox_node = MockNode()
  242. l.receive_message(m.decode(), m)
  243. self.assertIn("Received invalid task message", error.call_args[0][0])
  244. @patch("celery.worker.consumer.crit")
  245. def test_on_decode_error(self, crit):
  246. l = MyKombuConsumer(self.ready_queue, timer=self.timer)
  247. class MockMessage(Mock):
  248. content_type = "application/x-msgpack"
  249. content_encoding = "binary"
  250. body = "foobarbaz"
  251. message = MockMessage()
  252. l.on_decode_error(message, KeyError("foo"))
  253. self.assertTrue(message.ack.call_count)
  254. self.assertIn("Can't decode message body", crit.call_args[0][0])
  255. def test_receieve_message(self):
  256. l = MyKombuConsumer(self.ready_queue, timer=self.timer)
  257. m = create_message(Mock(), task=foo_task.name,
  258. args=[2, 4, 8], kwargs={})
  259. l.update_strategies()
  260. l.event_dispatcher = Mock()
  261. l.receive_message(m.decode(), m)
  262. in_bucket = self.ready_queue.get_nowait()
  263. self.assertIsInstance(in_bucket, Request)
  264. self.assertEqual(in_bucket.name, foo_task.name)
  265. self.assertEqual(in_bucket.execute(), 2 * 4 * 8)
  266. self.assertTrue(self.timer.empty())
  267. def test_start_connection_error(self):
  268. class MockConsumer(BlockingConsumer):
  269. iterations = 0
  270. def consume_messages(self):
  271. if not self.iterations:
  272. self.iterations = 1
  273. raise KeyError("foo")
  274. raise SyntaxError("bar")
  275. l = MockConsumer(self.ready_queue, timer=self.timer,
  276. send_events=False, pool=BasePool())
  277. l.connection_errors = (KeyError, )
  278. with self.assertRaises(SyntaxError):
  279. l.start()
  280. l.heart.stop()
  281. l.timer.stop()
  282. def test_start_channel_error(self):
  283. # Regression test for AMQPChannelExceptions that can occur within the
  284. # consumer. (i.e. 404 errors)
  285. class MockConsumer(BlockingConsumer):
  286. iterations = 0
  287. def consume_messages(self):
  288. if not self.iterations:
  289. self.iterations = 1
  290. raise KeyError("foo")
  291. raise SyntaxError("bar")
  292. l = MockConsumer(self.ready_queue, timer=self.timer,
  293. send_events=False, pool=BasePool())
  294. l.channel_errors = (KeyError, )
  295. self.assertRaises(SyntaxError, l.start)
  296. l.heart.stop()
  297. l.timer.stop()
  298. def test_consume_messages_ignores_socket_timeout(self):
  299. class Connection(current_app.broker_connection().__class__):
  300. obj = None
  301. def drain_events(self, **kwargs):
  302. self.obj.connection = None
  303. raise socket.timeout(10)
  304. l = MyKombuConsumer(self.ready_queue, timer=self.timer)
  305. l.connection = Connection()
  306. l.task_consumer = Mock()
  307. l.connection.obj = l
  308. l.qos = QoS(l.task_consumer, 10)
  309. l.consume_messages()
  310. def test_consume_messages_when_socket_error(self):
  311. class Connection(current_app.broker_connection().__class__):
  312. obj = None
  313. def drain_events(self, **kwargs):
  314. self.obj.connection = None
  315. raise socket.error("foo")
  316. l = MyKombuConsumer(self.ready_queue, timer=self.timer)
  317. l._state = RUN
  318. c = l.connection = Connection()
  319. l.connection.obj = l
  320. l.task_consumer = Mock()
  321. l.qos = QoS(l.task_consumer, 10)
  322. with self.assertRaises(socket.error):
  323. l.consume_messages()
  324. l._state = CLOSE
  325. l.connection = c
  326. l.consume_messages()
  327. def test_consume_messages(self):
  328. class Connection(current_app.broker_connection().__class__):
  329. obj = None
  330. def drain_events(self, **kwargs):
  331. self.obj.connection = None
  332. l = MyKombuConsumer(self.ready_queue, timer=self.timer)
  333. l.connection = Connection()
  334. l.connection.obj = l
  335. l.task_consumer = Mock()
  336. l.qos = QoS(l.task_consumer, 10)
  337. l.consume_messages()
  338. l.consume_messages()
  339. self.assertTrue(l.task_consumer.consume.call_count)
  340. l.task_consumer.qos.assert_called_with(prefetch_count=10)
  341. l.qos.decrement()
  342. l.consume_messages()
  343. l.task_consumer.qos.assert_called_with(prefetch_count=9)
  344. def test_maybe_conn_error(self):
  345. l = MyKombuConsumer(self.ready_queue, timer=self.timer)
  346. l.connection_errors = (KeyError, )
  347. l.channel_errors = (SyntaxError, )
  348. l.maybe_conn_error(Mock(side_effect=AttributeError("foo")))
  349. l.maybe_conn_error(Mock(side_effect=KeyError("foo")))
  350. l.maybe_conn_error(Mock(side_effect=SyntaxError("foo")))
  351. with self.assertRaises(IndexError):
  352. l.maybe_conn_error(Mock(side_effect=IndexError("foo")))
  353. def test_apply_eta_task(self):
  354. from celery.worker import state
  355. l = MyKombuConsumer(self.ready_queue, timer=self.timer)
  356. l.qos = QoS(None, 10)
  357. task = object()
  358. qos = l.qos.value
  359. l.apply_eta_task(task)
  360. self.assertIn(task, state.reserved_requests)
  361. self.assertEqual(l.qos.value, qos - 1)
  362. self.assertIs(self.ready_queue.get_nowait(), task)
  363. def test_receieve_message_eta_isoformat(self):
  364. l = MyKombuConsumer(self.ready_queue, timer=self.timer)
  365. m = create_message(Mock(), task=foo_task.name,
  366. eta=datetime.now().isoformat(),
  367. args=[2, 4, 8], kwargs={})
  368. l.task_consumer = Mock()
  369. l.qos = QoS(l.task_consumer, l.initial_prefetch_count)
  370. l.event_dispatcher = Mock()
  371. l.enabled = False
  372. l.update_strategies()
  373. l.receive_message(m.decode(), m)
  374. l.timer.stop()
  375. items = [entry[2] for entry in self.timer.queue]
  376. found = 0
  377. for item in items:
  378. if item.args[0].name == foo_task.name:
  379. found = True
  380. self.assertTrue(found)
  381. self.assertTrue(l.task_consumer.qos.call_count)
  382. l.timer.stop()
  383. def test_on_control(self):
  384. l = MyKombuConsumer(self.ready_queue, timer=self.timer)
  385. l.pidbox_node = Mock()
  386. l.reset_pidbox_node = Mock()
  387. l.on_control("foo", "bar")
  388. l.pidbox_node.handle_message.assert_called_with("foo", "bar")
  389. l.pidbox_node = Mock()
  390. l.pidbox_node.handle_message.side_effect = KeyError("foo")
  391. l.on_control("foo", "bar")
  392. l.pidbox_node.handle_message.assert_called_with("foo", "bar")
  393. l.pidbox_node = Mock()
  394. l.pidbox_node.handle_message.side_effect = ValueError("foo")
  395. l.on_control("foo", "bar")
  396. l.pidbox_node.handle_message.assert_called_with("foo", "bar")
  397. l.reset_pidbox_node.assert_called_with()
  398. def test_revoke(self):
  399. ready_queue = FastQueue()
  400. l = MyKombuConsumer(ready_queue, timer=self.timer)
  401. backend = Mock()
  402. id = uuid()
  403. t = create_message(backend, task=foo_task.name, args=[2, 4, 8],
  404. kwargs={}, id=id)
  405. from celery.worker.state import revoked
  406. revoked.add(id)
  407. l.receive_message(t.decode(), t)
  408. self.assertTrue(ready_queue.empty())
  409. def test_receieve_message_not_registered(self):
  410. l = MyKombuConsumer(self.ready_queue, timer=self.timer)
  411. backend = Mock()
  412. m = create_message(backend, task="x.X.31x", args=[2, 4, 8], kwargs={})
  413. l.event_dispatcher = Mock()
  414. self.assertFalse(l.receive_message(m.decode(), m))
  415. with self.assertRaises(Empty):
  416. self.ready_queue.get_nowait()
  417. self.assertTrue(self.timer.empty())
  418. @patch("celery.worker.consumer.warn")
  419. @patch("celery.worker.consumer.logger")
  420. def test_receieve_message_ack_raises(self, logger, warn):
  421. l = MyKombuConsumer(self.ready_queue, timer=self.timer)
  422. backend = Mock()
  423. m = create_message(backend, args=[2, 4, 8], kwargs={})
  424. l.event_dispatcher = Mock()
  425. l.connection_errors = (socket.error, )
  426. m.reject = Mock()
  427. m.reject.side_effect = socket.error("foo")
  428. self.assertFalse(l.receive_message(m.decode(), m))
  429. self.assertTrue(warn.call_count)
  430. with self.assertRaises(Empty):
  431. self.ready_queue.get_nowait()
  432. self.assertTrue(self.timer.empty())
  433. m.reject.assert_called_with()
  434. self.assertTrue(logger.critical.call_count)
  435. def test_receieve_message_eta(self):
  436. l = MyKombuConsumer(self.ready_queue, timer=self.timer)
  437. l.event_dispatcher = Mock()
  438. l.event_dispatcher._outbound_buffer = deque()
  439. backend = Mock()
  440. m = create_message(backend, task=foo_task.name,
  441. args=[2, 4, 8], kwargs={},
  442. eta=(datetime.now() +
  443. timedelta(days=1)).isoformat())
  444. l.reset_connection()
  445. p = l.app.conf.BROKER_CONNECTION_RETRY
  446. l.app.conf.BROKER_CONNECTION_RETRY = False
  447. try:
  448. l.reset_connection()
  449. finally:
  450. l.app.conf.BROKER_CONNECTION_RETRY = p
  451. l.stop_consumers()
  452. l.event_dispatcher = Mock()
  453. l.receive_message(m.decode(), m)
  454. l.timer.stop()
  455. in_hold = l.timer.queue[0]
  456. self.assertEqual(len(in_hold), 3)
  457. eta, priority, entry = in_hold
  458. task = entry.args[0]
  459. self.assertIsInstance(task, Request)
  460. self.assertEqual(task.name, foo_task.name)
  461. self.assertEqual(task.execute(), 2 * 4 * 8)
  462. with self.assertRaises(Empty):
  463. self.ready_queue.get_nowait()
  464. def test_reset_pidbox_node(self):
  465. l = MyKombuConsumer(self.ready_queue, timer=self.timer)
  466. l.pidbox_node = Mock()
  467. chan = l.pidbox_node.channel = Mock()
  468. l.connection = Mock()
  469. chan.close.side_effect = socket.error("foo")
  470. l.connection_errors = (socket.error, )
  471. l.reset_pidbox_node()
  472. chan.close.assert_called_with()
  473. def test_reset_pidbox_node_green(self):
  474. l = MyKombuConsumer(self.ready_queue, timer=self.timer)
  475. l.pool = Mock()
  476. l.pool.is_green = True
  477. l.reset_pidbox_node()
  478. l.pool.spawn_n.assert_called_with(l._green_pidbox_node)
  479. def test__green_pidbox_node(self):
  480. l = MyKombuConsumer(self.ready_queue, timer=self.timer)
  481. l.pidbox_node = Mock()
  482. class BConsumer(Mock):
  483. def __enter__(self):
  484. self.consume()
  485. return self
  486. def __exit__(self, *exc_info):
  487. self.cancel()
  488. l.pidbox_node.listen = BConsumer()
  489. connections = []
  490. class Connection(object):
  491. calls = 0
  492. def __init__(self, obj):
  493. connections.append(self)
  494. self.obj = obj
  495. self.default_channel = self.channel()
  496. self.closed = False
  497. def __enter__(self):
  498. return self
  499. def __exit__(self, *exc_info):
  500. self.close()
  501. def channel(self):
  502. return Mock()
  503. def drain_events(self, **kwargs):
  504. if not self.calls:
  505. self.calls += 1
  506. raise socket.timeout()
  507. self.obj.connection = None
  508. self.obj._pidbox_node_shutdown.set()
  509. def close(self):
  510. self.closed = True
  511. l.connection = Mock()
  512. l._open_connection = lambda: Connection(obj=l)
  513. l._green_pidbox_node()
  514. l.pidbox_node.listen.assert_called_with(callback=l.on_control)
  515. self.assertTrue(l.broadcast_consumer)
  516. l.broadcast_consumer.consume.assert_called_with()
  517. self.assertIsNone(l.connection)
  518. self.assertTrue(connections[0].closed)
  519. @patch("kombu.connection.BrokerConnection._establish_connection")
  520. @patch("kombu.utils.sleep")
  521. def test_open_connection_errback(self, sleep, connect):
  522. l = MyKombuConsumer(self.ready_queue, timer=self.timer)
  523. from kombu.transport.memory import Transport
  524. Transport.connection_errors = (StdChannelError, )
  525. def effect():
  526. if connect.call_count > 1:
  527. return
  528. raise StdChannelError()
  529. connect.side_effect = effect
  530. l._open_connection()
  531. connect.assert_called_with()
  532. def test_stop_pidbox_node(self):
  533. l = MyKombuConsumer(self.ready_queue, timer=self.timer)
  534. l._pidbox_node_stopped = Event()
  535. l._pidbox_node_shutdown = Event()
  536. l._pidbox_node_stopped.set()
  537. l.stop_pidbox_node()
  538. def test_start__consume_messages(self):
  539. class _QoS(object):
  540. prev = 3
  541. value = 4
  542. def update(self):
  543. self.prev = self.value
  544. class _Consumer(MyKombuConsumer):
  545. iterations = 0
  546. def reset_connection(self):
  547. if self.iterations >= 1:
  548. raise KeyError("foo")
  549. init_callback = Mock()
  550. l = _Consumer(self.ready_queue, timer=self.timer,
  551. init_callback=init_callback)
  552. l.task_consumer = Mock()
  553. l.broadcast_consumer = Mock()
  554. l.qos = _QoS()
  555. l.connection = BrokerConnection()
  556. l.iterations = 0
  557. def raises_KeyError(limit=None):
  558. l.iterations += 1
  559. if l.qos.prev != l.qos.value:
  560. l.qos.update()
  561. if l.iterations >= 2:
  562. raise KeyError("foo")
  563. l.consume_messages = raises_KeyError
  564. with self.assertRaises(KeyError):
  565. l.start()
  566. self.assertTrue(init_callback.call_count)
  567. self.assertEqual(l.iterations, 1)
  568. self.assertEqual(l.qos.prev, l.qos.value)
  569. init_callback.reset_mock()
  570. l = _Consumer(self.ready_queue, timer=self.timer,
  571. send_events=False, init_callback=init_callback)
  572. l.qos = _QoS()
  573. l.task_consumer = Mock()
  574. l.broadcast_consumer = Mock()
  575. l.connection = BrokerConnection()
  576. l.consume_messages = Mock(side_effect=socket.error("foo"))
  577. with self.assertRaises(socket.error):
  578. l.start()
  579. self.assertTrue(init_callback.call_count)
  580. self.assertTrue(l.consume_messages.call_count)
  581. def test_reset_connection_with_no_node(self):
  582. l = BlockingConsumer(self.ready_queue, timer=self.timer)
  583. self.assertEqual(None, l.pool)
  584. l.reset_connection()
  585. def test_on_task_revoked(self):
  586. l = BlockingConsumer(self.ready_queue, timer=self.timer)
  587. task = Mock()
  588. task.revoked.return_value = True
  589. l.on_task(task)
  590. def test_on_task_no_events(self):
  591. l = BlockingConsumer(self.ready_queue, timer=self.timer)
  592. task = Mock()
  593. task.revoked.return_value = False
  594. l.event_dispatcher = Mock()
  595. l.event_dispatcher.enabled = False
  596. task.eta = None
  597. l._does_info = False
  598. l.on_task(task)
  599. class test_WorkController(AppCase):
  600. def setup(self):
  601. self.worker = self.create_worker()
  602. from celery import worker
  603. self._logger = worker.logger
  604. self.logger = worker.logger = Mock()
  605. def teardown(self):
  606. from celery import worker
  607. worker.logger = self._logger
  608. def create_worker(self, **kw):
  609. worker = self.app.WorkController(concurrency=1, loglevel=0, **kw)
  610. worker._shutdown_complete.set()
  611. return worker
  612. @patch("celery.platforms.create_pidlock")
  613. def test_use_pidfile(self, create_pidlock):
  614. create_pidlock.return_value = Mock()
  615. worker = self.create_worker(pidfile="pidfilelockfilepid")
  616. worker.components = []
  617. worker.start()
  618. self.assertTrue(create_pidlock.called)
  619. worker.stop()
  620. self.assertTrue(worker.pidlock.release.called)
  621. @patch("celery.platforms.signals")
  622. @patch("celery.platforms.set_mp_process_title")
  623. def test_process_initializer(self, set_mp_process_title, _signals):
  624. from celery import Celery
  625. from celery import signals
  626. from celery.state import _tls
  627. from celery.concurrency.processes import process_initializer
  628. from celery.concurrency.processes import (WORKER_SIGRESET,
  629. WORKER_SIGIGNORE)
  630. def on_worker_process_init(**kwargs):
  631. on_worker_process_init.called = True
  632. on_worker_process_init.called = False
  633. signals.worker_process_init.connect(on_worker_process_init)
  634. loader = Mock()
  635. loader.override_backends = {}
  636. app = Celery(loader=loader, set_as_current=False)
  637. app.loader = loader
  638. app.conf = AttributeDict(DEFAULTS)
  639. process_initializer(app, "awesome.worker.com")
  640. _signals.ignore.assert_any_call(*WORKER_SIGIGNORE)
  641. _signals.reset.assert_any_call(*WORKER_SIGRESET)
  642. self.assertTrue(app.loader.init_worker.call_count)
  643. self.assertTrue(on_worker_process_init.called)
  644. self.assertIs(_tls.current_app, app)
  645. set_mp_process_title.assert_called_with("celery",
  646. hostname="awesome.worker.com")
  647. def test_with_rate_limits_disabled(self):
  648. worker = WorkController(concurrency=1, loglevel=0,
  649. disable_rate_limits=True)
  650. self.assertTrue(hasattr(worker.ready_queue, "put"))
  651. def test_attrs(self):
  652. worker = self.worker
  653. self.assertIsInstance(worker.timer, Timer)
  654. self.assertTrue(worker.timer)
  655. self.assertTrue(worker.pool)
  656. self.assertTrue(worker.consumer)
  657. self.assertTrue(worker.mediator)
  658. self.assertTrue(worker.components)
  659. def test_with_embedded_celerybeat(self):
  660. worker = WorkController(concurrency=1, loglevel=0, beat=True)
  661. self.assertTrue(worker.beat)
  662. self.assertIn(worker.beat, worker.components)
  663. def test_with_autoscaler(self):
  664. worker = self.create_worker(autoscale=[10, 3], send_events=False,
  665. timer_cls="celery.utils.timer2.Timer")
  666. self.assertTrue(worker.autoscaler)
  667. def test_dont_stop_or_terminate(self):
  668. worker = WorkController(concurrency=1, loglevel=0)
  669. worker.stop()
  670. self.assertNotEqual(worker._state, worker.CLOSE)
  671. worker.terminate()
  672. self.assertNotEqual(worker._state, worker.CLOSE)
  673. sigsafe, worker.pool.signal_safe = worker.pool.signal_safe, False
  674. try:
  675. worker._state = worker.RUN
  676. worker.stop(in_sighandler=True)
  677. self.assertNotEqual(worker._state, worker.CLOSE)
  678. worker.terminate(in_sighandler=True)
  679. self.assertNotEqual(worker._state, worker.CLOSE)
  680. finally:
  681. worker.pool.signal_safe = sigsafe
  682. def test_on_timer_error(self):
  683. worker = WorkController(concurrency=1, loglevel=0)
  684. try:
  685. raise KeyError("foo")
  686. except KeyError, exc:
  687. Timers(worker).on_timer_error(exc)
  688. msg, args = self.logger.error.call_args[0]
  689. self.assertIn("KeyError", msg % args)
  690. def test_on_timer_tick(self):
  691. worker = WorkController(concurrency=1, loglevel=10)
  692. Timers(worker).on_timer_tick(30.0)
  693. xargs = self.logger.debug.call_args[0]
  694. fmt, arg = xargs[0], xargs[1]
  695. self.assertEqual(30.0, arg)
  696. self.assertIn("Next eta %s secs", fmt)
  697. def test_process_task(self):
  698. worker = self.worker
  699. worker.pool = Mock()
  700. backend = Mock()
  701. m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
  702. kwargs={})
  703. task = Request.from_message(m, m.decode())
  704. worker.process_task(task)
  705. self.assertEqual(worker.pool.apply_async.call_count, 1)
  706. worker.pool.stop()
  707. def test_process_task_raise_base(self):
  708. worker = self.worker
  709. worker.pool = Mock()
  710. worker.pool.apply_async.side_effect = KeyboardInterrupt("Ctrl+C")
  711. backend = Mock()
  712. m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
  713. kwargs={})
  714. task = Request.from_message(m, m.decode())
  715. worker.components = []
  716. worker._state = worker.RUN
  717. with self.assertRaises(KeyboardInterrupt):
  718. worker.process_task(task)
  719. self.assertEqual(worker._state, worker.TERMINATE)
  720. def test_process_task_raise_SystemTerminate(self):
  721. worker = self.worker
  722. worker.pool = Mock()
  723. worker.pool.apply_async.side_effect = SystemTerminate()
  724. backend = Mock()
  725. m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
  726. kwargs={})
  727. task = Request.from_message(m, m.decode())
  728. worker.components = []
  729. worker._state = worker.RUN
  730. with self.assertRaises(SystemExit):
  731. worker.process_task(task)
  732. self.assertEqual(worker._state, worker.TERMINATE)
  733. def test_process_task_raise_regular(self):
  734. worker = self.worker
  735. worker.pool = Mock()
  736. worker.pool.apply_async.side_effect = KeyError("some exception")
  737. backend = Mock()
  738. m = create_message(backend, task=foo_task.name, args=[4, 8, 10],
  739. kwargs={})
  740. task = Request.from_message(m, m.decode())
  741. worker.process_task(task)
  742. worker.pool.stop()
  743. def test_start_catches_base_exceptions(self):
  744. worker1 = self.create_worker()
  745. stc = Mock()
  746. stc.start.side_effect = SystemTerminate()
  747. worker1.components = [stc]
  748. worker1.start()
  749. self.assertTrue(stc.terminate.call_count)
  750. worker2 = self.create_worker()
  751. sec = Mock()
  752. sec.start.side_effect = SystemExit()
  753. sec.terminate = None
  754. worker2.components = [sec]
  755. worker2.start()
  756. self.assertTrue(sec.stop.call_count)
  757. def test_state_db(self):
  758. from celery.worker import state
  759. Persistent = state.Persistent
  760. state.Persistent = Mock()
  761. try:
  762. worker = self.create_worker(state_db="statefilename")
  763. self.assertTrue(worker._persistence)
  764. finally:
  765. state.Persistent = Persistent
  766. def test_disable_rate_limits_solo(self):
  767. worker = self.create_worker(disable_rate_limits=True,
  768. pool_cls="solo")
  769. self.assertIsInstance(worker.ready_queue, FastQueue)
  770. self.assertIsNone(worker.mediator)
  771. self.assertEqual(worker.ready_queue.put, worker.process_task)
  772. def test_disable_rate_limits_processes(self):
  773. try:
  774. worker = self.create_worker(disable_rate_limits=True,
  775. pool_cls="processes")
  776. except ImportError:
  777. raise SkipTest("multiprocessing not supported")
  778. self.assertIsInstance(worker.ready_queue, FastQueue)
  779. self.assertTrue(worker.mediator)
  780. self.assertNotEqual(worker.ready_queue.put, worker.process_task)
  781. def test_process_task_sem(self):
  782. worker = self.worker
  783. worker.semaphore = Mock()
  784. req = Mock()
  785. worker.process_task_sem(req)
  786. worker.semaphore.acquire.assert_called_with(worker.process_task, req)
  787. def test_signal_consumer_close(self):
  788. worker = self.worker
  789. worker.consumer = Mock()
  790. worker.signal_consumer_close()
  791. worker.consumer.close.assert_called_with()
  792. worker.consumer.close.side_effect = AttributeError()
  793. worker.signal_consumer_close()
  794. def test_start__stop(self):
  795. worker = self.worker
  796. worker._shutdown_complete.set()
  797. worker.components = [Mock(), Mock(), Mock(), Mock()]
  798. worker.start()
  799. for w in worker.components:
  800. self.assertTrue(w.start.call_count)
  801. worker.stop()
  802. for component in worker.components:
  803. self.assertTrue(w.stop.call_count)
  804. # Doesn't close pool if no pool.
  805. worker.start()
  806. worker.pool = None
  807. worker.stop()
  808. # test that stop of None is not attempted
  809. worker.components[-1] = None
  810. worker.start()
  811. worker.stop()
  812. def test_component_raises(self):
  813. worker = self.worker
  814. comp = Mock()
  815. worker.components = [comp]
  816. comp.start.side_effect = TypeError()
  817. worker.stop = Mock()
  818. worker.start()
  819. worker.stop.assert_called_with()
  820. def test_state(self):
  821. self.assertTrue(self.worker.state)
  822. def test_start__terminate(self):
  823. worker = self.worker
  824. worker._shutdown_complete.set()
  825. worker.components = [Mock(), Mock(), Mock(), Mock(), Mock()]
  826. for component in worker.components[:3]:
  827. component.terminate = None
  828. worker.start()
  829. for w in worker.components[:3]:
  830. self.assertTrue(w.start.call_count)
  831. self.assertTrue(worker._running, len(worker.components))
  832. self.assertEqual(worker._state, RUN)
  833. worker.terminate()
  834. for component in worker.components[:3]:
  835. self.assertTrue(component.stop.call_count)
  836. self.assertTrue(worker.components[4].terminate.call_count)
  837. def test_Queues_pool_not_rlimit_safe(self):
  838. w = Mock()
  839. w.pool_cls.rlimit_safe = False
  840. Queues(w).create(w)
  841. self.assertTrue(w.disable_rate_limits)
  842. def test_Queues_pool_no_sem(self):
  843. w = Mock()
  844. w.pool_cls.uses_semaphore = False
  845. Queues(w).create(w)
  846. self.assertIs(w.ready_queue.put, w.process_task)
  847. def test_EvLoop_crate(self):
  848. w = Mock()
  849. x = EvLoop(w)
  850. hub = x.create(w)
  851. self.assertTrue(w.timer.max_interval)
  852. self.assertIs(w.hub, hub)
  853. def test_Pool_crate_threaded(self):
  854. w = Mock()
  855. w.pool_cls = Mock()
  856. w.use_eventloop = False
  857. pool = Pool(w)
  858. pool.create(w)
  859. def test_Pool_create(self):
  860. from celery.worker.hub import BoundedSemaphore
  861. w = Mock()
  862. w.hub = Mock()
  863. w.hub.on_init = []
  864. w.pool_cls = Mock()
  865. P = w.pool_cls.return_value = Mock()
  866. P.timers = {Mock(): 30}
  867. w.use_eventloop = True
  868. pool = Pool(w)
  869. pool.create(w)
  870. self.assertIsInstance(w.semaphore, BoundedSemaphore)
  871. self.assertTrue(w.hub.on_init)
  872. hub = Mock()
  873. w.hub.on_init[0](hub)
  874. cbs = w.pool.init_callbacks.call_args[1]
  875. w = Mock()
  876. cbs["on_process_up"](w)
  877. hub.add_reader.assert_called_with(w.sentinel, P.maintain_pool)
  878. cbs["on_process_down"](w)
  879. hub.remove.assert_called_with(w.sentinel)
  880. result = Mock()
  881. tref = result._tref
  882. cbs["on_timeout_cancel"](result)
  883. tref.cancel.assert_called_with()
  884. cbs["on_timeout_cancel"](result) # no more tref
  885. cbs["on_timeout_set"](result, 10, 20)
  886. tsoft, callback = hub.timer.apply_after.call_args[0]
  887. callback()
  888. cbs["on_timeout_set"](result, 10, None)
  889. tsoft, callback = hub.timer.apply_after.call_args[0]
  890. callback()
  891. cbs["on_timeout_set"](result, None, 10)
  892. cbs["on_timeout_set"](result, None, None)
  893. P.did_start_ok.return_value = False
  894. with self.assertRaises(WorkerLostError):
  895. pool.on_poll_init(P, hub)