test_celeryd.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665
  1. from __future__ import absolute_import
  2. from __future__ import with_statement
  3. import logging
  4. import os
  5. import sys
  6. from functools import wraps
  7. from mock import Mock, patch
  8. from nose import SkipTest
  9. from billiard import current_process
  10. from kombu import Exchange, Queue
  11. from celery import Celery
  12. from celery import platforms
  13. from celery import signals
  14. from celery import current_app
  15. from celery.apps import worker as cd
  16. from celery.bin.celeryd import WorkerCommand, main as celeryd_main
  17. from celery.exceptions import ImproperlyConfigured, SystemTerminate
  18. from celery.utils.log import ensure_process_aware_logger
  19. from celery.worker import state
  20. from celery.tests.utils import AppCase, WhateverIO
# Executed once at import time, before any of the test classes below are
# defined.
# NOTE(review): presumably installs a process-aware logger class so log
# records can carry the process name -- confirm in celery.utils.log.
ensure_process_aware_logger()
  22. def disable_stdouts(fun):
  23. @wraps(fun)
  24. def disable(*args, **kwargs):
  25. prev_out, prev_err = sys.stdout, sys.stderr
  26. prev_rout, prev_rerr = sys.__stdout__, sys.__stderr__
  27. sys.stdout = sys.__stdout__ = WhateverIO()
  28. sys.stderr = sys.__stderr__ = WhateverIO()
  29. try:
  30. return fun(*args, **kwargs)
  31. finally:
  32. sys.stdout = prev_out
  33. sys.stderr = prev_err
  34. sys.__stdout__ = prev_rout
  35. sys.__stderr__ = prev_rerr
  36. return disable
  37. class _WorkController(object):
  38. def __init__(self, *args, **kwargs):
  39. pass
  40. def start(self):
  41. pass
  42. class Worker(cd.Worker):
  43. WorkController = _WorkController
  44. class test_Worker(AppCase):
  45. Worker = Worker
  46. def teardown(self):
  47. self.app.conf.CELERY_INCLUDE = ()
  48. @disable_stdouts
  49. def test_queues_string(self):
  50. celery = Celery(set_as_current=False)
  51. worker = celery.Worker(queues="foo,bar,baz")
  52. worker.init_queues()
  53. self.assertEqual(worker.use_queues, ["foo", "bar", "baz"])
  54. self.assertTrue("foo" in celery.amqp.queues)
  55. @disable_stdouts
  56. def test_cpu_count(self):
  57. celery = Celery(set_as_current=False)
  58. with patch("celery.apps.worker.cpu_count") as cpu_count:
  59. cpu_count.side_effect = NotImplementedError()
  60. worker = celery.Worker(concurrency=None)
  61. self.assertEqual(worker.concurrency, 2)
  62. worker = celery.Worker(concurrency=5)
  63. self.assertEqual(worker.concurrency, 5)
  64. @disable_stdouts
  65. def test_windows_B_option(self):
  66. celery = Celery(set_as_current=False)
  67. celery.IS_WINDOWS = True
  68. with self.assertRaises(SystemExit):
  69. WorkerCommand(app=celery).run(beat=True)
  70. def test_setup_concurrency_very_early(self):
  71. x = WorkerCommand()
  72. x.run = Mock()
  73. with self.assertRaises(ImportError):
  74. x.execute_from_commandline(["celeryd", "-P", "xyzybox"])
  75. @disable_stdouts
  76. def test_invalid_loglevel_gives_error(self):
  77. x = WorkerCommand(app=Celery(set_as_current=False))
  78. with self.assertRaises(SystemExit):
  79. x.run(loglevel="GRIM_REAPER")
  80. def test_no_loglevel(self):
  81. app = Celery(set_as_current=False)
  82. app.Worker = Mock()
  83. WorkerCommand(app=app).run(loglevel=None)
  84. def test_tasklist(self):
  85. celery = Celery(set_as_current=False)
  86. worker = celery.Worker()
  87. self.assertTrue(worker.app.tasks)
  88. self.assertTrue(worker.app.finalized)
  89. self.assertTrue(worker.tasklist(include_builtins=True))
  90. worker.tasklist(include_builtins=False)
  91. def test_extra_info(self):
  92. celery = Celery(set_as_current=False)
  93. worker = celery.Worker()
  94. worker.loglevel = logging.WARNING
  95. self.assertFalse(worker.extra_info())
  96. worker.loglevel = logging.INFO
  97. self.assertTrue(worker.extra_info())
  98. @disable_stdouts
  99. def test_loglevel_string(self):
  100. worker = self.Worker(loglevel="INFO")
  101. self.assertEqual(worker.loglevel, logging.INFO)
  102. def test_run_worker(self):
  103. handlers = {}
  104. class Signals(platforms.Signals):
  105. def __setitem__(self, sig, handler):
  106. handlers[sig] = handler
  107. p = platforms.signals
  108. platforms.signals = Signals()
  109. try:
  110. w = self.Worker()
  111. w._isatty = False
  112. w.run_worker()
  113. for sig in "SIGINT", "SIGHUP", "SIGTERM":
  114. self.assertIn(sig, handlers)
  115. handlers.clear()
  116. w = self.Worker()
  117. w._isatty = True
  118. w.run_worker()
  119. for sig in "SIGINT", "SIGTERM":
  120. self.assertIn(sig, handlers)
  121. self.assertNotIn("SIGHUP", handlers)
  122. finally:
  123. platforms.signals = p
  124. @disable_stdouts
  125. def test_startup_info(self):
  126. worker = self.Worker()
  127. worker.run()
  128. self.assertTrue(worker.startup_info())
  129. worker.loglevel = logging.DEBUG
  130. self.assertTrue(worker.startup_info())
  131. worker.loglevel = logging.INFO
  132. self.assertTrue(worker.startup_info())
  133. worker.autoscale = 13, 10
  134. self.assertTrue(worker.startup_info())
  135. worker = self.Worker(queues="foo,bar,baz,xuzzy,do,re,mi")
  136. app = worker.app
  137. prev, app.loader = app.loader, Mock()
  138. try:
  139. app.loader.__module__ = "acme.baked_beans"
  140. self.assertTrue(worker.startup_info())
  141. finally:
  142. app.loader = prev
  143. prev, app.loader = app.loader, Mock()
  144. try:
  145. app.loader.__module__ = "celery.loaders.foo"
  146. self.assertTrue(worker.startup_info())
  147. finally:
  148. app.loader = prev
  149. from celery.loaders.app import AppLoader
  150. prev, app.loader = app.loader, AppLoader()
  151. try:
  152. self.assertTrue(worker.startup_info())
  153. finally:
  154. app.loader = prev
  155. worker.send_events = True
  156. self.assertTrue(worker.startup_info())
  157. # test when there are too few output lines
  158. # to draft the ascii art onto
  159. prev, cd.ARTLINES = (cd.ARTLINES,
  160. ["the quick brown fox"])
  161. self.assertTrue(worker.startup_info())
  162. @disable_stdouts
  163. def test_run(self):
  164. self.Worker().run()
  165. self.Worker(purge=True).run()
  166. worker = self.Worker()
  167. worker.run()
  168. prev, cd.IGNORE_ERRORS = cd.IGNORE_ERRORS, (KeyError, )
  169. try:
  170. worker.run_worker = Mock()
  171. worker.run_worker.side_effect = KeyError()
  172. worker.run()
  173. finally:
  174. cd.IGNORE_ERRORS = prev
  175. @disable_stdouts
  176. def test_purge_messages(self):
  177. self.Worker().purge_messages()
  178. @disable_stdouts
  179. def test_init_queues(self):
  180. app = current_app
  181. c = app.conf
  182. p, app.amqp.queues = app.amqp.queues, app.amqp.Queues({
  183. "celery": {"exchange": "celery",
  184. "routing_key": "celery"},
  185. "video": {"exchange": "video",
  186. "routing_key": "video"}})
  187. try:
  188. worker = self.Worker(queues=["video"])
  189. worker.init_queues()
  190. self.assertIn("video", app.amqp.queues)
  191. self.assertIn("video", app.amqp.queues.consume_from)
  192. self.assertIn("celery", app.amqp.queues)
  193. self.assertNotIn("celery", app.amqp.queues.consume_from)
  194. c.CELERY_CREATE_MISSING_QUEUES = False
  195. del(app.amqp.queues)
  196. with self.assertRaises(ImproperlyConfigured):
  197. self.Worker(queues=["image"]).init_queues()
  198. del(app.amqp.queues)
  199. c.CELERY_CREATE_MISSING_QUEUES = True
  200. worker = self.Worker(queues=["image"])
  201. worker.init_queues()
  202. self.assertIn("image", app.amqp.queues.consume_from)
  203. self.assertEqual(Queue("image", Exchange("image"),
  204. routing_key="image"), app.amqp.queues["image"])
  205. finally:
  206. app.amqp.queues = p
  207. @disable_stdouts
  208. def test_autoscale_argument(self):
  209. worker1 = self.Worker(autoscale="10,3")
  210. self.assertListEqual(worker1.autoscale, [10, 3])
  211. worker2 = self.Worker(autoscale="10")
  212. self.assertListEqual(worker2.autoscale, [10, 0])
  213. def test_include_argument(self):
  214. worker1 = self.Worker(include="some.module")
  215. self.assertListEqual(worker1.include, ["some.module"])
  216. worker2 = self.Worker(include="some.module,another.package")
  217. self.assertListEqual(worker2.include,
  218. ["some.module", "another.package"])
  219. self.Worker(include=["os", "sys"])
  220. @disable_stdouts
  221. def test_unknown_loglevel(self):
  222. with self.assertRaises(SystemExit):
  223. WorkerCommand(app=self.app).run(loglevel="ALIEN")
  224. worker1 = self.Worker(loglevel=0xFFFF)
  225. self.assertEqual(worker1.loglevel, 0xFFFF)
  226. def test_warns_if_running_as_privileged_user(self):
  227. app = current_app
  228. if app.IS_WINDOWS:
  229. raise SkipTest("Not applicable on Windows")
  230. def getuid():
  231. return 0
  232. prev, os.getuid = os.getuid, getuid
  233. try:
  234. with self.assertWarnsRegex(RuntimeWarning,
  235. r'superuser privileges is discouraged'):
  236. worker = self.Worker()
  237. worker.run()
  238. finally:
  239. os.getuid = prev
  240. @disable_stdouts
  241. def test_redirect_stdouts(self):
  242. worker = self.Worker()
  243. worker.redirect_stdouts = False
  244. worker.redirect_stdouts_to_logger()
  245. with self.assertRaises(AttributeError):
  246. sys.stdout.logger
  247. def test_redirect_stdouts_already_handled(self):
  248. logging_setup = [False]
  249. @signals.setup_logging.connect
  250. def on_logging_setup(**kwargs):
  251. logging_setup[0] = True
  252. try:
  253. worker = self.Worker()
  254. worker.app.log.__class__._setup = False
  255. worker.redirect_stdouts_to_logger()
  256. self.assertTrue(logging_setup[0])
  257. with self.assertRaises(AttributeError):
  258. sys.stdout.logger
  259. finally:
  260. signals.setup_logging.disconnect(on_logging_setup)
  261. @disable_stdouts
  262. def test_platform_tweaks_osx(self):
  263. class OSXWorker(self.Worker):
  264. proxy_workaround_installed = False
  265. def osx_proxy_detection_workaround(self):
  266. self.proxy_workaround_installed = True
  267. worker = OSXWorker()
  268. def install_HUP_nosupport(controller):
  269. controller.hup_not_supported_installed = True
  270. class Controller(object):
  271. pass
  272. prev = cd.install_HUP_not_supported_handler
  273. cd.install_HUP_not_supported_handler = install_HUP_nosupport
  274. try:
  275. worker.app.IS_OSX = True
  276. controller = Controller()
  277. worker.install_platform_tweaks(controller)
  278. self.assertTrue(controller.hup_not_supported_installed)
  279. self.assertTrue(worker.proxy_workaround_installed)
  280. finally:
  281. cd.install_HUP_not_supported_handler = prev
  282. @disable_stdouts
  283. def test_general_platform_tweaks(self):
  284. restart_worker_handler_installed = [False]
  285. def install_worker_restart_handler(worker):
  286. restart_worker_handler_installed[0] = True
  287. class Controller(object):
  288. pass
  289. prev = cd.install_worker_restart_handler
  290. cd.install_worker_restart_handler = install_worker_restart_handler
  291. try:
  292. worker = self.Worker()
  293. worker.app.IS_OSX = False
  294. worker.install_platform_tweaks(Controller())
  295. self.assertTrue(restart_worker_handler_installed[0])
  296. finally:
  297. cd.install_worker_restart_handler = prev
  298. @disable_stdouts
  299. def test_on_consumer_ready(self):
  300. worker_ready_sent = [False]
  301. @signals.worker_ready.connect
  302. def on_worker_ready(**kwargs):
  303. worker_ready_sent[0] = True
  304. self.Worker().on_consumer_ready(object())
  305. self.assertTrue(worker_ready_sent[0])
  306. class test_funs(AppCase):
  307. def test_active_thread_count(self):
  308. self.assertTrue(cd.active_thread_count())
  309. @disable_stdouts
  310. def test_set_process_status(self):
  311. try:
  312. __import__("setproctitle")
  313. except ImportError:
  314. raise SkipTest("setproctitle not installed")
  315. worker = Worker(hostname="xyzza")
  316. prev1, sys.argv = sys.argv, ["Arg0"]
  317. try:
  318. st = worker.set_process_status("Running")
  319. self.assertIn("celeryd", st)
  320. self.assertIn("xyzza", st)
  321. self.assertIn("Running", st)
  322. prev2, sys.argv = sys.argv, ["Arg0", "Arg1"]
  323. try:
  324. st = worker.set_process_status("Running")
  325. self.assertIn("celeryd", st)
  326. self.assertIn("xyzza", st)
  327. self.assertIn("Running", st)
  328. self.assertIn("Arg1", st)
  329. finally:
  330. sys.argv = prev2
  331. finally:
  332. sys.argv = prev1
  333. @disable_stdouts
  334. def test_parse_options(self):
  335. cmd = WorkerCommand()
  336. cmd.app = current_app
  337. opts, args = cmd.parse_options("celeryd", ["--concurrency=512"])
  338. self.assertEqual(opts.concurrency, 512)
  339. @disable_stdouts
  340. def test_main(self):
  341. p, cd.Worker = cd.Worker, Worker
  342. s, sys.argv = sys.argv, ["celeryd", "--discard"]
  343. try:
  344. celeryd_main()
  345. finally:
  346. cd.Worker = p
  347. sys.argv = s
  348. class test_signal_handlers(AppCase):
  349. class _Worker(object):
  350. stopped = False
  351. terminated = False
  352. def stop(self, in_sighandler=False):
  353. self.stopped = True
  354. def terminate(self, in_sighandler=False):
  355. self.terminated = True
  356. def psig(self, fun, *args, **kwargs):
  357. handlers = {}
  358. class Signals(platforms.Signals):
  359. def __setitem__(self, sig, handler):
  360. handlers[sig] = handler
  361. p, platforms.signals = platforms.signals, Signals()
  362. try:
  363. fun(*args, **kwargs)
  364. return handlers
  365. finally:
  366. platforms.signals = p
  367. @disable_stdouts
  368. def test_worker_int_handler(self):
  369. worker = self._Worker()
  370. handlers = self.psig(cd.install_worker_int_handler, worker)
  371. next_handlers = {}
  372. state.should_stop = False
  373. state.should_terminate = False
  374. class Signals(platforms.Signals):
  375. def __setitem__(self, sig, handler):
  376. next_handlers[sig] = handler
  377. with patch("celery.apps.worker.active_thread_count") as c:
  378. c.return_value = 3
  379. p, platforms.signals = platforms.signals, Signals()
  380. try:
  381. handlers["SIGINT"]("SIGINT", object())
  382. self.assertTrue(state.should_stop)
  383. finally:
  384. platforms.signals = p
  385. state.should_stop = False
  386. try:
  387. next_handlers["SIGINT"]("SIGINT", object())
  388. self.assertTrue(state.should_terminate)
  389. finally:
  390. state.should_terminate = False
  391. with patch("celery.apps.worker.active_thread_count") as c:
  392. c.return_value = 1
  393. p, platforms.signals = platforms.signals, Signals()
  394. try:
  395. with self.assertRaises(SystemExit):
  396. handlers["SIGINT"]("SIGINT", object())
  397. finally:
  398. platforms.signals = p
  399. with self.assertRaises(SystemTerminate):
  400. next_handlers["SIGINT"]("SIGINT", object())
  401. @disable_stdouts
  402. def test_worker_int_handler_only_stop_MainProcess(self):
  403. try:
  404. import _multiprocessing # noqa
  405. except ImportError:
  406. raise SkipTest("only relevant for multiprocessing")
  407. process = current_process()
  408. name, process.name = process.name, "OtherProcess"
  409. with patch("celery.apps.worker.active_thread_count") as c:
  410. c.return_value = 3
  411. try:
  412. worker = self._Worker()
  413. handlers = self.psig(cd.install_worker_int_handler, worker)
  414. handlers["SIGINT"]("SIGINT", object())
  415. self.assertTrue(state.should_stop)
  416. finally:
  417. process.name = name
  418. state.should_stop = False
  419. with patch("celery.apps.worker.active_thread_count") as c:
  420. c.return_value = 1
  421. try:
  422. worker = self._Worker()
  423. handlers = self.psig(cd.install_worker_int_handler, worker)
  424. with self.assertRaises(SystemExit):
  425. handlers["SIGINT"]("SIGINT", object())
  426. finally:
  427. process.name = name
  428. state.should_stop = False
  429. @disable_stdouts
  430. def test_install_HUP_not_supported_handler(self):
  431. worker = self._Worker()
  432. handlers = self.psig(cd.install_HUP_not_supported_handler, worker)
  433. handlers["SIGHUP"]("SIGHUP", object())
  434. @disable_stdouts
  435. def test_worker_term_hard_handler_only_stop_MainProcess(self):
  436. try:
  437. import _multiprocessing # noqa
  438. except ImportError:
  439. raise SkipTest("only relevant for multiprocessing")
  440. process = current_process()
  441. name, process.name = process.name, "OtherProcess"
  442. try:
  443. with patch("celery.apps.worker.active_thread_count") as c:
  444. c.return_value = 3
  445. worker = self._Worker()
  446. handlers = self.psig(
  447. cd.install_worker_term_hard_handler, worker)
  448. try:
  449. handlers["SIGQUIT"]("SIGQUIT", object())
  450. self.assertTrue(state.should_terminate)
  451. finally:
  452. state.should_terminate = False
  453. with patch("celery.apps.worker.active_thread_count") as c:
  454. c.return_value = 1
  455. worker = self._Worker()
  456. handlers = self.psig(
  457. cd.install_worker_term_hard_handler, worker)
  458. with self.assertRaises(SystemTerminate):
  459. handlers["SIGQUIT"]("SIGQUIT", object())
  460. finally:
  461. process.name = name
  462. @disable_stdouts
  463. def test_worker_term_handler_when_threads(self):
  464. with patch("celery.apps.worker.active_thread_count") as c:
  465. c.return_value = 3
  466. worker = self._Worker()
  467. handlers = self.psig(cd.install_worker_term_handler, worker)
  468. try:
  469. handlers["SIGTERM"]("SIGTERM", object())
  470. self.assertTrue(state.should_stop)
  471. finally:
  472. state.should_stop = False
  473. @disable_stdouts
  474. def test_worker_term_handler_when_single_thread(self):
  475. with patch("celery.apps.worker.active_thread_count") as c:
  476. c.return_value = 1
  477. worker = self._Worker()
  478. handlers = self.psig(cd.install_worker_term_handler, worker)
  479. try:
  480. with self.assertRaises(SystemExit):
  481. handlers["SIGTERM"]("SIGTERM", object())
  482. finally:
  483. state.should_stop = False
  484. @patch("sys.__stderr__")
  485. def test_worker_cry_handler(self, stderr):
  486. if sys.platform.startswith("java"):
  487. raise SkipTest("Cry handler does not work on Jython")
  488. if hasattr(sys, "pypy_version_info"):
  489. raise SkipTest("Cry handler does not work on PyPy")
  490. if sys.version_info > (2, 5):
  491. handlers = self.psig(cd.install_cry_handler)
  492. self.assertIsNone(handlers["SIGUSR1"]("SIGUSR1", object()))
  493. self.assertTrue(stderr.write.called)
  494. else:
  495. raise SkipTest("Needs Python 2.5 or later")
  496. @disable_stdouts
  497. def test_worker_term_handler_only_stop_MainProcess(self):
  498. try:
  499. import _multiprocessing # noqa
  500. except ImportError:
  501. raise SkipTest("only relevant for multiprocessing")
  502. process = current_process()
  503. name, process.name = process.name, "OtherProcess"
  504. try:
  505. with patch("celery.apps.worker.active_thread_count") as c:
  506. c.return_value = 3
  507. worker = self._Worker()
  508. handlers = self.psig(cd.install_worker_term_handler, worker)
  509. handlers["SIGTERM"]("SIGTERM", object())
  510. self.assertTrue(state.should_stop)
  511. with patch("celery.apps.worker.active_thread_count") as c:
  512. c.return_value = 1
  513. worker = self._Worker()
  514. handlers = self.psig(cd.install_worker_term_handler, worker)
  515. with self.assertRaises(SystemExit):
  516. handlers["SIGTERM"]("SIGTERM", object())
  517. finally:
  518. process.name = name
  519. state.should_stop = False
  520. @disable_stdouts
  521. @patch("os.fork")
  522. def test_worker_restart_handler(self, fork):
  523. fork.return_value = 0
  524. if getattr(os, "execv", None) is None:
  525. raise SkipTest("platform does not have excv")
  526. argv = []
  527. def _execv(*args):
  528. argv.extend(args)
  529. execv, os.execv = os.execv, _execv
  530. try:
  531. worker = self._Worker()
  532. handlers = self.psig(cd.install_worker_restart_handler, worker)
  533. handlers["SIGHUP"]("SIGHUP", object())
  534. self.assertTrue(state.should_stop)
  535. self.assertTrue(argv)
  536. argv[:] = []
  537. fork.return_value = 1
  538. handlers["SIGHUP"]("SIGHUP", object())
  539. self.assertFalse(argv)
  540. finally:
  541. os.execv = execv
  542. state.should_stop = False
  543. @disable_stdouts
  544. def test_worker_term_hard_handler_when_threaded(self):
  545. with patch("celery.apps.worker.active_thread_count") as c:
  546. c.return_value = 3
  547. worker = self._Worker()
  548. handlers = self.psig(cd.install_worker_term_hard_handler, worker)
  549. try:
  550. handlers["SIGQUIT"]("SIGQUIT", object())
  551. self.assertTrue(state.should_terminate)
  552. finally:
  553. state.should_terminate = False
  554. @disable_stdouts
  555. def test_worker_term_hard_handler_when_single_threaded(self):
  556. with patch("celery.apps.worker.active_thread_count") as c:
  557. c.return_value = 1
  558. worker = self._Worker()
  559. handlers = self.psig(cd.install_worker_term_hard_handler, worker)
  560. with self.assertRaises(SystemTerminate):
  561. handlers["SIGQUIT"]("SIGQUIT", object())