test_worker.py 24 KB


  1. from __future__ import absolute_import
  2. import logging
  3. import os
  4. import sys
  5. from functools import wraps
  6. from mock import Mock, patch
  7. from nose import SkipTest
  8. from billiard import current_process
  9. from kombu import Exchange, Queue
  10. from celery import Celery
  11. from celery import platforms
  12. from celery import signals
  13. from celery.app import trace
  14. from celery.apps import worker as cd
  15. from celery.bin.worker import worker, main as worker_main
  16. from celery.exceptions import ImproperlyConfigured, SystemTerminate
  17. from celery.utils.log import ensure_process_aware_logger
  18. from celery.worker import state
  19. from celery.tests.case import (
  20. AppCase,
  21. WhateverIO,
  22. skip_if_pypy,
  23. skip_if_jython,
  24. )
  25. ensure_process_aware_logger()
  26. class WorkerAppCase(AppCase):
  27. def tearDown(self):
  28. super(WorkerAppCase, self).tearDown()
  29. trace.reset_worker_optimizations()
  30. def disable_stdouts(fun):
  31. @wraps(fun)
  32. def disable(*args, **kwargs):
  33. prev_out, prev_err = sys.stdout, sys.stderr
  34. prev_rout, prev_rerr = sys.__stdout__, sys.__stderr__
  35. sys.stdout = sys.__stdout__ = WhateverIO()
  36. sys.stderr = sys.__stderr__ = WhateverIO()
  37. try:
  38. return fun(*args, **kwargs)
  39. finally:
  40. sys.stdout = prev_out
  41. sys.stderr = prev_err
  42. sys.__stdout__ = prev_rout
  43. sys.__stderr__ = prev_rerr
  44. return disable
  45. class Worker(cd.Worker):
  46. redirect_stdouts = False
  47. def start(self, *args, **kwargs):
  48. self.on_start()
  49. class test_Worker(WorkerAppCase):
  50. Worker = Worker
  51. def teardown(self):
  52. self.app.conf.CELERY_INCLUDE = ()
  53. @disable_stdouts
  54. def test_queues_string(self):
  55. celery = Celery(set_as_current=False)
  56. w = celery.Worker()
  57. w.setup_queues('foo,bar,baz')
  58. self.assertEqual(w.queues, ['foo', 'bar', 'baz'])
  59. self.assertTrue('foo' in celery.amqp.queues)
  60. @disable_stdouts
  61. def test_cpu_count(self):
  62. celery = Celery(set_as_current=False)
  63. with patch('celery.worker.cpu_count') as cpu_count:
  64. cpu_count.side_effect = NotImplementedError()
  65. w = celery.Worker(concurrency=None)
  66. self.assertEqual(w.concurrency, 2)
  67. w = celery.Worker(concurrency=5)
  68. self.assertEqual(w.concurrency, 5)
  69. @disable_stdouts
  70. def test_windows_B_option(self):
  71. celery = Celery(set_as_current=False)
  72. celery.IS_WINDOWS = True
  73. with self.assertRaises(SystemExit):
  74. worker(app=celery).run(beat=True)
  75. def test_setup_concurrency_very_early(self):
  76. x = worker()
  77. x.run = Mock()
  78. with self.assertRaises(ImportError):
  79. x.execute_from_commandline(['worker', '-P', 'xyzybox'])
  80. def test_run_from_argv_basic(self):
  81. x = worker(app=self.app)
  82. x.run = Mock()
  83. x.maybe_detach = Mock()
  84. def run(*args, **kwargs):
  85. pass
  86. x.run = run
  87. x.run_from_argv('celery', [])
  88. self.assertTrue(x.maybe_detach.called)
  89. def test_maybe_detach(self):
  90. x = worker(app=self.app)
  91. with patch('celery.bin.worker.detached_celeryd') as detached:
  92. x.maybe_detach([])
  93. self.assertFalse(detached.called)
  94. with self.assertRaises(SystemExit):
  95. x.maybe_detach(['--detach'])
  96. self.assertTrue(detached.called)
  97. @disable_stdouts
  98. def test_invalid_loglevel_gives_error(self):
  99. x = worker(app=Celery(set_as_current=False))
  100. with self.assertRaises(SystemExit):
  101. x.run(loglevel='GRIM_REAPER')
  102. def test_no_loglevel(self):
  103. app = Celery(set_as_current=False)
  104. app.Worker = Mock()
  105. worker(app=app).run(loglevel=None)
  106. def test_tasklist(self):
  107. celery = Celery(set_as_current=False)
  108. worker = celery.Worker()
  109. self.assertTrue(worker.app.tasks)
  110. self.assertTrue(worker.app.finalized)
  111. self.assertTrue(worker.tasklist(include_builtins=True))
  112. worker.tasklist(include_builtins=False)
  113. def test_extra_info(self):
  114. celery = Celery(set_as_current=False)
  115. worker = celery.Worker()
  116. worker.loglevel = logging.WARNING
  117. self.assertFalse(worker.extra_info())
  118. worker.loglevel = logging.INFO
  119. self.assertTrue(worker.extra_info())
  120. @disable_stdouts
  121. def test_loglevel_string(self):
  122. worker = self.Worker(app=self.app, loglevel='INFO')
  123. self.assertEqual(worker.loglevel, logging.INFO)
  124. def test_run_worker(self):
  125. handlers = {}
  126. class Signals(platforms.Signals):
  127. def __setitem__(self, sig, handler):
  128. handlers[sig] = handler
  129. p = platforms.signals
  130. platforms.signals = Signals()
  131. try:
  132. w = self.Worker(app=self.app)
  133. w._isatty = False
  134. w.on_start()
  135. for sig in 'SIGINT', 'SIGHUP', 'SIGTERM':
  136. self.assertIn(sig, handlers)
  137. handlers.clear()
  138. w = self.Worker(app=self.app)
  139. w._isatty = True
  140. w.on_start()
  141. for sig in 'SIGINT', 'SIGTERM':
  142. self.assertIn(sig, handlers)
  143. self.assertNotIn('SIGHUP', handlers)
  144. finally:
  145. platforms.signals = p
  146. @disable_stdouts
  147. def test_startup_info(self):
  148. worker = self.Worker(app=self.app)
  149. worker.on_start()
  150. self.assertTrue(worker.startup_info())
  151. worker.loglevel = logging.DEBUG
  152. self.assertTrue(worker.startup_info())
  153. worker.loglevel = logging.INFO
  154. self.assertTrue(worker.startup_info())
  155. worker.autoscale = 13, 10
  156. self.assertTrue(worker.startup_info())
  157. app = Celery(set_as_current=False)
  158. worker = self.Worker(app=app, queues='foo,bar,baz,xuzzy,do,re,mi')
  159. prev, app.loader = app.loader, Mock()
  160. try:
  161. app.loader.__module__ = 'acme.baked_beans'
  162. self.assertTrue(worker.startup_info())
  163. finally:
  164. app.loader = prev
  165. prev, app.loader = app.loader, Mock()
  166. try:
  167. app.loader.__module__ = 'celery.loaders.foo'
  168. self.assertTrue(worker.startup_info())
  169. finally:
  170. app.loader = prev
  171. from celery.loaders.app import AppLoader
  172. prev, app.loader = app.loader, AppLoader(app=self.app)
  173. try:
  174. self.assertTrue(worker.startup_info())
  175. finally:
  176. app.loader = prev
  177. worker.send_events = True
  178. self.assertTrue(worker.startup_info())
  179. # test when there are too few output lines
  180. # to draft the ascii art onto
  181. prev, cd.ARTLINES = cd.ARTLINES, ['the quick brown fox']
  182. self.assertTrue(worker.startup_info())
  183. @disable_stdouts
  184. def test_run(self):
  185. self.Worker(app=self.app).on_start()
  186. self.Worker(app=self.app, purge=True).on_start()
  187. worker = self.Worker(app=self.app)
  188. worker.on_start()
  189. @disable_stdouts
  190. def test_purge_messages(self):
  191. self.Worker(app=self.app).purge_messages()
  192. @disable_stdouts
  193. def test_init_queues(self):
  194. app = self.app
  195. c = app.conf
  196. p, app.amqp.queues = app.amqp.queues, app.amqp.Queues({
  197. 'celery': {'exchange': 'celery',
  198. 'routing_key': 'celery'},
  199. 'video': {'exchange': 'video',
  200. 'routing_key': 'video'}})
  201. try:
  202. worker = self.Worker(app=self.app)
  203. worker.setup_queues(['video'])
  204. self.assertIn('video', app.amqp.queues)
  205. self.assertIn('video', app.amqp.queues.consume_from)
  206. self.assertIn('celery', app.amqp.queues)
  207. self.assertNotIn('celery', app.amqp.queues.consume_from)
  208. c.CELERY_CREATE_MISSING_QUEUES = False
  209. del(app.amqp.queues)
  210. with self.assertRaises(ImproperlyConfigured):
  211. self.Worker(app=self.app).setup_queues(['image'])
  212. del(app.amqp.queues)
  213. c.CELERY_CREATE_MISSING_QUEUES = True
  214. worker = self.Worker(app=self.app)
  215. worker.setup_queues(queues=['image'])
  216. self.assertIn('image', app.amqp.queues.consume_from)
  217. self.assertEqual(Queue('image', Exchange('image'),
  218. routing_key='image'), app.amqp.queues['image'])
  219. finally:
  220. app.amqp.queues = p
  221. @disable_stdouts
  222. def test_autoscale_argument(self):
  223. worker1 = self.Worker(app=self.app, autoscale='10,3')
  224. self.assertListEqual(worker1.autoscale, [10, 3])
  225. worker2 = self.Worker(app=self.app, autoscale='10')
  226. self.assertListEqual(worker2.autoscale, [10, 0])
  227. def test_include_argument(self):
  228. worker1 = self.Worker(app=self.app, include='some.module')
  229. self.assertListEqual(worker1.include, ['some.module'])
  230. worker2 = self.Worker(app=self.app,
  231. include='some.module,another.package')
  232. self.assertListEqual(
  233. worker2.include,
  234. ['some.module', 'another.package'],
  235. )
  236. self.Worker(app=self.app, include=['os', 'sys'])
  237. @disable_stdouts
  238. def test_unknown_loglevel(self):
  239. with self.assertRaises(SystemExit):
  240. worker(app=self.app).run(loglevel='ALIEN')
  241. worker1 = self.Worker(app=self.app, loglevel=0xFFFF)
  242. self.assertEqual(worker1.loglevel, 0xFFFF)
  243. @disable_stdouts
  244. def test_warns_if_running_as_privileged_user(self):
  245. app = self.app
  246. if app.IS_WINDOWS:
  247. raise SkipTest('Not applicable on Windows')
  248. def getuid():
  249. return 0
  250. prev, os.getuid = os.getuid, getuid
  251. try:
  252. with self.assertWarnsRegex(
  253. RuntimeWarning,
  254. r'superuser privileges is discouraged'):
  255. worker = self.Worker(app=self.app)
  256. worker.on_start()
  257. finally:
  258. os.getuid = prev
  259. @disable_stdouts
  260. def test_redirect_stdouts(self):
  261. self.Worker(app=self.app, redirect_stdouts=False)
  262. with self.assertRaises(AttributeError):
  263. sys.stdout.logger
  264. @disable_stdouts
  265. def test_on_start_custom_logging(self):
  266. prev, self.app.log.redirect_stdouts = (
  267. self.app.log.redirect_stdouts, Mock(),
  268. )
  269. try:
  270. worker = self.Worker(app=self.app, redirect_stoutds=True)
  271. worker._custom_logging = True
  272. worker.on_start()
  273. self.assertFalse(self.app.log.redirect_stdouts.called)
  274. finally:
  275. self.app.log.redirect_stdouts = prev
  276. def test_setup_logging_no_color(self):
  277. worker = self.Worker(
  278. app=self.app, redirect_stdouts=False, no_color=True,
  279. )
  280. prev, self.app.log.setup = self.app.log.setup, Mock()
  281. worker.setup_logging()
  282. self.assertFalse(self.app.log.setup.call_args[1]['colorize'])
  283. @disable_stdouts
  284. def test_startup_info_pool_is_str(self):
  285. worker = self.Worker(app=self.app, redirect_stdouts=False)
  286. worker.pool_cls = 'foo'
  287. worker.startup_info()
  288. def test_redirect_stdouts_already_handled(self):
  289. logging_setup = [False]
  290. @signals.setup_logging.connect
  291. def on_logging_setup(**kwargs):
  292. logging_setup[0] = True
  293. try:
  294. worker = self.Worker(app=self.app, redirect_stdouts=False)
  295. worker.app.log.already_setup = False
  296. worker.setup_logging()
  297. self.assertTrue(logging_setup[0])
  298. with self.assertRaises(AttributeError):
  299. sys.stdout.logger
  300. finally:
  301. signals.setup_logging.disconnect(on_logging_setup)
  302. @disable_stdouts
  303. def test_platform_tweaks_osx(self):
  304. class OSXWorker(Worker):
  305. proxy_workaround_installed = False
  306. def osx_proxy_detection_workaround(self):
  307. self.proxy_workaround_installed = True
  308. worker = OSXWorker(app=self.app, redirect_stdouts=False)
  309. def install_HUP_nosupport(controller):
  310. controller.hup_not_supported_installed = True
  311. class Controller(object):
  312. pass
  313. prev = cd.install_HUP_not_supported_handler
  314. cd.install_HUP_not_supported_handler = install_HUP_nosupport
  315. try:
  316. worker.app.IS_OSX = True
  317. controller = Controller()
  318. worker.install_platform_tweaks(controller)
  319. self.assertTrue(controller.hup_not_supported_installed)
  320. self.assertTrue(worker.proxy_workaround_installed)
  321. finally:
  322. cd.install_HUP_not_supported_handler = prev
  323. @disable_stdouts
  324. def test_general_platform_tweaks(self):
  325. restart_worker_handler_installed = [False]
  326. def install_worker_restart_handler(worker):
  327. restart_worker_handler_installed[0] = True
  328. class Controller(object):
  329. pass
  330. prev = cd.install_worker_restart_handler
  331. cd.install_worker_restart_handler = install_worker_restart_handler
  332. try:
  333. worker = self.Worker(app=self.app)
  334. worker.app.IS_OSX = False
  335. worker.install_platform_tweaks(Controller())
  336. self.assertTrue(restart_worker_handler_installed[0])
  337. finally:
  338. cd.install_worker_restart_handler = prev
  339. @disable_stdouts
  340. def test_on_consumer_ready(self):
  341. worker_ready_sent = [False]
  342. @signals.worker_ready.connect
  343. def on_worker_ready(**kwargs):
  344. worker_ready_sent[0] = True
  345. self.Worker(app=self.app).on_consumer_ready(object())
  346. self.assertTrue(worker_ready_sent[0])
  347. class test_funs(WorkerAppCase):
  348. def test_active_thread_count(self):
  349. self.assertTrue(cd.active_thread_count())
  350. @disable_stdouts
  351. def test_set_process_status(self):
  352. try:
  353. __import__('setproctitle')
  354. except ImportError:
  355. raise SkipTest('setproctitle not installed')
  356. worker = Worker(app=self.app, hostname='xyzza')
  357. prev1, sys.argv = sys.argv, ['Arg0']
  358. try:
  359. st = worker.set_process_status('Running')
  360. self.assertIn('celeryd', st)
  361. self.assertIn('xyzza', st)
  362. self.assertIn('Running', st)
  363. prev2, sys.argv = sys.argv, ['Arg0', 'Arg1']
  364. try:
  365. st = worker.set_process_status('Running')
  366. self.assertIn('celeryd', st)
  367. self.assertIn('xyzza', st)
  368. self.assertIn('Running', st)
  369. self.assertIn('Arg1', st)
  370. finally:
  371. sys.argv = prev2
  372. finally:
  373. sys.argv = prev1
  374. @disable_stdouts
  375. def test_parse_options(self):
  376. cmd = worker()
  377. cmd.app = self.app
  378. opts, args = cmd.parse_options('worker', ['--concurrency=512'])
  379. self.assertEqual(opts.concurrency, 512)
  380. @disable_stdouts
  381. def test_main(self):
  382. p, cd.Worker = cd.Worker, Worker
  383. s, sys.argv = sys.argv, ['worker', '--discard']
  384. try:
  385. worker_main()
  386. finally:
  387. cd.Worker = p
  388. sys.argv = s
  389. class test_signal_handlers(WorkerAppCase):
  390. class _Worker(object):
  391. stopped = False
  392. terminated = False
  393. def stop(self, in_sighandler=False):
  394. self.stopped = True
  395. def terminate(self, in_sighandler=False):
  396. self.terminated = True
  397. def psig(self, fun, *args, **kwargs):
  398. handlers = {}
  399. class Signals(platforms.Signals):
  400. def __setitem__(self, sig, handler):
  401. handlers[sig] = handler
  402. p, platforms.signals = platforms.signals, Signals()
  403. try:
  404. fun(*args, **kwargs)
  405. return handlers
  406. finally:
  407. platforms.signals = p
  408. @disable_stdouts
  409. def test_worker_int_handler(self):
  410. worker = self._Worker()
  411. handlers = self.psig(cd.install_worker_int_handler, worker)
  412. next_handlers = {}
  413. state.should_stop = False
  414. state.should_terminate = False
  415. class Signals(platforms.Signals):
  416. def __setitem__(self, sig, handler):
  417. next_handlers[sig] = handler
  418. with patch('celery.apps.worker.active_thread_count') as c:
  419. c.return_value = 3
  420. p, platforms.signals = platforms.signals, Signals()
  421. try:
  422. handlers['SIGINT']('SIGINT', object())
  423. self.assertTrue(state.should_stop)
  424. finally:
  425. platforms.signals = p
  426. state.should_stop = False
  427. try:
  428. next_handlers['SIGINT']('SIGINT', object())
  429. self.assertTrue(state.should_terminate)
  430. finally:
  431. state.should_terminate = False
  432. with patch('celery.apps.worker.active_thread_count') as c:
  433. c.return_value = 1
  434. p, platforms.signals = platforms.signals, Signals()
  435. try:
  436. with self.assertRaises(SystemExit):
  437. handlers['SIGINT']('SIGINT', object())
  438. finally:
  439. platforms.signals = p
  440. with self.assertRaises(SystemTerminate):
  441. next_handlers['SIGINT']('SIGINT', object())
  442. @disable_stdouts
  443. def test_worker_int_handler_only_stop_MainProcess(self):
  444. try:
  445. import _multiprocessing # noqa
  446. except ImportError:
  447. raise SkipTest('only relevant for multiprocessing')
  448. process = current_process()
  449. name, process.name = process.name, 'OtherProcess'
  450. with patch('celery.apps.worker.active_thread_count') as c:
  451. c.return_value = 3
  452. try:
  453. worker = self._Worker()
  454. handlers = self.psig(cd.install_worker_int_handler, worker)
  455. handlers['SIGINT']('SIGINT', object())
  456. self.assertTrue(state.should_stop)
  457. finally:
  458. process.name = name
  459. state.should_stop = False
  460. with patch('celery.apps.worker.active_thread_count') as c:
  461. c.return_value = 1
  462. try:
  463. worker = self._Worker()
  464. handlers = self.psig(cd.install_worker_int_handler, worker)
  465. with self.assertRaises(SystemExit):
  466. handlers['SIGINT']('SIGINT', object())
  467. finally:
  468. process.name = name
  469. state.should_stop = False
  470. @disable_stdouts
  471. def test_install_HUP_not_supported_handler(self):
  472. worker = self._Worker()
  473. handlers = self.psig(cd.install_HUP_not_supported_handler, worker)
  474. handlers['SIGHUP']('SIGHUP', object())
  475. @disable_stdouts
  476. def test_worker_term_hard_handler_only_stop_MainProcess(self):
  477. try:
  478. import _multiprocessing # noqa
  479. except ImportError:
  480. raise SkipTest('only relevant for multiprocessing')
  481. process = current_process()
  482. name, process.name = process.name, 'OtherProcess'
  483. try:
  484. with patch('celery.apps.worker.active_thread_count') as c:
  485. c.return_value = 3
  486. worker = self._Worker()
  487. handlers = self.psig(
  488. cd.install_worker_term_hard_handler, worker)
  489. try:
  490. handlers['SIGQUIT']('SIGQUIT', object())
  491. self.assertTrue(state.should_terminate)
  492. finally:
  493. state.should_terminate = False
  494. with patch('celery.apps.worker.active_thread_count') as c:
  495. c.return_value = 1
  496. worker = self._Worker()
  497. handlers = self.psig(
  498. cd.install_worker_term_hard_handler, worker)
  499. with self.assertRaises(SystemTerminate):
  500. handlers['SIGQUIT']('SIGQUIT', object())
  501. finally:
  502. process.name = name
  503. @disable_stdouts
  504. def test_worker_term_handler_when_threads(self):
  505. with patch('celery.apps.worker.active_thread_count') as c:
  506. c.return_value = 3
  507. worker = self._Worker()
  508. handlers = self.psig(cd.install_worker_term_handler, worker)
  509. try:
  510. handlers['SIGTERM']('SIGTERM', object())
  511. self.assertTrue(state.should_stop)
  512. finally:
  513. state.should_stop = False
  514. @disable_stdouts
  515. def test_worker_term_handler_when_single_thread(self):
  516. with patch('celery.apps.worker.active_thread_count') as c:
  517. c.return_value = 1
  518. worker = self._Worker()
  519. handlers = self.psig(cd.install_worker_term_handler, worker)
  520. try:
  521. with self.assertRaises(SystemExit):
  522. handlers['SIGTERM']('SIGTERM', object())
  523. finally:
  524. state.should_stop = False
  525. @patch('sys.__stderr__')
  526. @skip_if_pypy
  527. @skip_if_jython
  528. def test_worker_cry_handler(self, stderr):
  529. handlers = self.psig(cd.install_cry_handler)
  530. self.assertIsNone(handlers['SIGUSR1']('SIGUSR1', object()))
  531. self.assertTrue(stderr.write.called)
  532. @disable_stdouts
  533. def test_worker_term_handler_only_stop_MainProcess(self):
  534. try:
  535. import _multiprocessing # noqa
  536. except ImportError:
  537. raise SkipTest('only relevant for multiprocessing')
  538. process = current_process()
  539. name, process.name = process.name, 'OtherProcess'
  540. try:
  541. with patch('celery.apps.worker.active_thread_count') as c:
  542. c.return_value = 3
  543. worker = self._Worker()
  544. handlers = self.psig(cd.install_worker_term_handler, worker)
  545. handlers['SIGTERM']('SIGTERM', object())
  546. self.assertTrue(state.should_stop)
  547. with patch('celery.apps.worker.active_thread_count') as c:
  548. c.return_value = 1
  549. worker = self._Worker()
  550. handlers = self.psig(cd.install_worker_term_handler, worker)
  551. with self.assertRaises(SystemExit):
  552. handlers['SIGTERM']('SIGTERM', object())
  553. finally:
  554. process.name = name
  555. state.should_stop = False
  556. @disable_stdouts
  557. @patch('atexit.register')
  558. @patch('os.fork')
  559. @patch('os.close')
  560. def test_worker_restart_handler(self, _close, fork, register):
  561. fork.return_value = 0
  562. if getattr(os, 'execv', None) is None:
  563. raise SkipTest('platform does not have excv')
  564. argv = []
  565. def _execv(*args):
  566. argv.extend(args)
  567. execv, os.execv = os.execv, _execv
  568. try:
  569. worker = self._Worker()
  570. handlers = self.psig(cd.install_worker_restart_handler, worker)
  571. handlers['SIGHUP']('SIGHUP', object())
  572. self.assertTrue(state.should_stop)
  573. self.assertTrue(register.called)
  574. callback = register.call_args[0][0]
  575. callback()
  576. self.assertTrue(argv)
  577. argv[:] = []
  578. fork.return_value = 1
  579. callback()
  580. self.assertFalse(argv)
  581. finally:
  582. os.execv = execv
  583. state.should_stop = False
  584. @disable_stdouts
  585. def test_worker_term_hard_handler_when_threaded(self):
  586. with patch('celery.apps.worker.active_thread_count') as c:
  587. c.return_value = 3
  588. worker = self._Worker()
  589. handlers = self.psig(cd.install_worker_term_hard_handler, worker)
  590. try:
  591. handlers['SIGQUIT']('SIGQUIT', object())
  592. self.assertTrue(state.should_terminate)
  593. finally:
  594. state.should_terminate = False
  595. @disable_stdouts
  596. def test_worker_term_hard_handler_when_single_threaded(self):
  597. with patch('celery.apps.worker.active_thread_count') as c:
  598. c.return_value = 1
  599. worker = self._Worker()
  600. handlers = self.psig(cd.install_worker_term_hard_handler, worker)
  601. with self.assertRaises(SystemTerminate):
  602. handlers['SIGQUIT']('SIGQUIT', object())