from __future__ import absolute_import, unicode_literals import logging import os import sys import pytest from billiard.process import current_process from case import Mock, mock, patch, skip from celery import platforms, signals from celery.app import trace from celery.apps import worker as cd from celery.bin.worker import main as worker_main from celery.bin.worker import worker from celery.exceptions import (ImproperlyConfigured, WorkerShutdown, WorkerTerminate) from celery.platforms import EX_FAILURE, EX_OK from celery.worker import state from kombu import Exchange, Queue @pytest.fixture(autouse=True) def reset_worker_optimizations(): yield trace.reset_worker_optimizations() class Worker(cd.Worker): redirect_stdouts = False def start(self, *args, **kwargs): self.on_start() class test_Worker: Worker = Worker def test_queues_string(self): with mock.stdouts(): w = self.app.Worker() w.setup_queues('foo,bar,baz') assert 'foo' in self.app.amqp.queues def test_cpu_count(self): with mock.stdouts(): with patch('celery.worker.worker.cpu_count') as cpu_count: cpu_count.side_effect = NotImplementedError() w = self.app.Worker(concurrency=None) assert w.concurrency == 2 w = self.app.Worker(concurrency=5) assert w.concurrency == 5 def test_windows_B_option(self): with mock.stdouts(): self.app.IS_WINDOWS = True with pytest.raises(SystemExit): worker(app=self.app).run(beat=True) def test_setup_concurrency_very_early(self): x = worker() x.run = Mock() with pytest.raises(ImportError): x.execute_from_commandline(['worker', '-P', 'xyzybox']) def test_run_from_argv_basic(self): x = worker(app=self.app) x.run = Mock() x.maybe_detach = Mock() def run(*args, **kwargs): pass x.run = run x.run_from_argv('celery', []) x.maybe_detach.assert_called() def test_maybe_detach(self): x = worker(app=self.app) with patch('celery.bin.worker.detached_celeryd') as detached: x.maybe_detach([]) detached.assert_not_called() with pytest.raises(SystemExit): x.maybe_detach(['--detach']) detached.assert_called() def test_invalid_loglevel_gives_error(self): with mock.stdouts(): x = worker(app=self.app) with pytest.raises(SystemExit): x.run(loglevel='GRIM_REAPER') def test_no_loglevel(self): self.app.Worker = Mock() worker(app=self.app).run(loglevel=None) def test_tasklist(self): worker = self.app.Worker() assert worker.app.tasks assert worker.app.finalized assert worker.tasklist(include_builtins=True) worker.tasklist(include_builtins=False) def test_extra_info(self): worker = self.app.Worker() worker.loglevel = logging.WARNING assert not worker.extra_info() worker.loglevel = logging.INFO assert worker.extra_info() def test_loglevel_string(self): with mock.stdouts(): worker = self.Worker(app=self.app, loglevel='INFO') assert worker.loglevel == logging.INFO def test_run_worker(self, patching): handlers = {} class Signals(platforms.Signals): def __setitem__(self, sig, handler): handlers[sig] = handler patching.setattr('celery.platforms.signals', Signals()) with mock.stdouts(): w = self.Worker(app=self.app) w._isatty = False w.on_start() for sig in 'SIGINT', 'SIGHUP', 'SIGTERM': assert sig in handlers handlers.clear() w = self.Worker(app=self.app) w._isatty = True w.on_start() for sig in 'SIGINT', 'SIGTERM': assert sig in handlers assert 'SIGHUP' not in handlers def test_startup_info(self): with mock.stdouts(): worker = self.Worker(app=self.app) worker.on_start() assert worker.startup_info() worker.loglevel = logging.DEBUG assert worker.startup_info() worker.loglevel = logging.INFO assert worker.startup_info() worker.autoscale = 13, 10 assert worker.startup_info() prev_loader = self.app.loader worker = self.Worker( app=self.app, queues='foo,bar,baz,xuzzy,do,re,mi', ) with patch('celery.apps.worker.qualname') as qualname: qualname.return_value = 'acme.backed_beans.Loader' assert worker.startup_info() with patch('celery.apps.worker.qualname') as qualname: qualname.return_value = 'celery.loaders.Loader' assert worker.startup_info() from celery.loaders.app import AppLoader self.app.loader = AppLoader(app=self.app) assert worker.startup_info() self.app.loader = prev_loader worker.task_events = True assert worker.startup_info() # test when there are too few output lines # to draft the ascii art onto prev, cd.ARTLINES = cd.ARTLINES, ['the quick brown fox'] try: assert worker.startup_info() finally: cd.ARTLINES = prev def test_run(self): with mock.stdouts(): self.Worker(app=self.app).on_start() self.Worker(app=self.app, purge=True).on_start() worker = self.Worker(app=self.app) worker.on_start() def test_purge_messages(self): with mock.stdouts(): self.Worker(app=self.app).purge_messages() def test_init_queues(self): with mock.stdouts(): app = self.app c = app.conf app.amqp.queues = app.amqp.Queues({ 'celery': { 'exchange': 'celery', 'routing_key': 'celery', }, 'video': { 'exchange': 'video', 'routing_key': 'video', }, }) worker = self.Worker(app=self.app) worker.setup_queues(['video']) assert 'video' in app.amqp.queues assert 'video' in app.amqp.queues.consume_from assert 'celery' in app.amqp.queues assert 'celery' not in app.amqp.queues.consume_from c.task_create_missing_queues = False del(app.amqp.queues) with pytest.raises(ImproperlyConfigured): self.Worker(app=self.app).setup_queues(['image']) del(app.amqp.queues) c.task_create_missing_queues = True worker = self.Worker(app=self.app) worker.setup_queues(['image']) assert 'image' in app.amqp.queues.consume_from assert app.amqp.queues['image'] == Queue( 'image', Exchange('image'), routing_key='image', ) def test_autoscale_argument(self): with mock.stdouts(): worker1 = self.Worker(app=self.app, autoscale='10,3') assert worker1.autoscale == [10, 3] worker2 = self.Worker(app=self.app, autoscale='10') assert worker2.autoscale == [10, 0] def test_include_argument(self): worker1 = self.Worker(app=self.app, include='os') assert worker1.include == ['os'] worker2 = self.Worker(app=self.app, include='os,sys') assert worker2.include == ['os', 'sys'] self.Worker(app=self.app, include=['os', 'sys']) def test_unknown_loglevel(self): with mock.stdouts(): with pytest.raises(SystemExit): worker(app=self.app).run(loglevel='ALIEN') worker1 = self.Worker(app=self.app, loglevel=0xFFFF) assert worker1.loglevel == 0xFFFF @patch('os._exit') @skip.if_win32() def test_warns_if_running_as_privileged_user(self, _exit, patching): getuid = patching('os.getuid') with mock.stdouts() as (_, stderr): getuid.return_value = 0 self.app.conf.accept_content = ['pickle'] worker = self.Worker(app=self.app) worker.on_start() _exit.assert_called_with(1) patching.setattr('celery.platforms.C_FORCE_ROOT', True) worker = self.Worker(app=self.app) worker.on_start() assert 'a very bad idea' in stderr.getvalue() patching.setattr('celery.platforms.C_FORCE_ROOT', False) self.app.conf.accept_content = ['json'] worker = self.Worker(app=self.app) worker.on_start() assert 'superuser' in stderr.getvalue() def test_redirect_stdouts(self): with mock.stdouts(): self.Worker(app=self.app, redirect_stdouts=False) with pytest.raises(AttributeError): sys.stdout.logger def test_on_start_custom_logging(self): with mock.stdouts(): self.app.log.redirect_stdouts = Mock() worker = self.Worker(app=self.app, redirect_stoutds=True) worker._custom_logging = True worker.on_start() self.app.log.redirect_stdouts.assert_not_called() def test_setup_logging_no_color(self): worker = self.Worker( app=self.app, redirect_stdouts=False, no_color=True, ) prev, self.app.log.setup = self.app.log.setup, Mock() try: worker.setup_logging() assert not self.app.log.setup.call_args[1]['colorize'] finally: self.app.log.setup = prev def test_startup_info_pool_is_str(self): with mock.stdouts(): worker = self.Worker(app=self.app, redirect_stdouts=False) worker.pool_cls = 'foo' worker.startup_info() def test_redirect_stdouts_already_handled(self): logging_setup = [False] @signals.setup_logging.connect def on_logging_setup(**kwargs): logging_setup[0] = True try: worker = self.Worker(app=self.app, redirect_stdouts=False) worker.app.log.already_setup = False worker.setup_logging() assert logging_setup[0] with pytest.raises(AttributeError): sys.stdout.logger finally: signals.setup_logging.disconnect(on_logging_setup) def test_platform_tweaks_macOS(self): class macOSWorker(Worker): proxy_workaround_installed = False def macOS_proxy_detection_workaround(self): self.proxy_workaround_installed = True with mock.stdouts(): worker = macOSWorker(app=self.app, redirect_stdouts=False) def install_HUP_nosupport(controller): controller.hup_not_supported_installed = True class Controller(object): pass prev = cd.install_HUP_not_supported_handler cd.install_HUP_not_supported_handler = install_HUP_nosupport try: worker.app.IS_macOS = True controller = Controller() worker.install_platform_tweaks(controller) assert controller.hup_not_supported_installed assert worker.proxy_workaround_installed finally: cd.install_HUP_not_supported_handler = prev def test_general_platform_tweaks(self): restart_worker_handler_installed = [False] def install_worker_restart_handler(worker): restart_worker_handler_installed[0] = True class Controller(object): pass with mock.stdouts(): prev = cd.install_worker_restart_handler cd.install_worker_restart_handler = install_worker_restart_handler try: worker = self.Worker(app=self.app) worker.app.IS_macOS = False worker.install_platform_tweaks(Controller()) assert restart_worker_handler_installed[0] finally: cd.install_worker_restart_handler = prev def test_on_consumer_ready(self): worker_ready_sent = [False] @signals.worker_ready.connect def on_worker_ready(**kwargs): worker_ready_sent[0] = True with mock.stdouts(): self.Worker(app=self.app).on_consumer_ready(object()) assert worker_ready_sent[0] @mock.stdouts class test_funs: def test_active_thread_count(self): assert cd.active_thread_count() @skip.unless_module('setproctitle') def test_set_process_status(self): worker = Worker(app=self.app, hostname='xyzza') prev1, sys.argv = sys.argv, ['Arg0'] try: st = worker.set_process_status('Running') assert 'celeryd' in st assert 'xyzza' in st assert 'Running' in st prev2, sys.argv = sys.argv, ['Arg0', 'Arg1'] try: st = worker.set_process_status('Running') assert 'celeryd' in st assert 'xyzza' in st assert 'Running' in st assert 'Arg1' in st finally: sys.argv = prev2 finally: sys.argv = prev1 def test_parse_options(self): cmd = worker() cmd.app = self.app opts, args = cmd.parse_options('worker', ['--concurrency=512', '--heartbeat-interval=10']) assert opts['concurrency'] == 512 assert opts['heartbeat_interval'] == 10 def test_main(self): p, cd.Worker = cd.Worker, Worker s, sys.argv = sys.argv, ['worker', '--discard'] try: worker_main(app=self.app) finally: cd.Worker = p sys.argv = s @mock.stdouts class test_signal_handlers: class _Worker(object): hostname = 'foo' stopped = False terminated = False def stop(self, in_sighandler=False): self.stopped = True def terminate(self, in_sighandler=False): self.terminated = True def psig(self, fun, *args, **kwargs): handlers = {} class Signals(platforms.Signals): def __setitem__(self, sig, handler): handlers[sig] = handler p, platforms.signals = platforms.signals, Signals() try: fun(*args, **kwargs) return handlers finally: platforms.signals = p def test_worker_int_handler(self): worker = self._Worker() handlers = self.psig(cd.install_worker_int_handler, worker) next_handlers = {} state.should_stop = None state.should_terminate = None class Signals(platforms.Signals): def __setitem__(self, sig, handler): next_handlers[sig] = handler with patch('celery.apps.worker.active_thread_count') as c: c.return_value = 3 p, platforms.signals = platforms.signals, Signals() try: handlers['SIGINT']('SIGINT', object()) assert state.should_stop assert state.should_stop == EX_FAILURE finally: platforms.signals = p state.should_stop = None try: next_handlers['SIGINT']('SIGINT', object()) assert state.should_terminate assert state.should_terminate == EX_FAILURE finally: state.should_terminate = None with patch('celery.apps.worker.active_thread_count') as c: c.return_value = 1 p, platforms.signals = platforms.signals, Signals() try: with pytest.raises(WorkerShutdown): handlers['SIGINT']('SIGINT', object()) finally: platforms.signals = p with pytest.raises(WorkerTerminate): next_handlers['SIGINT']('SIGINT', object()) @skip.unless_module('multiprocessing') def test_worker_int_handler_only_stop_MainProcess(self): process = current_process() name, process.name = process.name, 'OtherProcess' with patch('celery.apps.worker.active_thread_count') as c: c.return_value = 3 try: worker = self._Worker() handlers = self.psig(cd.install_worker_int_handler, worker) handlers['SIGINT']('SIGINT', object()) assert state.should_stop finally: process.name = name state.should_stop = None with patch('celery.apps.worker.active_thread_count') as c: c.return_value = 1 try: worker = self._Worker() handlers = self.psig(cd.install_worker_int_handler, worker) with pytest.raises(WorkerShutdown): handlers['SIGINT']('SIGINT', object()) finally: process.name = name state.should_stop = None def test_install_HUP_not_supported_handler(self): worker = self._Worker() handlers = self.psig(cd.install_HUP_not_supported_handler, worker) handlers['SIGHUP']('SIGHUP', object()) @skip.unless_module('multiprocessing') def test_worker_term_hard_handler_only_stop_MainProcess(self): process = current_process() name, process.name = process.name, 'OtherProcess' try: with patch('celery.apps.worker.active_thread_count') as c: c.return_value = 3 worker = self._Worker() handlers = self.psig( cd.install_worker_term_hard_handler, worker) try: handlers['SIGQUIT']('SIGQUIT', object()) assert state.should_terminate finally: state.should_terminate = None with patch('celery.apps.worker.active_thread_count') as c: c.return_value = 1 worker = self._Worker() handlers = self.psig( cd.install_worker_term_hard_handler, worker) try: with pytest.raises(WorkerTerminate): handlers['SIGQUIT']('SIGQUIT', object()) finally: state.should_terminate = None finally: process.name = name def test_worker_term_handler_when_threads(self): with patch('celery.apps.worker.active_thread_count') as c: c.return_value = 3 worker = self._Worker() handlers = self.psig(cd.install_worker_term_handler, worker) try: handlers['SIGTERM']('SIGTERM', object()) assert state.should_stop == EX_OK finally: state.should_stop = None def test_worker_term_handler_when_single_thread(self): with patch('celery.apps.worker.active_thread_count') as c: c.return_value = 1 worker = self._Worker() handlers = self.psig(cd.install_worker_term_handler, worker) try: with pytest.raises(WorkerShutdown): handlers['SIGTERM']('SIGTERM', object()) finally: state.should_stop = None @patch('sys.__stderr__') @skip.if_pypy() @skip.if_jython() def test_worker_cry_handler(self, stderr): handlers = self.psig(cd.install_cry_handler) assert handlers['SIGUSR1']('SIGUSR1', object()) is None stderr.write.assert_called() @skip.unless_module('multiprocessing') def test_worker_term_handler_only_stop_MainProcess(self): process = current_process() name, process.name = process.name, 'OtherProcess' try: with patch('celery.apps.worker.active_thread_count') as c: c.return_value = 3 worker = self._Worker() handlers = self.psig(cd.install_worker_term_handler, worker) handlers['SIGTERM']('SIGTERM', object()) assert state.should_stop == EX_OK with patch('celery.apps.worker.active_thread_count') as c: c.return_value = 1 worker = self._Worker() handlers = self.psig(cd.install_worker_term_handler, worker) with pytest.raises(WorkerShutdown): handlers['SIGTERM']('SIGTERM', object()) finally: process.name = name state.should_stop = None @skip.unless_symbol('os.execv') @patch('celery.platforms.close_open_fds') @patch('atexit.register') @patch('os.close') def test_worker_restart_handler(self, _close, register, close_open): argv = [] def _execv(*args): argv.extend(args) execv, os.execv = os.execv, _execv try: worker = self._Worker() handlers = self.psig(cd.install_worker_restart_handler, worker) handlers['SIGHUP']('SIGHUP', object()) assert state.should_stop == EX_OK register.assert_called() callback = register.call_args[0][0] callback() assert argv finally: os.execv = execv state.should_stop = None def test_worker_term_hard_handler_when_threaded(self): with patch('celery.apps.worker.active_thread_count') as c: c.return_value = 3 worker = self._Worker() handlers = self.psig(cd.install_worker_term_hard_handler, worker) try: handlers['SIGQUIT']('SIGQUIT', object()) assert state.should_terminate finally: state.should_terminate = None def test_worker_term_hard_handler_when_single_threaded(self): with patch('celery.apps.worker.active_thread_count') as c: c.return_value = 1 worker = self._Worker() handlers = self.psig(cd.install_worker_term_hard_handler, worker) with pytest.raises(WorkerTerminate): handlers['SIGQUIT']('SIGQUIT', object()) def test_send_worker_shutting_down_signal(self): with patch('celery.apps.worker.signals.worker_shutting_down') as wsd: worker = self._Worker() handlers = self.psig(cd.install_worker_term_handler, worker) try: with pytest.raises(WorkerShutdown): handlers['SIGTERM']('SIGTERM', object()) finally: state.should_stop = None wsd.send.assert_called_with( sender='foo', sig='SIGTERM', how='Warm', exitcode=0, )