|| from __future__ import absolute_import, unicode_literalsimport loggingimport osimport pytestimport sysfrom billiard.process import current_processfrom case import Mock, mock, patch, skipfrom kombu import Exchange, Queuefrom celery import platformsfrom celery import signalsfrom celery.app import tracefrom celery.apps import worker as cdfrom celery.bin.worker import worker, main as worker_mainfrom celery.exceptions import (    ImproperlyConfigured, WorkerShutdown, WorkerTerminate,)from celery.platforms import EX_FAILURE, EX_OKfrom celery.worker import state@pytest.fixture(autouse=True)def reset_worker_optimizations():    yield    trace.reset_worker_optimizations()class Worker(cd.Worker):    redirect_stdouts = False    def start(self, *args, **kwargs):        self.on_start()class test_Worker:    Worker = Worker    def test_queues_string(self):        with mock.stdouts():            w = self.app.Worker()            w.setup_queues('foo,bar,baz')            assert 'foo' in self.app.amqp.queues    def test_cpu_count(self):        with mock.stdouts():            with patch('celery.worker.worker.cpu_count') as cpu_count:                cpu_count.side_effect = NotImplementedError()                w = self.app.Worker(concurrency=None)                assert w.concurrency == 2            w = self.app.Worker(concurrency=5)            assert w.concurrency == 5    def test_windows_B_option(self):        with mock.stdouts():            self.app.IS_WINDOWS = True            with pytest.raises(SystemExit):                worker(app=self.app).run(beat=True)    def test_setup_concurrency_very_early(self):        x = worker()        x.run = Mock()        with pytest.raises(ImportError):            x.execute_from_commandline(['worker', '-P', 'xyzybox'])    def test_run_from_argv_basic(self):        x = worker(app=self.app)        x.run = Mock()        x.maybe_detach = Mock()        def run(*args, **kwargs):            pass        x.run = run        x.run_from_argv('celery', [])        x.maybe_detach.assert_called()    def test_maybe_detach(self):        x = worker(app=self.app)        with patch('celery.bin.worker.detached_celeryd') as detached:            x.maybe_detach([])            detached.assert_not_called()            with pytest.raises(SystemExit):                x.maybe_detach(['--detach'])            detached.assert_called()    def test_invalid_loglevel_gives_error(self):        with mock.stdouts():            x = worker(app=self.app)            with pytest.raises(SystemExit):                x.run(loglevel='GRIM_REAPER')    def test_no_loglevel(self):        self.app.Worker = Mock()        worker(app=self.app).run(loglevel=None)    def test_tasklist(self):        worker = self.app.Worker()        assert worker.app.tasks        assert worker.app.finalized        assert worker.tasklist(include_builtins=True)        worker.tasklist(include_builtins=False)    def test_extra_info(self):        worker = self.app.Worker()        worker.loglevel = logging.WARNING        assert not worker.extra_info()        worker.loglevel = logging.INFO        assert worker.extra_info()    def test_loglevel_string(self):        with mock.stdouts():            worker = self.Worker(app=self.app, loglevel='INFO')            assert worker.loglevel == logging.INFO    def test_run_worker(self, patching):        handlers = {}        class Signals(platforms.Signals):            def __setitem__(self, sig, handler):                handlers[sig] = handler        patching.setattr('celery.platforms.signals', Signals())        with mock.stdouts():            w = self.Worker(app=self.app)            w._isatty = False            w.on_start()            for sig in 'SIGINT', 'SIGHUP', 'SIGTERM':                assert sig in handlers            handlers.clear()            w = self.Worker(app=self.app)            w._isatty = True            w.on_start()            for sig in 'SIGINT', 'SIGTERM':                assert sig in handlers            assert 'SIGHUP' not in handlers    def test_startup_info(self):        with mock.stdouts():            worker = self.Worker(app=self.app)            worker.on_start()            assert worker.startup_info()            worker.loglevel = logging.DEBUG            assert worker.startup_info()            worker.loglevel = logging.INFO            assert worker.startup_info()            worker.autoscale = 13, 10            assert worker.startup_info()            prev_loader = self.app.loader            worker = self.Worker(                app=self.app,                queues='foo,bar,baz,xuzzy,do,re,mi',            )            with patch('celery.apps.worker.qualname') as qualname:                qualname.return_value = 'acme.backed_beans.Loader'                assert worker.startup_info()            with patch('celery.apps.worker.qualname') as qualname:                qualname.return_value = 'celery.loaders.Loader'                assert worker.startup_info()            from celery.loaders.app import AppLoader            self.app.loader = AppLoader(app=self.app)            assert worker.startup_info()            self.app.loader = prev_loader            worker.task_events = True            assert worker.startup_info()            # test when there are too few output lines            # to draft the ascii art onto            prev, cd.ARTLINES = cd.ARTLINES, ['the quick brown fox']            try:                assert worker.startup_info()            finally:                cd.ARTLINES = prev    def test_run(self):        with mock.stdouts():            self.Worker(app=self.app).on_start()            self.Worker(app=self.app, purge=True).on_start()            worker = self.Worker(app=self.app)            worker.on_start()    def test_purge_messages(self):        with mock.stdouts():            self.Worker(app=self.app).purge_messages()    def test_init_queues(self):        with mock.stdouts():            app = self.app            c = app.conf            app.amqp.queues = app.amqp.Queues({                'celery': {                    'exchange': 'celery',                    'routing_key': 'celery',                },                'video': {                    'exchange': 'video',                    'routing_key': 'video',                },            })            worker = self.Worker(app=self.app)            worker.setup_queues(['video'])            assert 'video' in app.amqp.queues            assert 'video' in app.amqp.queues.consume_from            assert 'celery' in app.amqp.queues            assert 'celery' not in app.amqp.queues.consume_from            c.task_create_missing_queues = False            del(app.amqp.queues)            with pytest.raises(ImproperlyConfigured):                self.Worker(app=self.app).setup_queues(['image'])            del(app.amqp.queues)            c.task_create_missing_queues = True            worker = self.Worker(app=self.app)            worker.setup_queues(['image'])            assert 'image' in app.amqp.queues.consume_from            assert app.amqp.queues['image'] == Queue(                'image', Exchange('image'),                routing_key='image',            )    def test_autoscale_argument(self):        with mock.stdouts():            worker1 = self.Worker(app=self.app, autoscale='10,3')            assert worker1.autoscale == [10, 3]            worker2 = self.Worker(app=self.app, autoscale='10')            assert worker2.autoscale == [10, 0]    def test_include_argument(self):        worker1 = self.Worker(app=self.app, include='os')        assert worker1.include == ['os']        worker2 = self.Worker(app=self.app,                              include='os,sys')        assert worker2.include == ['os', 'sys']        self.Worker(app=self.app, include=['os', 'sys'])    def test_unknown_loglevel(self):        with mock.stdouts():            with pytest.raises(SystemExit):                worker(app=self.app).run(loglevel='ALIEN')            worker1 = self.Worker(app=self.app, loglevel=0xFFFF)            assert worker1.loglevel == 0xFFFF    @patch('os._exit')    @skip.if_win32()    def test_warns_if_running_as_privileged_user(self, _exit, patching):        getuid = patching('os.getuid')        with mock.stdouts() as (_, stderr):            getuid.return_value = 0            self.app.conf.accept_content = ['pickle']            worker = self.Worker(app=self.app)            worker.on_start()            _exit.assert_called_with(1)            patching.setattr('celery.platforms.C_FORCE_ROOT', True)            worker = self.Worker(app=self.app)            worker.on_start()            assert 'a very bad idea' in stderr.getvalue()            patching.setattr('celery.platforms.C_FORCE_ROOT', False)            self.app.conf.accept_content = ['json']            worker = self.Worker(app=self.app)            worker.on_start()            assert 'superuser' in stderr.getvalue()    def test_redirect_stdouts(self):        with mock.stdouts():            self.Worker(app=self.app, redirect_stdouts=False)            with pytest.raises(AttributeError):                sys.stdout.logger    def test_on_start_custom_logging(self):        with mock.stdouts():            self.app.log.redirect_stdouts = Mock()            worker = self.Worker(app=self.app, redirect_stoutds=True)            worker._custom_logging = True            worker.on_start()            self.app.log.redirect_stdouts.assert_not_called()    def test_setup_logging_no_color(self):        worker = self.Worker(            app=self.app, redirect_stdouts=False, no_color=True,        )        prev, self.app.log.setup = self.app.log.setup, Mock()        try:            worker.setup_logging()            assert not self.app.log.setup.call_args[1]['colorize']        finally:            self.app.log.setup = prev    def test_startup_info_pool_is_str(self):        with mock.stdouts():            worker = self.Worker(app=self.app, redirect_stdouts=False)            worker.pool_cls = 'foo'            worker.startup_info()    def test_redirect_stdouts_already_handled(self):        logging_setup = [False]        @signals.setup_logging.connect        def on_logging_setup(**kwargs):            logging_setup[0] = True        try:            worker = self.Worker(app=self.app, redirect_stdouts=False)            worker.app.log.already_setup = False            worker.setup_logging()            assert logging_setup[0]            with pytest.raises(AttributeError):                sys.stdout.logger        finally:            signals.setup_logging.disconnect(on_logging_setup)    def test_platform_tweaks_macOS(self):        class macOSWorker(Worker):            proxy_workaround_installed = False            def macOS_proxy_detection_workaround(self):                self.proxy_workaround_installed = True        with mock.stdouts():            worker = macOSWorker(app=self.app, redirect_stdouts=False)            def install_HUP_nosupport(controller):                controller.hup_not_supported_installed = True            class Controller(object):                pass            prev = cd.install_HUP_not_supported_handler            cd.install_HUP_not_supported_handler = install_HUP_nosupport            try:                worker.app.IS_macOS = True                controller = Controller()                worker.install_platform_tweaks(controller)                assert controller.hup_not_supported_installed                assert worker.proxy_workaround_installed            finally:                cd.install_HUP_not_supported_handler = prev    def test_general_platform_tweaks(self):        restart_worker_handler_installed = [False]        def install_worker_restart_handler(worker):            restart_worker_handler_installed[0] = True        class Controller(object):            pass        with mock.stdouts():            prev = cd.install_worker_restart_handler            cd.install_worker_restart_handler = install_worker_restart_handler            try:                worker = self.Worker(app=self.app)                worker.app.IS_macOS = False                worker.install_platform_tweaks(Controller())                assert restart_worker_handler_installed[0]            finally:                cd.install_worker_restart_handler = prev    def test_on_consumer_ready(self):        worker_ready_sent = [False]        @signals.worker_ready.connect        def on_worker_ready(**kwargs):            worker_ready_sent[0] = True        with mock.stdouts():            self.Worker(app=self.app).on_consumer_ready(object())            assert worker_ready_sent[0]@mock.stdoutsclass test_funs:    def test_active_thread_count(self):        assert cd.active_thread_count()    @skip.unless_module('setproctitle')    def test_set_process_status(self):        worker = Worker(app=self.app, hostname='xyzza')        prev1, sys.argv = sys.argv, ['Arg0']        try:            st = worker.set_process_status('Running')            assert 'celeryd' in st            assert 'xyzza' in st            assert 'Running' in st            prev2, sys.argv = sys.argv, ['Arg0', 'Arg1']            try:                st = worker.set_process_status('Running')                assert 'celeryd' in st                assert 'xyzza' in st                assert 'Running' in st                assert 'Arg1' in st            finally:                sys.argv = prev2        finally:            sys.argv = prev1    def test_parse_options(self):        cmd = worker()        cmd.app = self.app        opts, args = cmd.parse_options('worker', ['--concurrency=512',                                       '--heartbeat-interval=10'])        assert opts['concurrency'] == 512        assert opts['heartbeat_interval'] == 10    def test_main(self):        p, cd.Worker = cd.Worker, Worker        s, sys.argv = sys.argv, ['worker', '--discard']        try:            worker_main(app=self.app)        finally:            cd.Worker = p            sys.argv = s@mock.stdoutsclass test_signal_handlers:    class _Worker(object):        stopped = False        terminated = False        def stop(self, in_sighandler=False):            self.stopped = True        def terminate(self, in_sighandler=False):            self.terminated = True    def psig(self, fun, *args, **kwargs):        handlers = {}        class Signals(platforms.Signals):            def __setitem__(self, sig, handler):                handlers[sig] = handler        p, platforms.signals = platforms.signals, Signals()        try:            fun(*args, **kwargs)            return handlers        finally:            platforms.signals = p    def test_worker_int_handler(self):        worker = self._Worker()        handlers = self.psig(cd.install_worker_int_handler, worker)        next_handlers = {}        state.should_stop = None        state.should_terminate = None        class Signals(platforms.Signals):            def __setitem__(self, sig, handler):                next_handlers[sig] = handler        with patch('celery.apps.worker.active_thread_count') as c:            c.return_value = 3            p, platforms.signals = platforms.signals, Signals()            try:                handlers['SIGINT']('SIGINT', object())                assert state.should_stop                assert state.should_stop == EX_FAILURE            finally:                platforms.signals = p                state.should_stop = None            try:                next_handlers['SIGINT']('SIGINT', object())                assert state.should_terminate                assert state.should_terminate == EX_FAILURE            finally:                state.should_terminate = None        with patch('celery.apps.worker.active_thread_count') as c:            c.return_value = 1            p, platforms.signals = platforms.signals, Signals()            try:                with pytest.raises(WorkerShutdown):                    handlers['SIGINT']('SIGINT', object())            finally:                platforms.signals = p            with pytest.raises(WorkerTerminate):                next_handlers['SIGINT']('SIGINT', object())    @skip.unless_module('multiprocessing')    def test_worker_int_handler_only_stop_MainProcess(self):        process = current_process()        name, process.name = process.name, 'OtherProcess'        with patch('celery.apps.worker.active_thread_count') as c:            c.return_value = 3            try:                worker = self._Worker()                handlers = self.psig(cd.install_worker_int_handler, worker)                handlers['SIGINT']('SIGINT', object())                assert state.should_stop            finally:                process.name = name                state.should_stop = None        with patch('celery.apps.worker.active_thread_count') as c:            c.return_value = 1            try:                worker = self._Worker()                handlers = self.psig(cd.install_worker_int_handler, worker)                with pytest.raises(WorkerShutdown):                    handlers['SIGINT']('SIGINT', object())            finally:                process.name = name                state.should_stop = None    def test_install_HUP_not_supported_handler(self):        worker = self._Worker()        handlers = self.psig(cd.install_HUP_not_supported_handler, worker)        handlers['SIGHUP']('SIGHUP', object())    @skip.unless_module('multiprocessing')    def test_worker_term_hard_handler_only_stop_MainProcess(self):        process = current_process()        name, process.name = process.name, 'OtherProcess'        try:            with patch('celery.apps.worker.active_thread_count') as c:                c.return_value = 3                worker = self._Worker()                handlers = self.psig(                    cd.install_worker_term_hard_handler, worker)                try:                    handlers['SIGQUIT']('SIGQUIT', object())                    assert state.should_terminate                finally:                    state.should_terminate = None            with patch('celery.apps.worker.active_thread_count') as c:                c.return_value = 1                worker = self._Worker()                handlers = self.psig(                    cd.install_worker_term_hard_handler, worker)                try:                    with pytest.raises(WorkerTerminate):                        handlers['SIGQUIT']('SIGQUIT', object())                finally:                    state.should_terminate = None        finally:            process.name = name    def test_worker_term_handler_when_threads(self):        with patch('celery.apps.worker.active_thread_count') as c:            c.return_value = 3            worker = self._Worker()            handlers = self.psig(cd.install_worker_term_handler, worker)            try:                handlers['SIGTERM']('SIGTERM', object())                assert state.should_stop == EX_OK            finally:                state.should_stop = None    def test_worker_term_handler_when_single_thread(self):        with patch('celery.apps.worker.active_thread_count') as c:            c.return_value = 1            worker = self._Worker()            handlers = self.psig(cd.install_worker_term_handler, worker)            try:                with pytest.raises(WorkerShutdown):                    handlers['SIGTERM']('SIGTERM', object())            finally:                state.should_stop = None    @patch('sys.__stderr__')    @skip.if_pypy()    @skip.if_jython()    def test_worker_cry_handler(self, stderr):        handlers = self.psig(cd.install_cry_handler)        assert handlers['SIGUSR1']('SIGUSR1', object()) is None        stderr.write.assert_called()    @skip.unless_module('multiprocessing')    def test_worker_term_handler_only_stop_MainProcess(self):        process = current_process()        name, process.name = process.name, 'OtherProcess'        try:            with patch('celery.apps.worker.active_thread_count') as c:                c.return_value = 3                worker = self._Worker()                handlers = self.psig(cd.install_worker_term_handler, worker)                handlers['SIGTERM']('SIGTERM', object())                assert state.should_stop == EX_OK            with patch('celery.apps.worker.active_thread_count') as c:                c.return_value = 1                worker = self._Worker()                handlers = self.psig(cd.install_worker_term_handler, worker)                with pytest.raises(WorkerShutdown):                    handlers['SIGTERM']('SIGTERM', object())        finally:            process.name = name            state.should_stop = None    @skip.unless_symbol('os.execv')    @patch('celery.platforms.close_open_fds')    @patch('atexit.register')    @patch('os.close')    def test_worker_restart_handler(self, _close, register, close_open):        argv = []        def _execv(*args):            argv.extend(args)        execv, os.execv = os.execv, _execv        try:            worker = self._Worker()            handlers = self.psig(cd.install_worker_restart_handler, worker)            handlers['SIGHUP']('SIGHUP', object())            assert state.should_stop == EX_OK            register.assert_called()            callback = register.call_args[0][0]            callback()            assert argv        finally:            os.execv = execv            state.should_stop = None    def test_worker_term_hard_handler_when_threaded(self):        with patch('celery.apps.worker.active_thread_count') as c:            c.return_value = 3            worker = self._Worker()            handlers = self.psig(cd.install_worker_term_hard_handler, worker)            try:                handlers['SIGQUIT']('SIGQUIT', object())                assert state.should_terminate            finally:                state.should_terminate = None    def test_worker_term_hard_handler_when_single_threaded(self):        with patch('celery.apps.worker.active_thread_count') as c:            c.return_value = 1            worker = self._Worker()            handlers = self.psig(cd.install_worker_term_hard_handler, worker)            with pytest.raises(WorkerTerminate):                handlers['SIGQUIT']('SIGQUIT', object())
 |