test_worker.py 22 KB


  1. import logging
  2. import os
  3. import pytest
  4. import sys
  5. from billiard.process import current_process
  6. from case import Mock, mock, patch, skip
  7. from kombu import Exchange, Queue
  8. from celery import platforms
  9. from celery import signals
  10. from celery.app import trace
  11. from celery.apps import worker as cd
  12. from celery.bin.worker import worker, main as worker_main
  13. from celery.exceptions import (
  14. ImproperlyConfigured, WorkerShutdown, WorkerTerminate,
  15. )
  16. from celery.platforms import EX_FAILURE, EX_OK
  17. from celery.worker import state
  18. @pytest.fixture(autouse=True)
  19. def reset_worker_optimizations():
  20. yield
  21. trace.reset_worker_optimizations()
  22. class Worker(cd.Worker):
  23. redirect_stdouts = False
  24. def start(self, *args, **kwargs):
  25. self.on_start()
  26. class test_Worker:
  27. Worker = Worker
  28. def test_queues_string(self):
  29. with mock.stdouts():
  30. w = self.app.Worker()
  31. w.setup_queues('foo,bar,baz')
  32. assert 'foo' in self.app.amqp.queues
  33. def test_cpu_count(self):
  34. with mock.stdouts():
  35. with patch('celery.worker.worker.cpu_count') as cpu_count:
  36. cpu_count.side_effect = NotImplementedError()
  37. w = self.app.Worker(concurrency=None)
  38. assert w.concurrency == 2
  39. w = self.app.Worker(concurrency=5)
  40. assert w.concurrency == 5
  41. def test_windows_B_option(self):
  42. with mock.stdouts():
  43. self.app.IS_WINDOWS = True
  44. with pytest.raises(SystemExit):
  45. worker(app=self.app).run(beat=True)
  46. def test_setup_concurrency_very_early(self):
  47. x = worker()
  48. x.run = Mock()
  49. with pytest.raises(ImportError):
  50. x.execute_from_commandline(['worker', '-P', 'xyzybox'])
  51. def test_run_from_argv_basic(self):
  52. x = worker(app=self.app)
  53. x.run = Mock()
  54. x.maybe_detach = Mock()
  55. def run(*args, **kwargs):
  56. pass
  57. x.run = run
  58. x.run_from_argv('celery', [])
  59. x.maybe_detach.assert_called()
  60. def test_maybe_detach(self):
  61. x = worker(app=self.app)
  62. with patch('celery.bin.worker.detached_celeryd') as detached:
  63. x.maybe_detach([])
  64. detached.assert_not_called()
  65. with pytest.raises(SystemExit):
  66. x.maybe_detach(['--detach'])
  67. detached.assert_called()
  68. def test_invalid_loglevel_gives_error(self):
  69. with mock.stdouts():
  70. x = worker(app=self.app)
  71. with pytest.raises(SystemExit):
  72. x.run(loglevel='GRIM_REAPER')
  73. def test_no_loglevel(self):
  74. self.app.Worker = Mock()
  75. worker(app=self.app).run(loglevel=None)
  76. def test_tasklist(self):
  77. worker = self.app.Worker()
  78. assert worker.app.tasks
  79. assert worker.app.finalized
  80. assert worker.tasklist(include_builtins=True)
  81. worker.tasklist(include_builtins=False)
  82. def test_extra_info(self):
  83. worker = self.app.Worker()
  84. worker.loglevel = logging.WARNING
  85. assert not worker.extra_info()
  86. worker.loglevel = logging.INFO
  87. assert worker.extra_info()
  88. def test_loglevel_string(self):
  89. with mock.stdouts():
  90. worker = self.Worker(app=self.app, loglevel='INFO')
  91. assert worker.loglevel == logging.INFO
  92. def test_run_worker(self, patching):
  93. handlers = {}
  94. class Signals(platforms.Signals):
  95. def __setitem__(self, sig, handler):
  96. handlers[sig] = handler
  97. patching.setattr('celery.platforms.signals', Signals())
  98. with mock.stdouts():
  99. w = self.Worker(app=self.app)
  100. w._isatty = False
  101. w.on_start()
  102. for sig in 'SIGINT', 'SIGHUP', 'SIGTERM':
  103. assert sig in handlers
  104. handlers.clear()
  105. w = self.Worker(app=self.app)
  106. w._isatty = True
  107. w.on_start()
  108. for sig in 'SIGINT', 'SIGTERM':
  109. assert sig in handlers
  110. assert 'SIGHUP' not in handlers
  111. def test_startup_info(self):
  112. with mock.stdouts():
  113. worker = self.Worker(app=self.app)
  114. worker.on_start()
  115. assert worker.startup_info()
  116. worker.loglevel = logging.DEBUG
  117. assert worker.startup_info()
  118. worker.loglevel = logging.INFO
  119. assert worker.startup_info()
  120. worker.autoscale = 13, 10
  121. assert worker.startup_info()
  122. prev_loader = self.app.loader
  123. worker = self.Worker(
  124. app=self.app,
  125. queues='foo,bar,baz,xuzzy,do,re,mi',
  126. )
  127. with patch('celery.apps.worker.qualname') as qualname:
  128. qualname.return_value = 'acme.backed_beans.Loader'
  129. assert worker.startup_info()
  130. with patch('celery.apps.worker.qualname') as qualname:
  131. qualname.return_value = 'celery.loaders.Loader'
  132. assert worker.startup_info()
  133. from celery.loaders.app import AppLoader
  134. self.app.loader = AppLoader(app=self.app)
  135. assert worker.startup_info()
  136. self.app.loader = prev_loader
  137. worker.task_events = True
  138. assert worker.startup_info()
  139. # test when there are too few output lines
  140. # to draft the ascii art onto
  141. prev, cd.ARTLINES = cd.ARTLINES, ['the quick brown fox']
  142. try:
  143. assert worker.startup_info()
  144. finally:
  145. cd.ARTLINES = prev
  146. def test_run(self):
  147. with mock.stdouts():
  148. self.Worker(app=self.app).on_start()
  149. self.Worker(app=self.app, purge=True).on_start()
  150. worker = self.Worker(app=self.app)
  151. worker.on_start()
  152. def test_purge_messages(self):
  153. with mock.stdouts():
  154. self.Worker(app=self.app).purge_messages()
  155. def test_init_queues(self):
  156. with mock.stdouts():
  157. app = self.app
  158. c = app.conf
  159. app.amqp.queues = app.amqp.Queues({
  160. 'celery': {
  161. 'exchange': 'celery',
  162. 'routing_key': 'celery',
  163. },
  164. 'video': {
  165. 'exchange': 'video',
  166. 'routing_key': 'video',
  167. },
  168. })
  169. worker = self.Worker(app=self.app)
  170. worker.setup_queues(['video'])
  171. assert 'video' in app.amqp.queues
  172. assert 'video' in app.amqp.queues.consume_from
  173. assert 'celery' in app.amqp.queues
  174. assert 'celery' not in app.amqp.queues.consume_from
  175. c.task_create_missing_queues = False
  176. del(app.amqp.queues)
  177. with pytest.raises(ImproperlyConfigured):
  178. self.Worker(app=self.app).setup_queues(['image'])
  179. del(app.amqp.queues)
  180. c.task_create_missing_queues = True
  181. worker = self.Worker(app=self.app)
  182. worker.setup_queues(['image'])
  183. assert 'image' in app.amqp.queues.consume_from
  184. assert app.amqp.queues['image'] == Queue(
  185. 'image', Exchange('image'),
  186. routing_key='image',
  187. )
  188. def test_autoscale_argument(self):
  189. with mock.stdouts():
  190. worker1 = self.Worker(app=self.app, autoscale='10,3')
  191. assert worker1.autoscale == [10, 3]
  192. worker2 = self.Worker(app=self.app, autoscale='10')
  193. assert worker2.autoscale == [10, 0]
  194. def test_include_argument(self):
  195. worker1 = self.Worker(app=self.app, include='os')
  196. assert worker1.include == ['os']
  197. worker2 = self.Worker(app=self.app,
  198. include='os,sys')
  199. assert worker2.include == ['os', 'sys']
  200. self.Worker(app=self.app, include=['os', 'sys'])
  201. def test_unknown_loglevel(self):
  202. with mock.stdouts():
  203. with pytest.raises(SystemExit):
  204. worker(app=self.app).run(loglevel='ALIEN')
  205. worker1 = self.Worker(app=self.app, loglevel=0xFFFF)
  206. assert worker1.loglevel == 0xFFFF
  207. @patch('os._exit')
  208. @skip.if_win32()
  209. def test_warns_if_running_as_privileged_user(self, _exit, patching):
  210. getuid = patching('os.getuid')
  211. with mock.stdouts() as (_, stderr):
  212. getuid.return_value = 0
  213. self.app.conf.accept_content = ['pickle']
  214. worker = self.Worker(app=self.app)
  215. worker.on_start()
  216. _exit.assert_called_with(1)
  217. patching.setattr('celery.platforms.C_FORCE_ROOT', True)
  218. worker = self.Worker(app=self.app)
  219. worker.on_start()
  220. assert 'a very bad idea' in stderr.getvalue()
  221. patching.setattr('celery.platforms.C_FORCE_ROOT', False)
  222. self.app.conf.accept_content = ['json']
  223. worker = self.Worker(app=self.app)
  224. worker.on_start()
  225. assert 'superuser' in stderr.getvalue()
  226. def test_redirect_stdouts(self):
  227. with mock.stdouts():
  228. self.Worker(app=self.app, redirect_stdouts=False)
  229. with pytest.raises(AttributeError):
  230. sys.stdout.logger
  231. def test_on_start_custom_logging(self):
  232. with mock.stdouts():
  233. self.app.log.redirect_stdouts = Mock()
  234. worker = self.Worker(app=self.app, redirect_stoutds=True)
  235. worker._custom_logging = True
  236. worker.on_start()
  237. self.app.log.redirect_stdouts.assert_not_called()
  238. def test_setup_logging_no_color(self):
  239. worker = self.Worker(
  240. app=self.app, redirect_stdouts=False, no_color=True,
  241. )
  242. prev, self.app.log.setup = self.app.log.setup, Mock()
  243. try:
  244. worker.setup_logging()
  245. assert not self.app.log.setup.call_args[1]['colorize']
  246. finally:
  247. self.app.log.setup = prev
  248. def test_startup_info_pool_is_str(self):
  249. with mock.stdouts():
  250. worker = self.Worker(app=self.app, redirect_stdouts=False)
  251. worker.pool_cls = 'foo'
  252. worker.startup_info()
  253. def test_redirect_stdouts_already_handled(self):
  254. logging_setup = [False]
  255. @signals.setup_logging.connect
  256. def on_logging_setup(**kwargs):
  257. logging_setup[0] = True
  258. try:
  259. worker = self.Worker(app=self.app, redirect_stdouts=False)
  260. worker.app.log.already_setup = False
  261. worker.setup_logging()
  262. assert logging_setup[0]
  263. with pytest.raises(AttributeError):
  264. sys.stdout.logger
  265. finally:
  266. signals.setup_logging.disconnect(on_logging_setup)
  267. def test_platform_tweaks_macOS(self):
  268. class macOSWorker(Worker):
  269. proxy_workaround_installed = False
  270. def macOS_proxy_detection_workaround(self):
  271. self.proxy_workaround_installed = True
  272. with mock.stdouts():
  273. worker = macOSWorker(app=self.app, redirect_stdouts=False)
  274. def install_HUP_nosupport(controller):
  275. controller.hup_not_supported_installed = True
  276. class Controller:
  277. ...
  278. prev = cd.install_HUP_not_supported_handler
  279. cd.install_HUP_not_supported_handler = install_HUP_nosupport
  280. try:
  281. worker.app.IS_macOS = True
  282. controller = Controller()
  283. worker.install_platform_tweaks(controller)
  284. assert controller.hup_not_supported_installed
  285. assert worker.proxy_workaround_installed
  286. finally:
  287. cd.install_HUP_not_supported_handler = prev
  288. def test_general_platform_tweaks(self):
  289. restart_worker_handler_installed = [False]
  290. def install_worker_restart_handler(worker):
  291. restart_worker_handler_installed[0] = True
  292. class Controller:
  293. pass
  294. with mock.stdouts():
  295. prev = cd.install_worker_restart_handler
  296. cd.install_worker_restart_handler = install_worker_restart_handler
  297. try:
  298. worker = self.Worker(app=self.app)
  299. worker.app.IS_macOS = False
  300. worker.install_platform_tweaks(Controller())
  301. assert restart_worker_handler_installed[0]
  302. finally:
  303. cd.install_worker_restart_handler = prev
  304. def test_on_consumer_ready(self):
  305. worker_ready_sent = [False]
  306. @signals.worker_ready.connect
  307. def on_worker_ready(**kwargs):
  308. worker_ready_sent[0] = True
  309. with mock.stdouts():
  310. self.Worker(app=self.app).on_consumer_ready(object())
  311. assert worker_ready_sent[0]
  312. @mock.stdouts
  313. class test_funs:
  314. def test_active_thread_count(self):
  315. assert cd.active_thread_count()
  316. @skip.unless_module('setproctitle')
  317. def test_set_process_status(self):
  318. worker = Worker(app=self.app, hostname='xyzza')
  319. prev1, sys.argv = sys.argv, ['Arg0']
  320. try:
  321. st = worker.set_process_status('Running')
  322. assert 'celeryd' in st
  323. assert 'xyzza' in st
  324. assert 'Running' in st
  325. prev2, sys.argv = sys.argv, ['Arg0', 'Arg1']
  326. try:
  327. st = worker.set_process_status('Running')
  328. assert 'celeryd' in st
  329. assert 'xyzza' in st
  330. assert 'Running' in st
  331. assert 'Arg1' in st
  332. finally:
  333. sys.argv = prev2
  334. finally:
  335. sys.argv = prev1
  336. def test_parse_options(self):
  337. cmd = worker()
  338. cmd.app = self.app
  339. opts, args = cmd.parse_options('worker', ['--concurrency=512',
  340. '--heartbeat-interval=10'])
  341. assert opts['concurrency'] == 512
  342. assert opts['heartbeat_interval'] == 10
  343. def test_main(self):
  344. p, cd.Worker = cd.Worker, Worker
  345. s, sys.argv = sys.argv, ['worker', '--discard']
  346. try:
  347. worker_main(app=self.app)
  348. finally:
  349. cd.Worker = p
  350. sys.argv = s
  351. @mock.stdouts
  352. class test_signal_handlers:
  353. class _Worker:
  354. stopped = False
  355. terminated = False
  356. def stop(self, in_sighandler=False):
  357. self.stopped = True
  358. def terminate(self, in_sighandler=False):
  359. self.terminated = True
  360. def psig(self, fun, *args, **kwargs):
  361. handlers = {}
  362. class Signals(platforms.Signals):
  363. def __setitem__(self, sig, handler):
  364. handlers[sig] = handler
  365. p, platforms.signals = platforms.signals, Signals()
  366. try:
  367. fun(*args, **kwargs)
  368. return handlers
  369. finally:
  370. platforms.signals = p
  371. def test_worker_int_handler(self):
  372. worker = self._Worker()
  373. handlers = self.psig(cd.install_worker_int_handler, worker)
  374. next_handlers = {}
  375. state.should_stop = None
  376. state.should_terminate = None
  377. class Signals(platforms.Signals):
  378. def __setitem__(self, sig, handler):
  379. next_handlers[sig] = handler
  380. with patch('celery.apps.worker.active_thread_count') as c:
  381. c.return_value = 3
  382. p, platforms.signals = platforms.signals, Signals()
  383. try:
  384. handlers['SIGINT']('SIGINT', object())
  385. assert state.should_stop
  386. assert state.should_stop == EX_FAILURE
  387. finally:
  388. platforms.signals = p
  389. state.should_stop = None
  390. try:
  391. next_handlers['SIGINT']('SIGINT', object())
  392. assert state.should_terminate
  393. assert state.should_terminate == EX_FAILURE
  394. finally:
  395. state.should_terminate = None
  396. with patch('celery.apps.worker.active_thread_count') as c:
  397. c.return_value = 1
  398. p, platforms.signals = platforms.signals, Signals()
  399. try:
  400. with pytest.raises(WorkerShutdown):
  401. handlers['SIGINT']('SIGINT', object())
  402. finally:
  403. platforms.signals = p
  404. with pytest.raises(WorkerTerminate):
  405. next_handlers['SIGINT']('SIGINT', object())
  406. @skip.unless_module('multiprocessing')
  407. def test_worker_int_handler_only_stop_MainProcess(self):
  408. process = current_process()
  409. name, process.name = process.name, 'OtherProcess'
  410. with patch('celery.apps.worker.active_thread_count') as c:
  411. c.return_value = 3
  412. try:
  413. worker = self._Worker()
  414. handlers = self.psig(cd.install_worker_int_handler, worker)
  415. handlers['SIGINT']('SIGINT', object())
  416. assert state.should_stop
  417. finally:
  418. process.name = name
  419. state.should_stop = None
  420. with patch('celery.apps.worker.active_thread_count') as c:
  421. c.return_value = 1
  422. try:
  423. worker = self._Worker()
  424. handlers = self.psig(cd.install_worker_int_handler, worker)
  425. with pytest.raises(WorkerShutdown):
  426. handlers['SIGINT']('SIGINT', object())
  427. finally:
  428. process.name = name
  429. state.should_stop = None
  430. def test_install_HUP_not_supported_handler(self):
  431. worker = self._Worker()
  432. handlers = self.psig(cd.install_HUP_not_supported_handler, worker)
  433. handlers['SIGHUP']('SIGHUP', object())
  434. @skip.unless_module('multiprocessing')
  435. def test_worker_term_hard_handler_only_stop_MainProcess(self):
  436. process = current_process()
  437. name, process.name = process.name, 'OtherProcess'
  438. try:
  439. with patch('celery.apps.worker.active_thread_count') as c:
  440. c.return_value = 3
  441. worker = self._Worker()
  442. handlers = self.psig(
  443. cd.install_worker_term_hard_handler, worker)
  444. try:
  445. handlers['SIGQUIT']('SIGQUIT', object())
  446. assert state.should_terminate
  447. finally:
  448. state.should_terminate = None
  449. with patch('celery.apps.worker.active_thread_count') as c:
  450. c.return_value = 1
  451. worker = self._Worker()
  452. handlers = self.psig(
  453. cd.install_worker_term_hard_handler, worker)
  454. try:
  455. with pytest.raises(WorkerTerminate):
  456. handlers['SIGQUIT']('SIGQUIT', object())
  457. finally:
  458. state.should_terminate = None
  459. finally:
  460. process.name = name
  461. def test_worker_term_handler_when_threads(self):
  462. with patch('celery.apps.worker.active_thread_count') as c:
  463. c.return_value = 3
  464. worker = self._Worker()
  465. handlers = self.psig(cd.install_worker_term_handler, worker)
  466. try:
  467. handlers['SIGTERM']('SIGTERM', object())
  468. assert state.should_stop == EX_OK
  469. finally:
  470. state.should_stop = None
  471. def test_worker_term_handler_when_single_thread(self):
  472. with patch('celery.apps.worker.active_thread_count') as c:
  473. c.return_value = 1
  474. worker = self._Worker()
  475. handlers = self.psig(cd.install_worker_term_handler, worker)
  476. try:
  477. with pytest.raises(WorkerShutdown):
  478. handlers['SIGTERM']('SIGTERM', object())
  479. finally:
  480. state.should_stop = None
  481. @patch('sys.__stderr__')
  482. @skip.if_pypy()
  483. @skip.if_jython()
  484. def test_worker_cry_handler(self, stderr):
  485. handlers = self.psig(cd.install_cry_handler)
  486. assert handlers['SIGUSR1']('SIGUSR1', object()) is None
  487. stderr.write.assert_called()
  488. @skip.unless_module('multiprocessing')
  489. def test_worker_term_handler_only_stop_MainProcess(self):
  490. process = current_process()
  491. name, process.name = process.name, 'OtherProcess'
  492. try:
  493. with patch('celery.apps.worker.active_thread_count') as c:
  494. c.return_value = 3
  495. worker = self._Worker()
  496. handlers = self.psig(cd.install_worker_term_handler, worker)
  497. handlers['SIGTERM']('SIGTERM', object())
  498. assert state.should_stop == EX_OK
  499. with patch('celery.apps.worker.active_thread_count') as c:
  500. c.return_value = 1
  501. worker = self._Worker()
  502. handlers = self.psig(cd.install_worker_term_handler, worker)
  503. with pytest.raises(WorkerShutdown):
  504. handlers['SIGTERM']('SIGTERM', object())
  505. finally:
  506. process.name = name
  507. state.should_stop = None
  508. @skip.unless_symbol('os.execv')
  509. @patch('celery.platforms.close_open_fds')
  510. @patch('atexit.register')
  511. @patch('os.close')
  512. def test_worker_restart_handler(self, _close, register, close_open):
  513. argv = []
  514. def _execv(*args):
  515. argv.extend(args)
  516. execv, os.execv = os.execv, _execv
  517. try:
  518. worker = self._Worker()
  519. handlers = self.psig(cd.install_worker_restart_handler, worker)
  520. handlers['SIGHUP']('SIGHUP', object())
  521. assert state.should_stop == EX_OK
  522. register.assert_called()
  523. callback = register.call_args[0][0]
  524. callback()
  525. assert argv
  526. finally:
  527. os.execv = execv
  528. state.should_stop = None
  529. def test_worker_term_hard_handler_when_threaded(self):
  530. with patch('celery.apps.worker.active_thread_count') as c:
  531. c.return_value = 3
  532. worker = self._Worker()
  533. handlers = self.psig(cd.install_worker_term_hard_handler, worker)
  534. try:
  535. handlers['SIGQUIT']('SIGQUIT', object())
  536. assert state.should_terminate
  537. finally:
  538. state.should_terminate = None
  539. def test_worker_term_hard_handler_when_single_threaded(self):
  540. with patch('celery.apps.worker.active_thread_count') as c:
  541. c.return_value = 1
  542. worker = self._Worker()
  543. handlers = self.psig(cd.install_worker_term_hard_handler, worker)
  544. with pytest.raises(WorkerTerminate):
  545. handlers['SIGQUIT']('SIGQUIT', object())