"""Unit tests for the worker program in ``celery.apps.worker`` and for the
``celery worker`` command in ``celery.bin.worker``."""
from __future__ import absolute_import, unicode_literals

import logging
import os
import sys

import pytest
from billiard.process import current_process
from case import Mock, mock, patch, skip
from kombu import Exchange, Queue

from celery import platforms, signals
from celery.app import trace
from celery.apps import worker as cd
from celery.bin.worker import main as worker_main
from celery.bin.worker import worker
from celery.exceptions import (ImproperlyConfigured, WorkerShutdown,
                               WorkerTerminate)
from celery.platforms import EX_FAILURE, EX_OK
from celery.worker import state


@pytest.fixture(autouse=True)
def reset_worker_optimizations():
    """Call ``trace.reset_worker_optimizations()`` after every test."""
    yield
    trace.reset_worker_optimizations()


class Worker(cd.Worker):
    """Worker whose ``start()`` only runs ``on_start()``.

    ``redirect_stdouts`` defaults to ``False`` at class level.
    """

    redirect_stdouts = False

    def start(self, *args, **kwargs):
        self.on_start()


class test_Worker:
    """Tests for worker construction, startup, logging and queue setup."""

    Worker = Worker

    def test_queues_string(self):
        with mock.stdouts():
            w = self.app.Worker()
            w.setup_queues('foo,bar,baz')
            assert 'foo' in self.app.amqp.queues

    def test_cpu_count(self):
        with mock.stdouts():
            with patch('celery.worker.worker.cpu_count') as cpu_count:
                cpu_count.side_effect = NotImplementedError()
                w = self.app.Worker(concurrency=None)
                assert w.concurrency == 2
            w = self.app.Worker(concurrency=5)
            assert w.concurrency == 5

    def test_windows_B_option(self):
        with mock.stdouts():
            self.app.IS_WINDOWS = True
            with pytest.raises(SystemExit):
                worker(app=self.app).run(beat=True)

    def test_setup_concurrency_very_early(self):
        x = worker()
        x.run = Mock()
        with pytest.raises(ImportError):
            x.execute_from_commandline(['worker', '-P', 'xyzybox'])

    def test_run_from_argv_basic(self):
        x = worker(app=self.app)
        x.maybe_detach = Mock()

        def run(*args, **kwargs):
            pass

        # A plain no-op function stands in for the real ``run``.
        x.run = run
        x.run_from_argv('celery', [])
        x.maybe_detach.assert_called()

    def test_maybe_detach(self):
        x = worker(app=self.app)
        with patch('celery.bin.worker.detached_celeryd') as detached:
            x.maybe_detach([])
            detached.assert_not_called()
            with pytest.raises(SystemExit):
                x.maybe_detach(['--detach'])
            detached.assert_called()

    def test_invalid_loglevel_gives_error(self):
        with mock.stdouts():
            x = worker(app=self.app)
            with pytest.raises(SystemExit):
                x.run(loglevel='GRIM_REAPER')

    def test_no_loglevel(self):
        self.app.Worker = Mock()
        worker(app=self.app).run(loglevel=None)

    def test_tasklist(self):
        worker = self.app.Worker()
        assert worker.app.tasks
        assert worker.app.finalized
        assert worker.tasklist(include_builtins=True)
        worker.tasklist(include_builtins=False)

    def test_extra_info(self):
        worker = self.app.Worker()
        worker.loglevel = logging.WARNING
        assert not worker.extra_info()
        worker.loglevel = logging.INFO
        assert worker.extra_info()

    def test_loglevel_string(self):
        with mock.stdouts():
            worker = self.Worker(app=self.app, loglevel='INFO')
            assert worker.loglevel == logging.INFO

    def test_run_worker(self, patching):
        handlers = {}

        class Signals(platforms.Signals):
            # Record handler registrations instead of installing them.

            def __setitem__(self, sig, handler):
                handlers[sig] = handler

        patching.setattr('celery.platforms.signals', Signals())
        with mock.stdouts():
            w = self.Worker(app=self.app)
            w._isatty = False
            w.on_start()
            for sig in 'SIGINT', 'SIGHUP', 'SIGTERM':
                assert sig in handlers

            handlers.clear()
            w = self.Worker(app=self.app)
            w._isatty = True
            w.on_start()
            for sig in 'SIGINT', 'SIGTERM':
                assert sig in handlers
            assert 'SIGHUP' not in handlers

    def test_startup_info(self):
        with mock.stdouts():
            worker = self.Worker(app=self.app)
            worker.on_start()
            assert worker.startup_info()
            worker.loglevel = logging.DEBUG
            assert worker.startup_info()
            worker.loglevel = logging.INFO
            assert worker.startup_info()
            worker.autoscale = 13, 10
            assert worker.startup_info()

            prev_loader = self.app.loader
            worker = self.Worker(
                app=self.app,
                queues='foo,bar,baz,xuzzy,do,re,mi',
            )
            with patch('celery.apps.worker.qualname') as qualname:
                qualname.return_value = 'acme.backed_beans.Loader'
                assert worker.startup_info()

            with patch('celery.apps.worker.qualname') as qualname:
                qualname.return_value = 'celery.loaders.Loader'
                assert worker.startup_info()

            from celery.loaders.app import AppLoader
            self.app.loader = AppLoader(app=self.app)
            assert worker.startup_info()

            self.app.loader = prev_loader
            worker.task_events = True
            assert worker.startup_info()

            # test when there are too few output lines
            # to draft the ascii art onto
            prev, cd.ARTLINES = cd.ARTLINES, ['the quick brown fox']
            try:
                assert worker.startup_info()
            finally:
                cd.ARTLINES = prev

    def test_run(self):
        with mock.stdouts():
            self.Worker(app=self.app).on_start()
            self.Worker(app=self.app, purge=True).on_start()
            worker = self.Worker(app=self.app)
            worker.on_start()

    def test_purge_messages(self):
        with mock.stdouts():
            self.Worker(app=self.app).purge_messages()

    def test_init_queues(self):
        with mock.stdouts():
            app = self.app
            c = app.conf
            app.amqp.queues = app.amqp.Queues({
                'celery': {
                    'exchange': 'celery',
                    'routing_key': 'celery',
                },
                'video': {
                    'exchange': 'video',
                    'routing_key': 'video',
                },
            })
            worker = self.Worker(app=self.app)
            worker.setup_queues(['video'])
            assert 'video' in app.amqp.queues
            assert 'video' in app.amqp.queues.consume_from
            assert 'celery' in app.amqp.queues
            assert 'celery' not in app.amqp.queues.consume_from

            c.task_create_missing_queues = False
            del app.amqp.queues
            with pytest.raises(ImproperlyConfigured):
                self.Worker(app=self.app).setup_queues(['image'])
            del app.amqp.queues
            c.task_create_missing_queues = True
            worker = self.Worker(app=self.app)
            worker.setup_queues(['image'])
            assert 'image' in app.amqp.queues.consume_from
            assert app.amqp.queues['image'] == Queue(
                'image', Exchange('image'),
                routing_key='image',
            )

    def test_autoscale_argument(self):
        with mock.stdouts():
            worker1 = self.Worker(app=self.app, autoscale='10,3')
            assert worker1.autoscale == [10, 3]
            worker2 = self.Worker(app=self.app, autoscale='10')
            assert worker2.autoscale == [10, 0]

    def test_include_argument(self):
        worker1 = self.Worker(app=self.app, include='os')
        assert worker1.include == ['os']
        worker2 = self.Worker(app=self.app,
                              include='os,sys')
        assert worker2.include == ['os', 'sys']
        self.Worker(app=self.app, include=['os', 'sys'])

    def test_unknown_loglevel(self):
        with mock.stdouts():
            with pytest.raises(SystemExit):
                worker(app=self.app).run(loglevel='ALIEN')
            worker1 = self.Worker(app=self.app, loglevel=0xFFFF)
            assert worker1.loglevel == 0xFFFF

    @patch('os._exit')
    @skip.if_win32()
    def test_warns_if_running_as_privileged_user(self, _exit, patching):
        getuid = patching('os.getuid')

        with mock.stdouts() as (_, stderr):
            getuid.return_value = 0
            self.app.conf.accept_content = ['pickle']
            worker = self.Worker(app=self.app)
            worker.on_start()
            _exit.assert_called_with(1)
            patching.setattr('celery.platforms.C_FORCE_ROOT', True)
            worker = self.Worker(app=self.app)
            worker.on_start()
            assert 'a very bad idea' in stderr.getvalue()
            patching.setattr('celery.platforms.C_FORCE_ROOT', False)
            self.app.conf.accept_content = ['json']
            worker = self.Worker(app=self.app)
            worker.on_start()
            assert 'superuser' in stderr.getvalue()

    def test_redirect_stdouts(self):
        with mock.stdouts():
            self.Worker(app=self.app, redirect_stdouts=False)
            with pytest.raises(AttributeError):
                sys.stdout.logger

    def test_on_start_custom_logging(self):
        with mock.stdouts():
            self.app.log.redirect_stdouts = Mock()
            # Keyword spelled ``redirect_stdouts`` (was ``redirect_stoutds``),
            # matching how test_redirect_stdouts passes it.
            worker = self.Worker(app=self.app, redirect_stdouts=True)
            worker._custom_logging = True
            worker.on_start()
            self.app.log.redirect_stdouts.assert_not_called()

    def test_setup_logging_no_color(self):
        worker = self.Worker(
            app=self.app, redirect_stdouts=False, no_color=True,
        )
        prev, self.app.log.setup = self.app.log.setup, Mock()
        try:
            worker.setup_logging()
            assert not self.app.log.setup.call_args[1]['colorize']
        finally:
            self.app.log.setup = prev

    def test_startup_info_pool_is_str(self):
        with mock.stdouts():
            worker = self.Worker(app=self.app, redirect_stdouts=False)
            worker.pool_cls = 'foo'
            worker.startup_info()

    def test_redirect_stdouts_already_handled(self):
        logging_setup = [False]

        @signals.setup_logging.connect
        def on_logging_setup(**kwargs):
            logging_setup[0] = True

        try:
            worker = self.Worker(app=self.app, redirect_stdouts=False)
            worker.app.log.already_setup = False
            worker.setup_logging()
            assert logging_setup[0]
            with pytest.raises(AttributeError):
                sys.stdout.logger
        finally:
            signals.setup_logging.disconnect(on_logging_setup)

    def test_platform_tweaks_macOS(self):

        class macOSWorker(Worker):
            proxy_workaround_installed = False

            def macOS_proxy_detection_workaround(self):
                self.proxy_workaround_installed = True

        with mock.stdouts():
            worker = macOSWorker(app=self.app, redirect_stdouts=False)

            def install_HUP_nosupport(controller):
                controller.hup_not_supported_installed = True

            class Controller(object):
                pass

            prev = cd.install_HUP_not_supported_handler
            cd.install_HUP_not_supported_handler = install_HUP_nosupport
            try:
                worker.app.IS_macOS = True
                controller = Controller()
                worker.install_platform_tweaks(controller)
                assert controller.hup_not_supported_installed
                assert worker.proxy_workaround_installed
            finally:
                cd.install_HUP_not_supported_handler = prev

    def test_general_platform_tweaks(self):
        restart_worker_handler_installed = [False]

        def install_worker_restart_handler(worker):
            restart_worker_handler_installed[0] = True

        class Controller(object):
            pass

        with mock.stdouts():
            prev = cd.install_worker_restart_handler
            cd.install_worker_restart_handler = install_worker_restart_handler
            try:
                worker = self.Worker(app=self.app)
                worker.app.IS_macOS = False
                worker.install_platform_tweaks(Controller())
                assert restart_worker_handler_installed[0]
            finally:
                cd.install_worker_restart_handler = prev

    def test_on_consumer_ready(self):
        worker_ready_sent = [False]

        @signals.worker_ready.connect
        def on_worker_ready(**kwargs):
            worker_ready_sent[0] = True

        try:
            with mock.stdouts():
                self.Worker(app=self.app).on_consumer_ready(object())
                assert worker_ready_sent[0]
        finally:
            # Disconnect the receiver, as
            # test_redirect_stdouts_already_handled does for its own.
            signals.worker_ready.disconnect(on_worker_ready)


@mock.stdouts
class test_funs:
    """Tests for module-level helpers and the command-line entry points."""

    def test_active_thread_count(self):
        assert cd.active_thread_count()

    @skip.unless_module('setproctitle')
    def test_set_process_status(self):
        worker = Worker(app=self.app, hostname='xyzza')
        prev1, sys.argv = sys.argv, ['Arg0']
        try:
            st = worker.set_process_status('Running')
            assert 'celeryd' in st
            assert 'xyzza' in st
            assert 'Running' in st
            prev2, sys.argv = sys.argv, ['Arg0', 'Arg1']
            try:
                st = worker.set_process_status('Running')
                assert 'celeryd' in st
                assert 'xyzza' in st
                assert 'Running' in st
                assert 'Arg1' in st
            finally:
                sys.argv = prev2
        finally:
            sys.argv = prev1

    def test_parse_options(self):
        cmd = worker()
        cmd.app = self.app
        opts, args = cmd.parse_options('worker', ['--concurrency=512',
                                                  '--heartbeat-interval=10'])
        assert opts['concurrency'] == 512
        assert opts['heartbeat_interval'] == 10

    def test_main(self):
        p, cd.Worker = cd.Worker, Worker
        s, sys.argv = sys.argv, ['worker', '--discard']
        try:
            worker_main(app=self.app)
        finally:
            cd.Worker = p
            sys.argv = s


@mock.stdouts
class test_signal_handlers:
    """Tests for the signal handlers installed by ``celery.apps.worker``."""

    class _Worker(object):
        """Minimal worker stand-in that records stop/terminate calls."""

        hostname = 'foo'
        stopped = False
        terminated = False

        def stop(self, in_sighandler=False):
            self.stopped = True

        def terminate(self, in_sighandler=False):
            self.terminated = True

    def psig(self, fun, *args, **kwargs):
        """Call ``fun`` with ``platforms.signals`` replaced by a recorder.

        Returns the dict of ``{signal name: handler}`` registered while
        ``fun(*args, **kwargs)`` ran.  ``platforms.signals`` is restored
        before returning.
        """
        handlers = {}

        class Signals(platforms.Signals):
            def __setitem__(self, sig, handler):
                handlers[sig] = handler

        p, platforms.signals = platforms.signals, Signals()
        try:
            fun(*args, **kwargs)
            return handlers
        finally:
            platforms.signals = p

    def test_worker_int_handler(self):
        worker = self._Worker()
        handlers = self.psig(cd.install_worker_int_handler, worker)
        next_handlers = {}
        state.should_stop = None
        state.should_terminate = None

        class Signals(platforms.Signals):
            # Capture the handler registered by the first SIGINT handler.

            def __setitem__(self, sig, handler):
                next_handlers[sig] = handler

        with patch('celery.apps.worker.active_thread_count') as c:
            c.return_value = 3
            p, platforms.signals = platforms.signals, Signals()
            try:
                handlers['SIGINT']('SIGINT', object())
                assert state.should_stop
                assert state.should_stop == EX_FAILURE
            finally:
                platforms.signals = p
                state.should_stop = None

            try:
                next_handlers['SIGINT']('SIGINT', object())
                assert state.should_terminate
                assert state.should_terminate == EX_FAILURE
            finally:
                state.should_terminate = None

        with patch('celery.apps.worker.active_thread_count') as c:
            c.return_value = 1
            p, platforms.signals = platforms.signals, Signals()
            try:
                with pytest.raises(WorkerShutdown):
                    handlers['SIGINT']('SIGINT', object())
            finally:
                platforms.signals = p

            with pytest.raises(WorkerTerminate):
                next_handlers['SIGINT']('SIGINT', object())

    @skip.unless_module('multiprocessing')
    def test_worker_int_handler_only_stop_MainProcess(self):
        process = current_process()
        name, process.name = process.name, 'OtherProcess'
        # One outer try/finally keeps the process renamed for both
        # scenarios, like the sibling *_only_stop_MainProcess tests.
        try:
            with patch('celery.apps.worker.active_thread_count') as c:
                c.return_value = 3
                worker = self._Worker()
                handlers = self.psig(cd.install_worker_int_handler, worker)
                handlers['SIGINT']('SIGINT', object())
                assert state.should_stop
            with patch('celery.apps.worker.active_thread_count') as c:
                c.return_value = 1
                worker = self._Worker()
                handlers = self.psig(cd.install_worker_int_handler, worker)
                with pytest.raises(WorkerShutdown):
                    handlers['SIGINT']('SIGINT', object())
        finally:
            process.name = name
            state.should_stop = None

    def test_install_HUP_not_supported_handler(self):
        worker = self._Worker()
        handlers = self.psig(cd.install_HUP_not_supported_handler, worker)
        handlers['SIGHUP']('SIGHUP', object())

    @skip.unless_module('multiprocessing')
    def test_worker_term_hard_handler_only_stop_MainProcess(self):
        process = current_process()
        name, process.name = process.name, 'OtherProcess'
        try:
            with patch('celery.apps.worker.active_thread_count') as c:
                c.return_value = 3
                worker = self._Worker()
                handlers = self.psig(
                    cd.install_worker_term_hard_handler, worker)
                try:
                    handlers['SIGQUIT']('SIGQUIT', object())
                    assert state.should_terminate
                finally:
                    state.should_terminate = None
            with patch('celery.apps.worker.active_thread_count') as c:
                c.return_value = 1
                worker = self._Worker()
                handlers = self.psig(
                    cd.install_worker_term_hard_handler, worker)
                try:
                    with pytest.raises(WorkerTerminate):
                        handlers['SIGQUIT']('SIGQUIT', object())
                finally:
                    state.should_terminate = None
        finally:
            process.name = name

    def test_worker_term_handler_when_threads(self):
        with patch('celery.apps.worker.active_thread_count') as c:
            c.return_value = 3
            worker = self._Worker()
            handlers = self.psig(cd.install_worker_term_handler, worker)
            try:
                handlers['SIGTERM']('SIGTERM', object())
                assert state.should_stop == EX_OK
            finally:
                state.should_stop = None

    def test_worker_term_handler_when_single_thread(self):
        with patch('celery.apps.worker.active_thread_count') as c:
            c.return_value = 1
            worker = self._Worker()
            handlers = self.psig(cd.install_worker_term_handler, worker)
            try:
                with pytest.raises(WorkerShutdown):
                    handlers['SIGTERM']('SIGTERM', object())
            finally:
                state.should_stop = None

    @patch('sys.__stderr__')
    @skip.if_pypy()
    @skip.if_jython()
    def test_worker_cry_handler(self, stderr):
        handlers = self.psig(cd.install_cry_handler)
        assert handlers['SIGUSR1']('SIGUSR1', object()) is None
        stderr.write.assert_called()

    @skip.unless_module('multiprocessing')
    def test_worker_term_handler_only_stop_MainProcess(self):
        process = current_process()
        name, process.name = process.name, 'OtherProcess'
        try:
            with patch('celery.apps.worker.active_thread_count') as c:
                c.return_value = 3
                worker = self._Worker()
                handlers = self.psig(cd.install_worker_term_handler, worker)
                handlers['SIGTERM']('SIGTERM', object())
                assert state.should_stop == EX_OK
            with patch('celery.apps.worker.active_thread_count') as c:
                c.return_value = 1
                worker = self._Worker()
                handlers = self.psig(cd.install_worker_term_handler, worker)
                with pytest.raises(WorkerShutdown):
                    handlers['SIGTERM']('SIGTERM', object())
        finally:
            process.name = name
            state.should_stop = None

    @skip.unless_symbol('os.execv')
    @patch('celery.platforms.close_open_fds')
    @patch('atexit.register')
    @patch('os.close')
    def test_worker_restart_handler(self, _close, register, close_open):
        argv = []

        def _execv(*args):
            # Record the exec arguments instead of replacing the process.
            argv.extend(args)

        execv, os.execv = os.execv, _execv
        try:
            worker = self._Worker()
            handlers = self.psig(cd.install_worker_restart_handler, worker)
            handlers['SIGHUP']('SIGHUP', object())
            assert state.should_stop == EX_OK
            register.assert_called()
            callback = register.call_args[0][0]
            callback()
            assert argv
        finally:
            os.execv = execv
            state.should_stop = None

    def test_worker_term_hard_handler_when_threaded(self):
        with patch('celery.apps.worker.active_thread_count') as c:
            c.return_value = 3
            worker = self._Worker()
            handlers = self.psig(cd.install_worker_term_hard_handler, worker)
            try:
                handlers['SIGQUIT']('SIGQUIT', object())
                assert state.should_terminate
            finally:
                state.should_terminate = None

    def test_worker_term_hard_handler_when_single_threaded(self):
        with patch('celery.apps.worker.active_thread_count') as c:
            c.return_value = 1
            worker = self._Worker()
            handlers = self.psig(cd.install_worker_term_hard_handler, worker)
            with pytest.raises(WorkerTerminate):
                handlers['SIGQUIT']('SIGQUIT', object())

    def test_send_worker_shutting_down_signal(self):
        with patch('celery.apps.worker.signals.worker_shutting_down') as wsd:
            # Pin the thread count as the sibling tests do, so the expected
            # WorkerShutdown does not depend on the real number of threads.
            with patch('celery.apps.worker.active_thread_count') as c:
                c.return_value = 1
                worker = self._Worker()
                handlers = self.psig(cd.install_worker_term_handler, worker)
                try:
                    with pytest.raises(WorkerShutdown):
                        handlers['SIGTERM']('SIGTERM', object())
                finally:
                    state.should_stop = None
            wsd.send.assert_called_with(
                sender='foo', sig='SIGTERM', how='Warm', exitcode=0,
            )