123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657 |
- from __future__ import absolute_import, unicode_literals
- import logging
- import os
- import sys
- import pytest
- from billiard.process import current_process
- from case import Mock, mock, patch, skip
- from kombu import Exchange, Queue
- from celery import platforms, signals
- from celery.app import trace
- from celery.apps import worker as cd
- from celery.bin.worker import main as worker_main
- from celery.bin.worker import worker
- from celery.exceptions import (ImproperlyConfigured, WorkerShutdown,
- WorkerTerminate)
- from celery.platforms import EX_FAILURE, EX_OK
- from celery.worker import state
@pytest.fixture(autouse=True)
def reset_worker_optimizations():
    """Reset the task-trace worker optimizations after every test."""
    # Nothing to set up: hand control to the test immediately.
    yield
    # Teardown: runs once the test using this (autouse) fixture is done.
    trace.reset_worker_optimizations()
class Worker(cd.Worker):
    """Worker subclass used by the tests in this module.

    Stdout/stderr redirection is off by default and ``start`` only runs
    the ``on_start`` hook.
    """

    redirect_stdouts = False

    def start(self, *args, **kwargs):
        """Run ``on_start`` only; any arguments given are ignored."""
        self.on_start()
class test_Worker:
    """Tests for the worker application class and the ``worker`` command."""

    # Worker class under test; methods use ``self.Worker`` to build it.
    Worker = Worker

    def test_queues_string(self):
        """A comma-separated queue string registers the named queue."""
        with mock.stdouts():
            w = self.app.Worker()
            w.setup_queues('foo,bar,baz')
            assert 'foo' in self.app.amqp.queues

    def test_cpu_count(self):
        """Concurrency is 2 when ``cpu_count`` raises, else as given."""
        with mock.stdouts():
            with patch('celery.worker.worker.cpu_count') as cpu_count:
                cpu_count.side_effect = NotImplementedError()
                w = self.app.Worker(concurrency=None)
                assert w.concurrency == 2
            w = self.app.Worker(concurrency=5)
            assert w.concurrency == 5

    def test_windows_B_option(self):
        """Running with ``beat=True`` exits when the app is on Windows."""
        with mock.stdouts():
            self.app.IS_WINDOWS = True
            with pytest.raises(SystemExit):
                worker(app=self.app).run(beat=True)

    def test_setup_concurrency_very_early(self):
        """An unknown ``-P`` pool name raises ImportError."""
        x = worker()
        x.run = Mock()
        with pytest.raises(ImportError):
            x.execute_from_commandline(['worker', '-P', 'xyzybox'])

    def test_run_from_argv_basic(self):
        """``run_from_argv`` calls ``maybe_detach``."""
        x = worker(app=self.app)
        x.maybe_detach = Mock()

        def run(*args, **kwargs):
            pass

        # A plain function (not a Mock) stands in for the real ``run``.
        x.run = run
        x.run_from_argv('celery', [])
        x.maybe_detach.assert_called()

    def test_maybe_detach(self):
        """Only ``--detach`` triggers ``detached_celeryd`` and an exit."""
        x = worker(app=self.app)
        with patch('celery.bin.worker.detached_celeryd') as detached:
            x.maybe_detach([])
            detached.assert_not_called()
            with pytest.raises(SystemExit):
                x.maybe_detach(['--detach'])
            detached.assert_called()

    def test_invalid_loglevel_gives_error(self):
        """An unrecognised log level name makes ``run`` exit."""
        with mock.stdouts():
            x = worker(app=self.app)
            with pytest.raises(SystemExit):
                x.run(loglevel='GRIM_REAPER')

    def test_no_loglevel(self):
        """``run`` accepts ``loglevel=None`` without raising."""
        self.app.Worker = Mock()
        worker(app=self.app).run(loglevel=None)

    def test_tasklist(self):
        """``tasklist`` works with and without builtin tasks."""
        worker = self.app.Worker()
        assert worker.app.tasks
        assert worker.app.finalized
        assert worker.tasklist(include_builtins=True)
        # Only checked for not raising; the result is not asserted.
        worker.tasklist(include_builtins=False)

    def test_extra_info(self):
        """``extra_info`` is empty at WARNING and non-empty at INFO."""
        worker = self.app.Worker()
        worker.loglevel = logging.WARNING
        assert not worker.extra_info()
        worker.loglevel = logging.INFO
        assert worker.extra_info()

    def test_loglevel_string(self):
        """A log level given by name is converted to its numeric value."""
        with mock.stdouts():
            worker = self.Worker(app=self.app, loglevel='INFO')
            assert worker.loglevel == logging.INFO

    def test_run_worker(self, patching):
        """``on_start`` installs signal handlers; SIGHUP only without a tty."""
        handlers = {}

        class Signals(platforms.Signals):
            # Record handlers instead of installing them.

            def __setitem__(self, sig, handler):
                handlers[sig] = handler

        patching.setattr('celery.platforms.signals', Signals())
        with mock.stdouts():
            w = self.Worker(app=self.app)
            w._isatty = False
            w.on_start()
            for sig in 'SIGINT', 'SIGHUP', 'SIGTERM':
                assert sig in handlers

            handlers.clear()
            w = self.Worker(app=self.app)
            w._isatty = True
            w.on_start()
            for sig in 'SIGINT', 'SIGTERM':
                assert sig in handlers
            assert 'SIGHUP' not in handlers

    def test_startup_info(self):
        """``startup_info`` is non-empty across a range of worker settings."""
        with mock.stdouts():
            worker = self.Worker(app=self.app)
            worker.on_start()
            assert worker.startup_info()
            worker.loglevel = logging.DEBUG
            assert worker.startup_info()
            worker.loglevel = logging.INFO
            assert worker.startup_info()
            worker.autoscale = 13, 10
            assert worker.startup_info()

            prev_loader = self.app.loader
            worker = self.Worker(
                app=self.app,
                queues='foo,bar,baz,xuzzy,do,re,mi',
            )
            with patch('celery.apps.worker.qualname') as qualname:
                qualname.return_value = 'acme.backed_beans.Loader'
                assert worker.startup_info()

            with patch('celery.apps.worker.qualname') as qualname:
                qualname.return_value = 'celery.loaders.Loader'
                assert worker.startup_info()

            from celery.loaders.app import AppLoader
            self.app.loader = AppLoader(app=self.app)
            assert worker.startup_info()

            self.app.loader = prev_loader
            worker.task_events = True
            assert worker.startup_info()

            # test when there are too few output lines
            # to draft the ascii art onto
            prev, cd.ARTLINES = cd.ARTLINES, ['the quick brown fox']
            try:
                assert worker.startup_info()
            finally:
                cd.ARTLINES = prev

    def test_run(self):
        """``on_start`` runs for default and ``purge=True`` workers."""
        with mock.stdouts():
            self.Worker(app=self.app).on_start()
            self.Worker(app=self.app, purge=True).on_start()
            worker = self.Worker(app=self.app)
            worker.on_start()

    def test_purge_messages(self):
        """``purge_messages`` can be called on a fresh worker."""
        with mock.stdouts():
            self.Worker(app=self.app).purge_messages()

    def test_init_queues(self):
        """``setup_queues`` selects, rejects or creates queues as configured."""
        with mock.stdouts():
            app = self.app
            c = app.conf
            app.amqp.queues = app.amqp.Queues({
                'celery': {
                    'exchange': 'celery',
                    'routing_key': 'celery',
                },
                'video': {
                    'exchange': 'video',
                    'routing_key': 'video',
                },
            })
            worker = self.Worker(app=self.app)
            worker.setup_queues(['video'])
            assert 'video' in app.amqp.queues
            assert 'video' in app.amqp.queues.consume_from
            assert 'celery' in app.amqp.queues
            assert 'celery' not in app.amqp.queues.consume_from

            # Unknown queue while missing queues may not be created.
            c.task_create_missing_queues = False
            del app.amqp.queues
            with pytest.raises(ImproperlyConfigured):
                self.Worker(app=self.app).setup_queues(['image'])

            # Unknown queue while missing queues may be created.
            del app.amqp.queues
            c.task_create_missing_queues = True
            worker = self.Worker(app=self.app)
            worker.setup_queues(['image'])
            assert 'image' in app.amqp.queues.consume_from
            assert app.amqp.queues['image'] == Queue(
                'image', Exchange('image'),
                routing_key='image',
            )

    def test_autoscale_argument(self):
        """``autoscale`` strings parse to ``[max, min]``; min defaults to 0."""
        with mock.stdouts():
            worker1 = self.Worker(app=self.app, autoscale='10,3')
            assert worker1.autoscale == [10, 3]
            worker2 = self.Worker(app=self.app, autoscale='10')
            assert worker2.autoscale == [10, 0]

    def test_include_argument(self):
        """``include`` accepts a name, a comma-separated string or a list."""
        worker1 = self.Worker(app=self.app, include='os')
        assert worker1.include == ['os']
        worker2 = self.Worker(app=self.app,
                              include='os,sys')
        assert worker2.include == ['os', 'sys']
        self.Worker(app=self.app, include=['os', 'sys'])

    def test_unknown_loglevel(self):
        """Unknown level names exit; numeric levels are kept unchanged."""
        with mock.stdouts():
            with pytest.raises(SystemExit):
                worker(app=self.app).run(loglevel='ALIEN')
            worker1 = self.Worker(app=self.app, loglevel=0xFFFF)
            assert worker1.loglevel == 0xFFFF

    @patch('os._exit')
    @skip.if_win32()
    def test_warns_if_running_as_privileged_user(self, _exit, patching):
        """With uid 0 the worker exits for pickle, or warns on stderr."""
        getuid = patching('os.getuid')

        with mock.stdouts() as (_, stderr):
            getuid.return_value = 0
            self.app.conf.accept_content = ['pickle']
            worker = self.Worker(app=self.app)
            worker.on_start()
            _exit.assert_called_with(1)
            patching.setattr('celery.platforms.C_FORCE_ROOT', True)
            worker = self.Worker(app=self.app)
            worker.on_start()
            assert 'a very bad idea' in stderr.getvalue()
            patching.setattr('celery.platforms.C_FORCE_ROOT', False)
            self.app.conf.accept_content = ['json']
            worker = self.Worker(app=self.app)
            worker.on_start()
            assert 'superuser' in stderr.getvalue()

    def test_redirect_stdouts(self):
        """With ``redirect_stdouts=False`` stdout has no ``logger``."""
        with mock.stdouts():
            self.Worker(app=self.app, redirect_stdouts=False)
            with pytest.raises(AttributeError):
                sys.stdout.logger

    def test_on_start_custom_logging(self):
        """Custom logging stops ``on_start`` from redirecting stdouts."""
        with mock.stdouts():
            self.app.log.redirect_stdouts = Mock()
            # Keyword was previously misspelled ``redirect_stoutds``, so
            # the option this test means to enable was never passed.
            worker = self.Worker(app=self.app, redirect_stdouts=True)
            worker._custom_logging = True
            worker.on_start()
            self.app.log.redirect_stdouts.assert_not_called()

    def test_setup_logging_no_color(self):
        """``no_color=True`` makes ``setup_logging`` pass a false colorize."""
        worker = self.Worker(
            app=self.app, redirect_stdouts=False, no_color=True,
        )
        prev, self.app.log.setup = self.app.log.setup, Mock()
        try:
            worker.setup_logging()
            assert not self.app.log.setup.call_args[1]['colorize']
        finally:
            self.app.log.setup = prev

    def test_startup_info_pool_is_str(self):
        """``startup_info`` copes with ``pool_cls`` being a string."""
        with mock.stdouts():
            worker = self.Worker(app=self.app, redirect_stdouts=False)
            worker.pool_cls = 'foo'
            worker.startup_info()

    def test_redirect_stdouts_already_handled(self):
        """``setup_logging`` sends the signal and leaves stdout alone."""
        logging_setup = [False]

        @signals.setup_logging.connect
        def on_logging_setup(**kwargs):
            logging_setup[0] = True

        try:
            worker = self.Worker(app=self.app, redirect_stdouts=False)
            worker.app.log.already_setup = False
            worker.setup_logging()
            assert logging_setup[0]
            with pytest.raises(AttributeError):
                sys.stdout.logger
        finally:
            signals.setup_logging.disconnect(on_logging_setup)

    def test_platform_tweaks_macOS(self):
        """With ``IS_macOS`` set, both macOS-specific tweaks are applied."""
        class macOSWorker(Worker):
            proxy_workaround_installed = False

            def macOS_proxy_detection_workaround(self):
                self.proxy_workaround_installed = True

        with mock.stdouts():
            worker = macOSWorker(app=self.app, redirect_stdouts=False)

            def install_HUP_nosupport(controller):
                controller.hup_not_supported_installed = True

            class Controller(object):
                pass

            prev = cd.install_HUP_not_supported_handler
            cd.install_HUP_not_supported_handler = install_HUP_nosupport
            try:
                worker.app.IS_macOS = True
                controller = Controller()
                worker.install_platform_tweaks(controller)
                assert controller.hup_not_supported_installed
                assert worker.proxy_workaround_installed
            finally:
                cd.install_HUP_not_supported_handler = prev

    def test_general_platform_tweaks(self):
        """With ``IS_macOS`` false, the restart handler is installed."""
        restart_worker_handler_installed = [False]

        def install_worker_restart_handler(worker):
            restart_worker_handler_installed[0] = True

        class Controller(object):
            pass

        with mock.stdouts():
            prev = cd.install_worker_restart_handler
            cd.install_worker_restart_handler = install_worker_restart_handler
            try:
                worker = self.Worker(app=self.app)
                worker.app.IS_macOS = False
                worker.install_platform_tweaks(Controller())
                assert restart_worker_handler_installed[0]
            finally:
                cd.install_worker_restart_handler = prev

    def test_on_consumer_ready(self):
        """``on_consumer_ready`` sends the ``worker_ready`` signal."""
        worker_ready_sent = [False]

        @signals.worker_ready.connect
        def on_worker_ready(**kwargs):
            worker_ready_sent[0] = True

        try:
            with mock.stdouts():
                self.Worker(app=self.app).on_consumer_ready(object())
                assert worker_ready_sent[0]
        finally:
            # Disconnect explicitly, as the setup_logging test above does,
            # so the receiver cannot fire during later tests.
            signals.worker_ready.disconnect(on_worker_ready)
@mock.stdouts
class test_funs:
    """Tests for module-level helpers and the ``worker`` command entry."""

    def test_active_thread_count(self):
        """The active thread count is truthy."""
        assert cd.active_thread_count()

    @skip.unless_module('setproctitle')
    def test_set_process_status(self):
        """The status string has program, hostname, info and extra argv."""
        instance = Worker(app=self.app, hostname='xyzza')
        saved_argv = sys.argv
        try:
            # Program name only.
            sys.argv = ['Arg0']
            status = instance.set_process_status('Running')
            for fragment in ('celeryd', 'xyzza', 'Running'):
                assert fragment in status

            # Program name plus one extra argument.
            sys.argv = ['Arg0', 'Arg1']
            status = instance.set_process_status('Running')
            for fragment in ('celeryd', 'xyzza', 'Running', 'Arg1'):
                assert fragment in status
        finally:
            sys.argv = saved_argv

    def test_parse_options(self):
        """Numeric command-line options are parsed into integers."""
        command = worker()
        command.app = self.app
        argv = ['--concurrency=512', '--heartbeat-interval=10']
        options, _ = command.parse_options('worker', argv)
        assert options['concurrency'] == 512
        assert options['heartbeat_interval'] == 10

    def test_main(self):
        """``worker_main`` runs with the test ``Worker`` class swapped in."""
        real_worker_cls = cd.Worker
        real_argv = sys.argv
        cd.Worker = Worker
        sys.argv = ['worker', '--discard']
        try:
            worker_main(app=self.app)
        finally:
            cd.Worker = real_worker_cls
            sys.argv = real_argv
@mock.stdouts
class test_signal_handlers:
    """Tests for the signal handlers installed by ``celery.apps.worker``."""

    class _Worker(object):
        """Minimal worker stand-in that records stop/terminate calls."""

        hostname = 'foo'
        stopped = False
        terminated = False

        def stop(self, in_sighandler=False):
            self.stopped = True

        def terminate(self, in_sighandler=False):
            self.terminated = True

    def psig(self, fun, *args, **kwargs):
        """Call ``fun`` and return the handlers it tried to install.

        ``platforms.signals`` is replaced for the duration of the call with
        an object that stores handlers in a dict keyed by signal name
        instead of installing them; the original is restored afterwards.
        """
        handlers = {}

        class Signals(platforms.Signals):
            def __setitem__(self, sig, handler):
                handlers[sig] = handler

        p, platforms.signals = platforms.signals, Signals()
        try:
            fun(*args, **kwargs)
            return handlers
        finally:
            platforms.signals = p

    def test_worker_int_handler(self):
        """SIGINT stops first, then terminates on the follow-up handler."""
        worker = self._Worker()
        handlers = self.psig(cd.install_worker_int_handler, worker)
        next_handlers = {}
        state.should_stop = None
        state.should_terminate = None

        class Signals(platforms.Signals):
            # Captures handlers installed while a handler is running.

            def __setitem__(self, sig, handler):
                next_handlers[sig] = handler

        with patch('celery.apps.worker.active_thread_count') as c:
            c.return_value = 3
            p, platforms.signals = platforms.signals, Signals()
            try:
                handlers['SIGINT']('SIGINT', object())
                assert state.should_stop
                assert state.should_stop == EX_FAILURE
            finally:
                platforms.signals = p
                state.should_stop = None

            try:
                next_handlers['SIGINT']('SIGINT', object())
                assert state.should_terminate
                assert state.should_terminate == EX_FAILURE
            finally:
                state.should_terminate = None

        with patch('celery.apps.worker.active_thread_count') as c:
            c.return_value = 1
            p, platforms.signals = platforms.signals, Signals()
            try:
                with pytest.raises(WorkerShutdown):
                    handlers['SIGINT']('SIGINT', object())
            finally:
                platforms.signals = p

            with pytest.raises(WorkerTerminate):
                next_handlers['SIGINT']('SIGINT', object())

    @skip.unless_module('multiprocessing')
    def test_worker_int_handler_only_stop_MainProcess(self):
        """SIGINT still stops or raises when not named as main process."""
        process = current_process()
        name, process.name = process.name, 'OtherProcess'
        # Keep the process renamed for BOTH scenarios. Previously the name
        # was restored after the first one, so the second half ran under
        # the real process name and did not test what the name claims.
        try:
            with patch('celery.apps.worker.active_thread_count') as c:
                c.return_value = 3
                worker = self._Worker()
                handlers = self.psig(cd.install_worker_int_handler, worker)
                try:
                    handlers['SIGINT']('SIGINT', object())
                    assert state.should_stop
                finally:
                    state.should_stop = None

            with patch('celery.apps.worker.active_thread_count') as c:
                c.return_value = 1
                worker = self._Worker()
                handlers = self.psig(cd.install_worker_int_handler, worker)
                with pytest.raises(WorkerShutdown):
                    handlers['SIGINT']('SIGINT', object())
        finally:
            process.name = name
            state.should_stop = None

    def test_install_HUP_not_supported_handler(self):
        """The 'HUP not supported' handler can be invoked without error."""
        worker = self._Worker()
        handlers = self.psig(cd.install_HUP_not_supported_handler, worker)
        handlers['SIGHUP']('SIGHUP', object())

    @skip.unless_module('multiprocessing')
    def test_worker_term_hard_handler_only_stop_MainProcess(self):
        """SIGQUIT still terminates when not named as main process."""
        process = current_process()
        name, process.name = process.name, 'OtherProcess'
        try:
            with patch('celery.apps.worker.active_thread_count') as c:
                c.return_value = 3
                worker = self._Worker()
                handlers = self.psig(
                    cd.install_worker_term_hard_handler, worker)
                try:
                    handlers['SIGQUIT']('SIGQUIT', object())
                    assert state.should_terminate
                finally:
                    state.should_terminate = None
            with patch('celery.apps.worker.active_thread_count') as c:
                c.return_value = 1
                worker = self._Worker()
                handlers = self.psig(
                    cd.install_worker_term_hard_handler, worker)
                try:
                    with pytest.raises(WorkerTerminate):
                        handlers['SIGQUIT']('SIGQUIT', object())
                finally:
                    state.should_terminate = None
        finally:
            process.name = name

    def test_worker_term_handler_when_threads(self):
        """With several threads, SIGTERM sets ``should_stop`` to EX_OK."""
        with patch('celery.apps.worker.active_thread_count') as c:
            c.return_value = 3
            worker = self._Worker()
            handlers = self.psig(cd.install_worker_term_handler, worker)
            try:
                handlers['SIGTERM']('SIGTERM', object())
                assert state.should_stop == EX_OK
            finally:
                state.should_stop = None

    def test_worker_term_handler_when_single_thread(self):
        """With a single thread, SIGTERM raises WorkerShutdown."""
        with patch('celery.apps.worker.active_thread_count') as c:
            c.return_value = 1
            worker = self._Worker()
            handlers = self.psig(cd.install_worker_term_handler, worker)
            try:
                with pytest.raises(WorkerShutdown):
                    handlers['SIGTERM']('SIGTERM', object())
            finally:
                state.should_stop = None

    @patch('sys.__stderr__')
    @skip.if_pypy()
    @skip.if_jython()
    def test_worker_cry_handler(self, stderr):
        """The SIGUSR1 handler writes to stderr and returns None."""
        handlers = self.psig(cd.install_cry_handler)
        assert handlers['SIGUSR1']('SIGUSR1', object()) is None
        stderr.write.assert_called()

    @skip.unless_module('multiprocessing')
    def test_worker_term_handler_only_stop_MainProcess(self):
        """SIGTERM still stops or raises when not named as main process."""
        process = current_process()
        name, process.name = process.name, 'OtherProcess'
        try:
            with patch('celery.apps.worker.active_thread_count') as c:
                c.return_value = 3
                worker = self._Worker()
                handlers = self.psig(cd.install_worker_term_handler, worker)
                handlers['SIGTERM']('SIGTERM', object())
                assert state.should_stop == EX_OK
            with patch('celery.apps.worker.active_thread_count') as c:
                c.return_value = 1
                worker = self._Worker()
                handlers = self.psig(cd.install_worker_term_handler, worker)
                with pytest.raises(WorkerShutdown):
                    handlers['SIGTERM']('SIGTERM', object())
        finally:
            process.name = name
            state.should_stop = None

    @skip.unless_symbol('os.execv')
    @patch('celery.platforms.close_open_fds')
    @patch('atexit.register')
    @patch('os.close')
    def test_worker_restart_handler(self, _close, register, close_open):
        """SIGHUP stops the worker and registers an exit-time ``execv``."""
        argv = []

        def _execv(*args):
            # Record the call instead of replacing the test process.
            argv.extend(args)

        execv, os.execv = os.execv, _execv
        try:
            worker = self._Worker()
            handlers = self.psig(cd.install_worker_restart_handler, worker)
            handlers['SIGHUP']('SIGHUP', object())
            assert state.should_stop == EX_OK
            register.assert_called()
            # Run the callback that was handed to ``atexit.register``.
            callback = register.call_args[0][0]
            callback()
            assert argv
        finally:
            os.execv = execv
            state.should_stop = None

    def test_worker_term_hard_handler_when_threaded(self):
        """With several threads, SIGQUIT sets ``should_terminate``."""
        with patch('celery.apps.worker.active_thread_count') as c:
            c.return_value = 3
            worker = self._Worker()
            handlers = self.psig(cd.install_worker_term_hard_handler, worker)
            try:
                handlers['SIGQUIT']('SIGQUIT', object())
                assert state.should_terminate
            finally:
                state.should_terminate = None

    def test_worker_term_hard_handler_when_single_threaded(self):
        """With a single thread, SIGQUIT raises WorkerTerminate."""
        with patch('celery.apps.worker.active_thread_count') as c:
            c.return_value = 1
            worker = self._Worker()
            handlers = self.psig(cd.install_worker_term_hard_handler, worker)
            with pytest.raises(WorkerTerminate):
                handlers['SIGQUIT']('SIGQUIT', object())

    def test_send_worker_shutting_down_signal(self):
        """SIGTERM sends ``worker_shutting_down`` with the expected args."""
        with patch('celery.apps.worker.signals.worker_shutting_down') as wsd:
            worker = self._Worker()
            handlers = self.psig(cd.install_worker_term_handler, worker)
            try:
                with pytest.raises(WorkerShutdown):
                    handlers['SIGTERM']('SIGTERM', object())
            finally:
                state.should_stop = None
            wsd.send.assert_called_with(
                sender='foo', sig='SIGTERM', how='Warm', exitcode=0,
            )
|