test_worker.py 23 KB


  1. from __future__ import absolute_import, unicode_literals
  2. import logging
  3. import os
  4. import pytest
  5. import sys
  6. from billiard.process import current_process
  7. from case import Mock, mock, patch, skip
  8. from kombu import Exchange, Queue
  9. from celery import platforms
  10. from celery import signals
  11. from celery.app import trace
  12. from celery.apps import worker as cd
  13. from celery.bin.worker import worker, main as worker_main
  14. from celery.exceptions import (
  15. ImproperlyConfigured, WorkerShutdown, WorkerTerminate,
  16. )
  17. from celery.platforms import EX_FAILURE, EX_OK
  18. from celery.worker import state
  19. @pytest.fixture(autouse=True)
  20. def reset_worker_optimizations():
  21. yield
  22. trace.reset_worker_optimizations()
  23. class Worker(cd.Worker):
  24. redirect_stdouts = False
  25. def start(self, *args, **kwargs):
  26. self.on_start()
  27. class test_Worker:
  28. Worker = Worker
  29. def test_queues_string(self):
  30. with mock.stdouts():
  31. w = self.app.Worker()
  32. w.setup_queues('foo,bar,baz')
  33. assert 'foo' in self.app.amqp.queues
  34. def test_cpu_count(self):
  35. with mock.stdouts():
  36. with patch('celery.worker.worker.cpu_count') as cpu_count:
  37. cpu_count.side_effect = NotImplementedError()
  38. w = self.app.Worker(concurrency=None)
  39. assert w.concurrency == 2
  40. w = self.app.Worker(concurrency=5)
  41. assert w.concurrency == 5
  42. def test_windows_B_option(self):
  43. with mock.stdouts():
  44. self.app.IS_WINDOWS = True
  45. with pytest.raises(SystemExit):
  46. worker(app=self.app).run(beat=True)
  47. def test_setup_concurrency_very_early(self):
  48. x = worker()
  49. x.run = Mock()
  50. with pytest.raises(ImportError):
  51. x.execute_from_commandline(['worker', '-P', 'xyzybox'])
  52. def test_run_from_argv_basic(self):
  53. x = worker(app=self.app)
  54. x.run = Mock()
  55. x.maybe_detach = Mock()
  56. def run(*args, **kwargs):
  57. pass
  58. x.run = run
  59. x.run_from_argv('celery', [])
  60. x.maybe_detach.assert_called()
  61. def test_maybe_detach(self):
  62. x = worker(app=self.app)
  63. with patch('celery.bin.worker.detached_celeryd') as detached:
  64. x.maybe_detach([])
  65. detached.assert_not_called()
  66. with pytest.raises(SystemExit):
  67. x.maybe_detach(['--detach'])
  68. detached.assert_called()
  69. def test_invalid_loglevel_gives_error(self):
  70. with mock.stdouts():
  71. x = worker(app=self.app)
  72. with pytest.raises(SystemExit):
  73. x.run(loglevel='GRIM_REAPER')
  74. def test_no_loglevel(self):
  75. self.app.Worker = Mock()
  76. worker(app=self.app).run(loglevel=None)
  77. def test_tasklist(self):
  78. worker = self.app.Worker()
  79. assert worker.app.tasks
  80. assert worker.app.finalized
  81. assert worker.tasklist(include_builtins=True)
  82. worker.tasklist(include_builtins=False)
  83. def test_extra_info(self):
  84. worker = self.app.Worker()
  85. worker.loglevel = logging.WARNING
  86. assert not worker.extra_info()
  87. worker.loglevel = logging.INFO
  88. assert worker.extra_info()
  89. def test_loglevel_string(self):
  90. with mock.stdouts():
  91. worker = self.Worker(app=self.app, loglevel='INFO')
  92. assert worker.loglevel == logging.INFO
  93. def test_run_worker(self, patching):
  94. handlers = {}
  95. class Signals(platforms.Signals):
  96. def __setitem__(self, sig, handler):
  97. handlers[sig] = handler
  98. patching.setattr('celery.platforms.signals', Signals())
  99. with mock.stdouts():
  100. w = self.Worker(app=self.app)
  101. w._isatty = False
  102. w.on_start()
  103. for sig in 'SIGINT', 'SIGHUP', 'SIGTERM':
  104. assert sig in handlers
  105. handlers.clear()
  106. w = self.Worker(app=self.app)
  107. w._isatty = True
  108. w.on_start()
  109. for sig in 'SIGINT', 'SIGTERM':
  110. assert sig in handlers
  111. assert 'SIGHUP' not in handlers
  112. def test_startup_info(self):
  113. with mock.stdouts():
  114. worker = self.Worker(app=self.app)
  115. worker.on_start()
  116. assert worker.startup_info()
  117. worker.loglevel = logging.DEBUG
  118. assert worker.startup_info()
  119. worker.loglevel = logging.INFO
  120. assert worker.startup_info()
  121. worker.autoscale = 13, 10
  122. assert worker.startup_info()
  123. prev_loader = self.app.loader
  124. worker = self.Worker(
  125. app=self.app,
  126. queues='foo,bar,baz,xuzzy,do,re,mi',
  127. )
  128. with patch('celery.apps.worker.qualname') as qualname:
  129. qualname.return_value = 'acme.backed_beans.Loader'
  130. assert worker.startup_info()
  131. with patch('celery.apps.worker.qualname') as qualname:
  132. qualname.return_value = 'celery.loaders.Loader'
  133. assert worker.startup_info()
  134. from celery.loaders.app import AppLoader
  135. self.app.loader = AppLoader(app=self.app)
  136. assert worker.startup_info()
  137. self.app.loader = prev_loader
  138. worker.task_events = True
  139. assert worker.startup_info()
  140. # test when there are too few output lines
  141. # to draft the ascii art onto
  142. prev, cd.ARTLINES = cd.ARTLINES, ['the quick brown fox']
  143. try:
  144. assert worker.startup_info()
  145. finally:
  146. cd.ARTLINES = prev
  147. def test_run(self):
  148. with mock.stdouts():
  149. self.Worker(app=self.app).on_start()
  150. self.Worker(app=self.app, purge=True).on_start()
  151. worker = self.Worker(app=self.app)
  152. worker.on_start()
  153. def test_purge_messages(self):
  154. with mock.stdouts():
  155. self.Worker(app=self.app).purge_messages()
  156. def test_init_queues(self):
  157. with mock.stdouts():
  158. app = self.app
  159. c = app.conf
  160. app.amqp.queues = app.amqp.Queues({
  161. 'celery': {
  162. 'exchange': 'celery',
  163. 'routing_key': 'celery',
  164. },
  165. 'video': {
  166. 'exchange': 'video',
  167. 'routing_key': 'video',
  168. },
  169. })
  170. worker = self.Worker(app=self.app)
  171. worker.setup_queues(['video'])
  172. assert 'video' in app.amqp.queues
  173. assert 'video' in app.amqp.queues.consume_from
  174. assert 'celery' in app.amqp.queues
  175. assert 'celery' not in app.amqp.queues.consume_from
  176. c.task_create_missing_queues = False
  177. del(app.amqp.queues)
  178. with pytest.raises(ImproperlyConfigured):
  179. self.Worker(app=self.app).setup_queues(['image'])
  180. del(app.amqp.queues)
  181. c.task_create_missing_queues = True
  182. worker = self.Worker(app=self.app)
  183. worker.setup_queues(['image'])
  184. assert 'image' in app.amqp.queues.consume_from
  185. assert app.amqp.queues['image'] == Queue(
  186. 'image', Exchange('image'),
  187. routing_key='image',
  188. )
  189. def test_autoscale_argument(self):
  190. with mock.stdouts():
  191. worker1 = self.Worker(app=self.app, autoscale='10,3')
  192. assert worker1.autoscale == [10, 3]
  193. worker2 = self.Worker(app=self.app, autoscale='10')
  194. assert worker2.autoscale == [10, 0]
  195. def test_include_argument(self):
  196. worker1 = self.Worker(app=self.app, include='os')
  197. assert worker1.include == ['os']
  198. worker2 = self.Worker(app=self.app,
  199. include='os,sys')
  200. assert worker2.include == ['os', 'sys']
  201. self.Worker(app=self.app, include=['os', 'sys'])
  202. def test_unknown_loglevel(self):
  203. with mock.stdouts():
  204. with pytest.raises(SystemExit):
  205. worker(app=self.app).run(loglevel='ALIEN')
  206. worker1 = self.Worker(app=self.app, loglevel=0xFFFF)
  207. assert worker1.loglevel == 0xFFFF
  208. @patch('os._exit')
  209. @skip.if_win32()
  210. def test_warns_if_running_as_privileged_user(self, _exit, patching):
  211. getuid = patching('os.getuid')
  212. with mock.stdouts() as (_, stderr):
  213. getuid.return_value = 0
  214. self.app.conf.accept_content = ['pickle']
  215. worker = self.Worker(app=self.app)
  216. worker.on_start()
  217. _exit.assert_called_with(1)
  218. patching.setattr('celery.platforms.C_FORCE_ROOT', True)
  219. worker = self.Worker(app=self.app)
  220. worker.on_start()
  221. assert 'a very bad idea' in stderr.getvalue()
  222. patching.setattr('celery.platforms.C_FORCE_ROOT', False)
  223. self.app.conf.accept_content = ['json']
  224. worker = self.Worker(app=self.app)
  225. worker.on_start()
  226. assert 'superuser' in stderr.getvalue()
  227. def test_redirect_stdouts(self):
  228. with mock.stdouts():
  229. self.Worker(app=self.app, redirect_stdouts=False)
  230. with pytest.raises(AttributeError):
  231. sys.stdout.logger
  232. def test_on_start_custom_logging(self):
  233. with mock.stdouts():
  234. self.app.log.redirect_stdouts = Mock()
  235. worker = self.Worker(app=self.app, redirect_stoutds=True)
  236. worker._custom_logging = True
  237. worker.on_start()
  238. self.app.log.redirect_stdouts.assert_not_called()
  239. def test_setup_logging_no_color(self):
  240. worker = self.Worker(
  241. app=self.app, redirect_stdouts=False, no_color=True,
  242. )
  243. prev, self.app.log.setup = self.app.log.setup, Mock()
  244. try:
  245. worker.setup_logging()
  246. assert not self.app.log.setup.call_args[1]['colorize']
  247. finally:
  248. self.app.log.setup = prev
  249. def test_startup_info_pool_is_str(self):
  250. with mock.stdouts():
  251. worker = self.Worker(app=self.app, redirect_stdouts=False)
  252. worker.pool_cls = 'foo'
  253. worker.startup_info()
  254. def test_redirect_stdouts_already_handled(self):
  255. logging_setup = [False]
  256. @signals.setup_logging.connect
  257. def on_logging_setup(**kwargs):
  258. logging_setup[0] = True
  259. try:
  260. worker = self.Worker(app=self.app, redirect_stdouts=False)
  261. worker.app.log.already_setup = False
  262. worker.setup_logging()
  263. assert logging_setup[0]
  264. with pytest.raises(AttributeError):
  265. sys.stdout.logger
  266. finally:
  267. signals.setup_logging.disconnect(on_logging_setup)
  268. def test_platform_tweaks_macOS(self):
  269. class macOSWorker(Worker):
  270. proxy_workaround_installed = False
  271. def macOS_proxy_detection_workaround(self):
  272. self.proxy_workaround_installed = True
  273. with mock.stdouts():
  274. worker = macOSWorker(app=self.app, redirect_stdouts=False)
  275. def install_HUP_nosupport(controller):
  276. controller.hup_not_supported_installed = True
  277. class Controller(object):
  278. pass
  279. prev = cd.install_HUP_not_supported_handler
  280. cd.install_HUP_not_supported_handler = install_HUP_nosupport
  281. try:
  282. worker.app.IS_macOS = True
  283. controller = Controller()
  284. worker.install_platform_tweaks(controller)
  285. assert controller.hup_not_supported_installed
  286. assert worker.proxy_workaround_installed
  287. finally:
  288. cd.install_HUP_not_supported_handler = prev
  289. def test_general_platform_tweaks(self):
  290. restart_worker_handler_installed = [False]
  291. def install_worker_restart_handler(worker):
  292. restart_worker_handler_installed[0] = True
  293. class Controller(object):
  294. pass
  295. with mock.stdouts():
  296. prev = cd.install_worker_restart_handler
  297. cd.install_worker_restart_handler = install_worker_restart_handler
  298. try:
  299. worker = self.Worker(app=self.app)
  300. worker.app.IS_macOS = False
  301. worker.install_platform_tweaks(Controller())
  302. assert restart_worker_handler_installed[0]
  303. finally:
  304. cd.install_worker_restart_handler = prev
  305. def test_on_consumer_ready(self):
  306. worker_ready_sent = [False]
  307. @signals.worker_ready.connect
  308. def on_worker_ready(**kwargs):
  309. worker_ready_sent[0] = True
  310. with mock.stdouts():
  311. self.Worker(app=self.app).on_consumer_ready(object())
  312. assert worker_ready_sent[0]
  313. @mock.stdouts
  314. class test_funs:
  315. def test_active_thread_count(self):
  316. assert cd.active_thread_count()
  317. @skip.unless_module('setproctitle')
  318. def test_set_process_status(self):
  319. worker = Worker(app=self.app, hostname='xyzza')
  320. prev1, sys.argv = sys.argv, ['Arg0']
  321. try:
  322. st = worker.set_process_status('Running')
  323. assert 'celeryd' in st
  324. assert 'xyzza' in st
  325. assert 'Running' in st
  326. prev2, sys.argv = sys.argv, ['Arg0', 'Arg1']
  327. try:
  328. st = worker.set_process_status('Running')
  329. assert 'celeryd' in st
  330. assert 'xyzza' in st
  331. assert 'Running' in st
  332. assert 'Arg1' in st
  333. finally:
  334. sys.argv = prev2
  335. finally:
  336. sys.argv = prev1
  337. def test_parse_options(self):
  338. cmd = worker()
  339. cmd.app = self.app
  340. opts, args = cmd.parse_options('worker', ['--concurrency=512',
  341. '--heartbeat-interval=10'])
  342. assert opts['concurrency'] == 512
  343. assert opts['heartbeat_interval'] == 10
  344. def test_main(self):
  345. p, cd.Worker = cd.Worker, Worker
  346. s, sys.argv = sys.argv, ['worker', '--discard']
  347. try:
  348. worker_main(app=self.app)
  349. finally:
  350. cd.Worker = p
  351. sys.argv = s
  352. @mock.stdouts
  353. class test_signal_handlers:
  354. class _Worker(object):
  355. hostname = 'foo'
  356. stopped = False
  357. terminated = False
  358. def stop(self, in_sighandler=False):
  359. self.stopped = True
  360. def terminate(self, in_sighandler=False):
  361. self.terminated = True
  362. def psig(self, fun, *args, **kwargs):
  363. handlers = {}
  364. class Signals(platforms.Signals):
  365. def __setitem__(self, sig, handler):
  366. handlers[sig] = handler
  367. p, platforms.signals = platforms.signals, Signals()
  368. try:
  369. fun(*args, **kwargs)
  370. return handlers
  371. finally:
  372. platforms.signals = p
  373. def test_worker_int_handler(self):
  374. worker = self._Worker()
  375. handlers = self.psig(cd.install_worker_int_handler, worker)
  376. next_handlers = {}
  377. state.should_stop = None
  378. state.should_terminate = None
  379. class Signals(platforms.Signals):
  380. def __setitem__(self, sig, handler):
  381. next_handlers[sig] = handler
  382. with patch('celery.apps.worker.active_thread_count') as c:
  383. c.return_value = 3
  384. p, platforms.signals = platforms.signals, Signals()
  385. try:
  386. handlers['SIGINT']('SIGINT', object())
  387. assert state.should_stop
  388. assert state.should_stop == EX_FAILURE
  389. finally:
  390. platforms.signals = p
  391. state.should_stop = None
  392. try:
  393. next_handlers['SIGINT']('SIGINT', object())
  394. assert state.should_terminate
  395. assert state.should_terminate == EX_FAILURE
  396. finally:
  397. state.should_terminate = None
  398. with patch('celery.apps.worker.active_thread_count') as c:
  399. c.return_value = 1
  400. p, platforms.signals = platforms.signals, Signals()
  401. try:
  402. with pytest.raises(WorkerShutdown):
  403. handlers['SIGINT']('SIGINT', object())
  404. finally:
  405. platforms.signals = p
  406. with pytest.raises(WorkerTerminate):
  407. next_handlers['SIGINT']('SIGINT', object())
  408. @skip.unless_module('multiprocessing')
  409. def test_worker_int_handler_only_stop_MainProcess(self):
  410. process = current_process()
  411. name, process.name = process.name, 'OtherProcess'
  412. with patch('celery.apps.worker.active_thread_count') as c:
  413. c.return_value = 3
  414. try:
  415. worker = self._Worker()
  416. handlers = self.psig(cd.install_worker_int_handler, worker)
  417. handlers['SIGINT']('SIGINT', object())
  418. assert state.should_stop
  419. finally:
  420. process.name = name
  421. state.should_stop = None
  422. with patch('celery.apps.worker.active_thread_count') as c:
  423. c.return_value = 1
  424. try:
  425. worker = self._Worker()
  426. handlers = self.psig(cd.install_worker_int_handler, worker)
  427. with pytest.raises(WorkerShutdown):
  428. handlers['SIGINT']('SIGINT', object())
  429. finally:
  430. process.name = name
  431. state.should_stop = None
  432. def test_install_HUP_not_supported_handler(self):
  433. worker = self._Worker()
  434. handlers = self.psig(cd.install_HUP_not_supported_handler, worker)
  435. handlers['SIGHUP']('SIGHUP', object())
  436. @skip.unless_module('multiprocessing')
  437. def test_worker_term_hard_handler_only_stop_MainProcess(self):
  438. process = current_process()
  439. name, process.name = process.name, 'OtherProcess'
  440. try:
  441. with patch('celery.apps.worker.active_thread_count') as c:
  442. c.return_value = 3
  443. worker = self._Worker()
  444. handlers = self.psig(
  445. cd.install_worker_term_hard_handler, worker)
  446. try:
  447. handlers['SIGQUIT']('SIGQUIT', object())
  448. assert state.should_terminate
  449. finally:
  450. state.should_terminate = None
  451. with patch('celery.apps.worker.active_thread_count') as c:
  452. c.return_value = 1
  453. worker = self._Worker()
  454. handlers = self.psig(
  455. cd.install_worker_term_hard_handler, worker)
  456. try:
  457. with pytest.raises(WorkerTerminate):
  458. handlers['SIGQUIT']('SIGQUIT', object())
  459. finally:
  460. state.should_terminate = None
  461. finally:
  462. process.name = name
  463. def test_worker_term_handler_when_threads(self):
  464. with patch('celery.apps.worker.active_thread_count') as c:
  465. c.return_value = 3
  466. worker = self._Worker()
  467. handlers = self.psig(cd.install_worker_term_handler, worker)
  468. try:
  469. handlers['SIGTERM']('SIGTERM', object())
  470. assert state.should_stop == EX_OK
  471. finally:
  472. state.should_stop = None
  473. def test_worker_term_handler_when_single_thread(self):
  474. with patch('celery.apps.worker.active_thread_count') as c:
  475. c.return_value = 1
  476. worker = self._Worker()
  477. handlers = self.psig(cd.install_worker_term_handler, worker)
  478. try:
  479. with pytest.raises(WorkerShutdown):
  480. handlers['SIGTERM']('SIGTERM', object())
  481. finally:
  482. state.should_stop = None
  483. @patch('sys.__stderr__')
  484. @skip.if_pypy()
  485. @skip.if_jython()
  486. def test_worker_cry_handler(self, stderr):
  487. handlers = self.psig(cd.install_cry_handler)
  488. assert handlers['SIGUSR1']('SIGUSR1', object()) is None
  489. stderr.write.assert_called()
  490. @skip.unless_module('multiprocessing')
  491. def test_worker_term_handler_only_stop_MainProcess(self):
  492. process = current_process()
  493. name, process.name = process.name, 'OtherProcess'
  494. try:
  495. with patch('celery.apps.worker.active_thread_count') as c:
  496. c.return_value = 3
  497. worker = self._Worker()
  498. handlers = self.psig(cd.install_worker_term_handler, worker)
  499. handlers['SIGTERM']('SIGTERM', object())
  500. assert state.should_stop == EX_OK
  501. with patch('celery.apps.worker.active_thread_count') as c:
  502. c.return_value = 1
  503. worker = self._Worker()
  504. handlers = self.psig(cd.install_worker_term_handler, worker)
  505. with pytest.raises(WorkerShutdown):
  506. handlers['SIGTERM']('SIGTERM', object())
  507. finally:
  508. process.name = name
  509. state.should_stop = None
  510. @skip.unless_symbol('os.execv')
  511. @patch('celery.platforms.close_open_fds')
  512. @patch('atexit.register')
  513. @patch('os.close')
  514. def test_worker_restart_handler(self, _close, register, close_open):
  515. argv = []
  516. def _execv(*args):
  517. argv.extend(args)
  518. execv, os.execv = os.execv, _execv
  519. try:
  520. worker = self._Worker()
  521. handlers = self.psig(cd.install_worker_restart_handler, worker)
  522. handlers['SIGHUP']('SIGHUP', object())
  523. assert state.should_stop == EX_OK
  524. register.assert_called()
  525. callback = register.call_args[0][0]
  526. callback()
  527. assert argv
  528. finally:
  529. os.execv = execv
  530. state.should_stop = None
  531. def test_worker_term_hard_handler_when_threaded(self):
  532. with patch('celery.apps.worker.active_thread_count') as c:
  533. c.return_value = 3
  534. worker = self._Worker()
  535. handlers = self.psig(cd.install_worker_term_hard_handler, worker)
  536. try:
  537. handlers['SIGQUIT']('SIGQUIT', object())
  538. assert state.should_terminate
  539. finally:
  540. state.should_terminate = None
  541. def test_worker_term_hard_handler_when_single_threaded(self):
  542. with patch('celery.apps.worker.active_thread_count') as c:
  543. c.return_value = 1
  544. worker = self._Worker()
  545. handlers = self.psig(cd.install_worker_term_hard_handler, worker)
  546. with pytest.raises(WorkerTerminate):
  547. handlers['SIGQUIT']('SIGQUIT', object())
  548. def test_send_worker_shutting_down_signal(self):
  549. with patch('celery.apps.worker.signals.worker_shutting_down') as wsd:
  550. worker = self._Worker()
  551. handlers = self.psig(cd.install_worker_term_handler, worker)
  552. try:
  553. with pytest.raises(WorkerShutdown):
  554. handlers['SIGTERM']('SIGTERM', object())
  555. finally:
  556. state.should_stop = None
  557. wsd.send.assert_called_with(
  558. sender='foo', sig='SIGTERM', how='Warm', exitcode=0,
  559. )