# test_celeryd.py (22 KB)
  1. from __future__ import absolute_import
  2. from __future__ import with_statement
  3. import logging
  4. import os
  5. import sys
  6. from functools import wraps
  7. from mock import Mock, patch
  8. from nose import SkipTest
  9. from billiard import current_process
  10. from kombu import Exchange, Queue
  11. from celery import Celery
  12. from celery import platforms
  13. from celery import signals
  14. from celery import current_app
  15. from celery.apps import worker as cd
  16. from celery.bin.celeryd import WorkerCommand, main as celeryd_main
  17. from celery.exceptions import ImproperlyConfigured, SystemTerminate
  18. from celery.utils.log import ensure_process_aware_logger
  19. from celery.worker import state
  20. from celery.tests.utils import (
  21. AppCase,
  22. WhateverIO,
  23. skip_if_pypy,
  24. skip_if_jython,
  25. )
# Module-level call: executes once, when this test module is imported.
ensure_process_aware_logger()
  27. def disable_stdouts(fun):
  28. @wraps(fun)
  29. def disable(*args, **kwargs):
  30. prev_out, prev_err = sys.stdout, sys.stderr
  31. prev_rout, prev_rerr = sys.__stdout__, sys.__stderr__
  32. sys.stdout = sys.__stdout__ = WhateverIO()
  33. sys.stderr = sys.__stderr__ = WhateverIO()
  34. try:
  35. return fun(*args, **kwargs)
  36. finally:
  37. sys.stdout = prev_out
  38. sys.stderr = prev_err
  39. sys.__stdout__ = prev_rout
  40. sys.__stderr__ = prev_rerr
  41. return disable
  42. class _WorkController(object):
  43. def __init__(self, *args, **kwargs):
  44. pass
  45. def start(self):
  46. pass
class Worker(cd.Worker):
    """``cd.Worker`` subclass used by the tests in this module.

    It swaps in the inert ``_WorkController`` and sets ``redirect_stdouts``
    to ``False`` after the base initializer has run.
    """
    WorkController = _WorkController

    def __init__(self, *args, **kwargs):
        # Keep the ``super()`` form: ``test_main`` temporarily rebinds
        # ``cd.Worker`` to this very class, so spelling the base class as
        # ``cd.Worker`` here would resolve to this class during that test.
        super(Worker, self).__init__(*args, **kwargs)
        # Assigned after the base initializer, so this value wins regardless
        # of the arguments that were passed in.
        self.redirect_stdouts = False
  52. class test_Worker(AppCase):
  53. Worker = Worker
  54. def teardown(self):
  55. self.app.conf.CELERY_INCLUDE = ()
  56. @disable_stdouts
  57. def test_queues_string(self):
  58. celery = Celery(set_as_current=False)
  59. worker = celery.Worker(queues='foo,bar,baz')
  60. worker.init_queues()
  61. self.assertEqual(worker.use_queues, ['foo', 'bar', 'baz'])
  62. self.assertTrue('foo' in celery.amqp.queues)
  63. @disable_stdouts
  64. def test_cpu_count(self):
  65. celery = Celery(set_as_current=False)
  66. with patch('celery.apps.worker.cpu_count') as cpu_count:
  67. cpu_count.side_effect = NotImplementedError()
  68. worker = celery.Worker(concurrency=None)
  69. self.assertEqual(worker.concurrency, 2)
  70. worker = celery.Worker(concurrency=5)
  71. self.assertEqual(worker.concurrency, 5)
  72. @disable_stdouts
  73. def test_windows_B_option(self):
  74. celery = Celery(set_as_current=False)
  75. celery.IS_WINDOWS = True
  76. with self.assertRaises(SystemExit):
  77. WorkerCommand(app=celery).run(beat=True)
  78. def test_setup_concurrency_very_early(self):
  79. x = WorkerCommand()
  80. x.run = Mock()
  81. with self.assertRaises(ImportError):
  82. x.execute_from_commandline(['celeryd', '-P', 'xyzybox'])
  83. @disable_stdouts
  84. def test_invalid_loglevel_gives_error(self):
  85. x = WorkerCommand(app=Celery(set_as_current=False))
  86. with self.assertRaises(SystemExit):
  87. x.run(loglevel='GRIM_REAPER')
  88. def test_no_loglevel(self):
  89. app = Celery(set_as_current=False)
  90. app.Worker = Mock()
  91. WorkerCommand(app=app).run(loglevel=None)
  92. def test_tasklist(self):
  93. celery = Celery(set_as_current=False)
  94. worker = celery.Worker()
  95. self.assertTrue(worker.app.tasks)
  96. self.assertTrue(worker.app.finalized)
  97. self.assertTrue(worker.tasklist(include_builtins=True))
  98. worker.tasklist(include_builtins=False)
  99. def test_extra_info(self):
  100. celery = Celery(set_as_current=False)
  101. worker = celery.Worker()
  102. worker.loglevel = logging.WARNING
  103. self.assertFalse(worker.extra_info())
  104. worker.loglevel = logging.INFO
  105. self.assertTrue(worker.extra_info())
  106. @disable_stdouts
  107. def test_loglevel_string(self):
  108. worker = self.Worker(loglevel='INFO')
  109. self.assertEqual(worker.loglevel, logging.INFO)
  110. def test_run_worker(self):
  111. handlers = {}
  112. class Signals(platforms.Signals):
  113. def __setitem__(self, sig, handler):
  114. handlers[sig] = handler
  115. p = platforms.signals
  116. platforms.signals = Signals()
  117. try:
  118. w = self.Worker()
  119. w._isatty = False
  120. w.run_worker()
  121. for sig in 'SIGINT', 'SIGHUP', 'SIGTERM':
  122. self.assertIn(sig, handlers)
  123. handlers.clear()
  124. w = self.Worker()
  125. w._isatty = True
  126. w.run_worker()
  127. for sig in 'SIGINT', 'SIGTERM':
  128. self.assertIn(sig, handlers)
  129. self.assertNotIn('SIGHUP', handlers)
  130. finally:
  131. platforms.signals = p
  132. @disable_stdouts
  133. def test_startup_info(self):
  134. worker = self.Worker()
  135. worker.run()
  136. self.assertTrue(worker.startup_info())
  137. worker.loglevel = logging.DEBUG
  138. self.assertTrue(worker.startup_info())
  139. worker.loglevel = logging.INFO
  140. self.assertTrue(worker.startup_info())
  141. worker.autoscale = 13, 10
  142. self.assertTrue(worker.startup_info())
  143. worker = self.Worker(queues='foo,bar,baz,xuzzy,do,re,mi')
  144. app = worker.app
  145. prev, app.loader = app.loader, Mock()
  146. try:
  147. app.loader.__module__ = 'acme.baked_beans'
  148. self.assertTrue(worker.startup_info())
  149. finally:
  150. app.loader = prev
  151. prev, app.loader = app.loader, Mock()
  152. try:
  153. app.loader.__module__ = 'celery.loaders.foo'
  154. self.assertTrue(worker.startup_info())
  155. finally:
  156. app.loader = prev
  157. from celery.loaders.app import AppLoader
  158. prev, app.loader = app.loader, AppLoader()
  159. try:
  160. self.assertTrue(worker.startup_info())
  161. finally:
  162. app.loader = prev
  163. worker.send_events = True
  164. self.assertTrue(worker.startup_info())
  165. # test when there are too few output lines
  166. # to draft the ascii art onto
  167. prev, cd.ARTLINES = (cd.ARTLINES,
  168. ['the quick brown fox'])
  169. self.assertTrue(worker.startup_info())
  170. @disable_stdouts
  171. def test_run(self):
  172. self.Worker().run()
  173. self.Worker(purge=True).run()
  174. worker = self.Worker()
  175. worker.run()
  176. prev, cd.IGNORE_ERRORS = cd.IGNORE_ERRORS, (KeyError, )
  177. try:
  178. worker.run_worker = Mock()
  179. worker.run_worker.side_effect = KeyError()
  180. worker.run()
  181. finally:
  182. cd.IGNORE_ERRORS = prev
  183. @disable_stdouts
  184. def test_purge_messages(self):
  185. self.Worker().purge_messages()
  186. @disable_stdouts
  187. def test_init_queues(self):
  188. app = current_app
  189. c = app.conf
  190. p, app.amqp.queues = app.amqp.queues, app.amqp.Queues({
  191. 'celery': {'exchange': 'celery',
  192. 'routing_key': 'celery'},
  193. 'video': {'exchange': 'video',
  194. 'routing_key': 'video'}})
  195. try:
  196. worker = self.Worker(queues=['video'])
  197. worker.init_queues()
  198. self.assertIn('video', app.amqp.queues)
  199. self.assertIn('video', app.amqp.queues.consume_from)
  200. self.assertIn('celery', app.amqp.queues)
  201. self.assertNotIn('celery', app.amqp.queues.consume_from)
  202. c.CELERY_CREATE_MISSING_QUEUES = False
  203. del(app.amqp.queues)
  204. with self.assertRaises(ImproperlyConfigured):
  205. self.Worker(queues=['image']).init_queues()
  206. del(app.amqp.queues)
  207. c.CELERY_CREATE_MISSING_QUEUES = True
  208. worker = self.Worker(queues=['image'])
  209. worker.init_queues()
  210. self.assertIn('image', app.amqp.queues.consume_from)
  211. self.assertEqual(Queue('image', Exchange('image'),
  212. routing_key='image'), app.amqp.queues['image'])
  213. finally:
  214. app.amqp.queues = p
  215. @disable_stdouts
  216. def test_autoscale_argument(self):
  217. worker1 = self.Worker(autoscale='10,3')
  218. self.assertListEqual(worker1.autoscale, [10, 3])
  219. worker2 = self.Worker(autoscale='10')
  220. self.assertListEqual(worker2.autoscale, [10, 0])
  221. def test_include_argument(self):
  222. worker1 = self.Worker(include='some.module')
  223. self.assertListEqual(worker1.include, ['some.module'])
  224. worker2 = self.Worker(include='some.module,another.package')
  225. self.assertListEqual(worker2.include,
  226. ['some.module', 'another.package'])
  227. self.Worker(include=['os', 'sys'])
  228. @disable_stdouts
  229. def test_unknown_loglevel(self):
  230. with self.assertRaises(SystemExit):
  231. WorkerCommand(app=self.app).run(loglevel='ALIEN')
  232. worker1 = self.Worker(loglevel=0xFFFF)
  233. self.assertEqual(worker1.loglevel, 0xFFFF)
  234. def test_warns_if_running_as_privileged_user(self):
  235. app = current_app
  236. if app.IS_WINDOWS:
  237. raise SkipTest('Not applicable on Windows')
  238. def getuid():
  239. return 0
  240. prev, os.getuid = os.getuid, getuid
  241. try:
  242. with self.assertWarnsRegex(RuntimeWarning,
  243. r'superuser privileges is discouraged'):
  244. worker = self.Worker()
  245. worker.run()
  246. finally:
  247. os.getuid = prev
  248. @disable_stdouts
  249. def test_redirect_stdouts(self):
  250. worker = self.Worker()
  251. worker.redirect_stdouts = False
  252. worker.redirect_stdouts_to_logger()
  253. with self.assertRaises(AttributeError):
  254. sys.stdout.logger
  255. def test_redirect_stdouts_already_handled(self):
  256. logging_setup = [False]
  257. @signals.setup_logging.connect
  258. def on_logging_setup(**kwargs):
  259. logging_setup[0] = True
  260. try:
  261. worker = self.Worker()
  262. worker.app.log.__class__._setup = False
  263. worker.redirect_stdouts_to_logger()
  264. self.assertTrue(logging_setup[0])
  265. with self.assertRaises(AttributeError):
  266. sys.stdout.logger
  267. finally:
  268. signals.setup_logging.disconnect(on_logging_setup)
  269. @disable_stdouts
  270. def test_platform_tweaks_osx(self):
  271. class OSXWorker(Worker):
  272. proxy_workaround_installed = False
  273. def osx_proxy_detection_workaround(self):
  274. self.proxy_workaround_installed = True
  275. worker = OSXWorker(redirect_stdouts=False)
  276. def install_HUP_nosupport(controller):
  277. controller.hup_not_supported_installed = True
  278. class Controller(object):
  279. pass
  280. prev = cd.install_HUP_not_supported_handler
  281. cd.install_HUP_not_supported_handler = install_HUP_nosupport
  282. try:
  283. worker.app.IS_OSX = True
  284. controller = Controller()
  285. worker.install_platform_tweaks(controller)
  286. self.assertTrue(controller.hup_not_supported_installed)
  287. self.assertTrue(worker.proxy_workaround_installed)
  288. finally:
  289. cd.install_HUP_not_supported_handler = prev
  290. @disable_stdouts
  291. def test_general_platform_tweaks(self):
  292. restart_worker_handler_installed = [False]
  293. def install_worker_restart_handler(worker):
  294. restart_worker_handler_installed[0] = True
  295. class Controller(object):
  296. pass
  297. prev = cd.install_worker_restart_handler
  298. cd.install_worker_restart_handler = install_worker_restart_handler
  299. try:
  300. worker = self.Worker()
  301. worker.app.IS_OSX = False
  302. worker.install_platform_tweaks(Controller())
  303. self.assertTrue(restart_worker_handler_installed[0])
  304. finally:
  305. cd.install_worker_restart_handler = prev
  306. @disable_stdouts
  307. def test_on_consumer_ready(self):
  308. worker_ready_sent = [False]
  309. @signals.worker_ready.connect
  310. def on_worker_ready(**kwargs):
  311. worker_ready_sent[0] = True
  312. self.Worker().on_consumer_ready(object())
  313. self.assertTrue(worker_ready_sent[0])
  314. class test_funs(AppCase):
  315. def test_active_thread_count(self):
  316. self.assertTrue(cd.active_thread_count())
  317. @disable_stdouts
  318. def test_set_process_status(self):
  319. try:
  320. __import__('setproctitle')
  321. except ImportError:
  322. raise SkipTest('setproctitle not installed')
  323. worker = Worker(hostname='xyzza')
  324. prev1, sys.argv = sys.argv, ['Arg0']
  325. try:
  326. st = worker.set_process_status('Running')
  327. self.assertIn('celeryd', st)
  328. self.assertIn('xyzza', st)
  329. self.assertIn('Running', st)
  330. prev2, sys.argv = sys.argv, ['Arg0', 'Arg1']
  331. try:
  332. st = worker.set_process_status('Running')
  333. self.assertIn('celeryd', st)
  334. self.assertIn('xyzza', st)
  335. self.assertIn('Running', st)
  336. self.assertIn('Arg1', st)
  337. finally:
  338. sys.argv = prev2
  339. finally:
  340. sys.argv = prev1
  341. @disable_stdouts
  342. def test_parse_options(self):
  343. cmd = WorkerCommand()
  344. cmd.app = current_app
  345. opts, args = cmd.parse_options('celeryd', ['--concurrency=512'])
  346. self.assertEqual(opts.concurrency, 512)
  347. @disable_stdouts
  348. def test_main(self):
  349. p, cd.Worker = cd.Worker, Worker
  350. s, sys.argv = sys.argv, ['celeryd', '--discard']
  351. try:
  352. celeryd_main()
  353. finally:
  354. cd.Worker = p
  355. sys.argv = s
  356. class test_signal_handlers(AppCase):
  357. class _Worker(object):
  358. stopped = False
  359. terminated = False
  360. def stop(self, in_sighandler=False):
  361. self.stopped = True
  362. def terminate(self, in_sighandler=False):
  363. self.terminated = True
  364. def psig(self, fun, *args, **kwargs):
  365. handlers = {}
  366. class Signals(platforms.Signals):
  367. def __setitem__(self, sig, handler):
  368. handlers[sig] = handler
  369. p, platforms.signals = platforms.signals, Signals()
  370. try:
  371. fun(*args, **kwargs)
  372. return handlers
  373. finally:
  374. platforms.signals = p
  375. @disable_stdouts
  376. def test_worker_int_handler(self):
  377. worker = self._Worker()
  378. handlers = self.psig(cd.install_worker_int_handler, worker)
  379. next_handlers = {}
  380. state.should_stop = False
  381. state.should_terminate = False
  382. class Signals(platforms.Signals):
  383. def __setitem__(self, sig, handler):
  384. next_handlers[sig] = handler
  385. with patch('celery.apps.worker.active_thread_count') as c:
  386. c.return_value = 3
  387. p, platforms.signals = platforms.signals, Signals()
  388. try:
  389. handlers['SIGINT']('SIGINT', object())
  390. self.assertTrue(state.should_stop)
  391. finally:
  392. platforms.signals = p
  393. state.should_stop = False
  394. try:
  395. next_handlers['SIGINT']('SIGINT', object())
  396. self.assertTrue(state.should_terminate)
  397. finally:
  398. state.should_terminate = False
  399. with patch('celery.apps.worker.active_thread_count') as c:
  400. c.return_value = 1
  401. p, platforms.signals = platforms.signals, Signals()
  402. try:
  403. with self.assertRaises(SystemExit):
  404. handlers['SIGINT']('SIGINT', object())
  405. finally:
  406. platforms.signals = p
  407. with self.assertRaises(SystemTerminate):
  408. next_handlers['SIGINT']('SIGINT', object())
  409. @disable_stdouts
  410. def test_worker_int_handler_only_stop_MainProcess(self):
  411. try:
  412. import _multiprocessing # noqa
  413. except ImportError:
  414. raise SkipTest('only relevant for multiprocessing')
  415. process = current_process()
  416. name, process.name = process.name, 'OtherProcess'
  417. with patch('celery.apps.worker.active_thread_count') as c:
  418. c.return_value = 3
  419. try:
  420. worker = self._Worker()
  421. handlers = self.psig(cd.install_worker_int_handler, worker)
  422. handlers['SIGINT']('SIGINT', object())
  423. self.assertTrue(state.should_stop)
  424. finally:
  425. process.name = name
  426. state.should_stop = False
  427. with patch('celery.apps.worker.active_thread_count') as c:
  428. c.return_value = 1
  429. try:
  430. worker = self._Worker()
  431. handlers = self.psig(cd.install_worker_int_handler, worker)
  432. with self.assertRaises(SystemExit):
  433. handlers['SIGINT']('SIGINT', object())
  434. finally:
  435. process.name = name
  436. state.should_stop = False
  437. @disable_stdouts
  438. def test_install_HUP_not_supported_handler(self):
  439. worker = self._Worker()
  440. handlers = self.psig(cd.install_HUP_not_supported_handler, worker)
  441. handlers['SIGHUP']('SIGHUP', object())
  442. @disable_stdouts
  443. def test_worker_term_hard_handler_only_stop_MainProcess(self):
  444. try:
  445. import _multiprocessing # noqa
  446. except ImportError:
  447. raise SkipTest('only relevant for multiprocessing')
  448. process = current_process()
  449. name, process.name = process.name, 'OtherProcess'
  450. try:
  451. with patch('celery.apps.worker.active_thread_count') as c:
  452. c.return_value = 3
  453. worker = self._Worker()
  454. handlers = self.psig(
  455. cd.install_worker_term_hard_handler, worker)
  456. try:
  457. handlers['SIGQUIT']('SIGQUIT', object())
  458. self.assertTrue(state.should_terminate)
  459. finally:
  460. state.should_terminate = False
  461. with patch('celery.apps.worker.active_thread_count') as c:
  462. c.return_value = 1
  463. worker = self._Worker()
  464. handlers = self.psig(
  465. cd.install_worker_term_hard_handler, worker)
  466. with self.assertRaises(SystemTerminate):
  467. handlers['SIGQUIT']('SIGQUIT', object())
  468. finally:
  469. process.name = name
  470. @disable_stdouts
  471. def test_worker_term_handler_when_threads(self):
  472. with patch('celery.apps.worker.active_thread_count') as c:
  473. c.return_value = 3
  474. worker = self._Worker()
  475. handlers = self.psig(cd.install_worker_term_handler, worker)
  476. try:
  477. handlers['SIGTERM']('SIGTERM', object())
  478. self.assertTrue(state.should_stop)
  479. finally:
  480. state.should_stop = False
  481. @disable_stdouts
  482. def test_worker_term_handler_when_single_thread(self):
  483. with patch('celery.apps.worker.active_thread_count') as c:
  484. c.return_value = 1
  485. worker = self._Worker()
  486. handlers = self.psig(cd.install_worker_term_handler, worker)
  487. try:
  488. with self.assertRaises(SystemExit):
  489. handlers['SIGTERM']('SIGTERM', object())
  490. finally:
  491. state.should_stop = False
  492. @patch('sys.__stderr__')
  493. @skip_if_pypy
  494. @skip_if_jython
  495. def test_worker_cry_handler(self, stderr):
  496. if sys.version_info > (2, 5):
  497. handlers = self.psig(cd.install_cry_handler)
  498. self.assertIsNone(handlers['SIGUSR1']('SIGUSR1', object()))
  499. self.assertTrue(stderr.write.called)
  500. else:
  501. raise SkipTest('Needs Python 2.5 or later')
  502. @disable_stdouts
  503. def test_worker_term_handler_only_stop_MainProcess(self):
  504. try:
  505. import _multiprocessing # noqa
  506. except ImportError:
  507. raise SkipTest('only relevant for multiprocessing')
  508. process = current_process()
  509. name, process.name = process.name, 'OtherProcess'
  510. try:
  511. with patch('celery.apps.worker.active_thread_count') as c:
  512. c.return_value = 3
  513. worker = self._Worker()
  514. handlers = self.psig(cd.install_worker_term_handler, worker)
  515. handlers['SIGTERM']('SIGTERM', object())
  516. self.assertTrue(state.should_stop)
  517. with patch('celery.apps.worker.active_thread_count') as c:
  518. c.return_value = 1
  519. worker = self._Worker()
  520. handlers = self.psig(cd.install_worker_term_handler, worker)
  521. with self.assertRaises(SystemExit):
  522. handlers['SIGTERM']('SIGTERM', object())
  523. finally:
  524. process.name = name
  525. state.should_stop = False
  526. @disable_stdouts
  527. @patch('atexit.register')
  528. @patch('os.fork')
  529. def test_worker_restart_handler(self, fork, register):
  530. fork.return_value = 0
  531. if getattr(os, 'execv', None) is None:
  532. raise SkipTest('platform does not have excv')
  533. argv = []
  534. def _execv(*args):
  535. argv.extend(args)
  536. execv, os.execv = os.execv, _execv
  537. try:
  538. worker = self._Worker()
  539. handlers = self.psig(cd.install_worker_restart_handler, worker)
  540. handlers['SIGHUP']('SIGHUP', object())
  541. self.assertTrue(state.should_stop)
  542. self.assertTrue(register.called)
  543. callback = register.call_args[0][0]
  544. callback()
  545. self.assertTrue(argv)
  546. argv[:] = []
  547. fork.return_value = 1
  548. callback()
  549. self.assertFalse(argv)
  550. finally:
  551. os.execv = execv
  552. state.should_stop = False
  553. @disable_stdouts
  554. def test_worker_term_hard_handler_when_threaded(self):
  555. with patch('celery.apps.worker.active_thread_count') as c:
  556. c.return_value = 3
  557. worker = self._Worker()
  558. handlers = self.psig(cd.install_worker_term_hard_handler, worker)
  559. try:
  560. handlers['SIGQUIT']('SIGQUIT', object())
  561. self.assertTrue(state.should_terminate)
  562. finally:
  563. state.should_terminate = False
  564. @disable_stdouts
  565. def test_worker_term_hard_handler_when_single_threaded(self):
  566. with patch('celery.apps.worker.active_thread_count') as c:
  567. c.return_value = 1
  568. worker = self._Worker()
  569. handlers = self.psig(cd.install_worker_term_hard_handler, worker)
  570. with self.assertRaises(SystemTerminate):
  571. handlers['SIGQUIT']('SIGQUIT', object())