|| 
							- from __future__ import absolute_import, unicode_literals
 
- import logging
 
- import os
 
- import pytest
 
- import sys
 
- from billiard.process import current_process
 
- from case import Mock, mock, patch, skip
 
- from kombu import Exchange, Queue
 
- from celery import platforms
 
- from celery import signals
 
- from celery.app import trace
 
- from celery.apps import worker as cd
 
- from celery.bin.worker import worker, main as worker_main
 
- from celery.exceptions import (
 
-     ImproperlyConfigured, WorkerShutdown, WorkerTerminate,
 
- )
 
- from celery.platforms import EX_FAILURE, EX_OK
 
- from celery.worker import state
 
- @pytest.fixture(autouse=True)
 
- def reset_worker_optimizations():
 
-     yield
 
-     trace.reset_worker_optimizations()
 
- class Worker(cd.Worker):
 
-     redirect_stdouts = False
 
-     def start(self, *args, **kwargs):
 
-         self.on_start()
 
- class test_Worker:
 
-     Worker = Worker
 
-     def test_queues_string(self):
 
-         with mock.stdouts():
 
-             w = self.app.Worker()
 
-             w.setup_queues('foo,bar,baz')
 
-             assert 'foo' in self.app.amqp.queues
 
-     def test_cpu_count(self):
 
-         with mock.stdouts():
 
-             with patch('celery.worker.worker.cpu_count') as cpu_count:
 
-                 cpu_count.side_effect = NotImplementedError()
 
-                 w = self.app.Worker(concurrency=None)
 
-                 assert w.concurrency == 2
 
-             w = self.app.Worker(concurrency=5)
 
-             assert w.concurrency == 5
 
-     def test_windows_B_option(self):
 
-         with mock.stdouts():
 
-             self.app.IS_WINDOWS = True
 
-             with pytest.raises(SystemExit):
 
-                 worker(app=self.app).run(beat=True)
 
-     def test_setup_concurrency_very_early(self):
 
-         x = worker()
 
-         x.run = Mock()
 
-         with pytest.raises(ImportError):
 
-             x.execute_from_commandline(['worker', '-P', 'xyzybox'])
 
-     def test_run_from_argv_basic(self):
 
-         x = worker(app=self.app)
 
-         x.run = Mock()
 
-         x.maybe_detach = Mock()
 
-         def run(*args, **kwargs):
 
-             pass
 
-         x.run = run
 
-         x.run_from_argv('celery', [])
 
-         x.maybe_detach.assert_called()
 
-     def test_maybe_detach(self):
 
-         x = worker(app=self.app)
 
-         with patch('celery.bin.worker.detached_celeryd') as detached:
 
-             x.maybe_detach([])
 
-             detached.assert_not_called()
 
-             with pytest.raises(SystemExit):
 
-                 x.maybe_detach(['--detach'])
 
-             detached.assert_called()
 
-     def test_invalid_loglevel_gives_error(self):
 
-         with mock.stdouts():
 
-             x = worker(app=self.app)
 
-             with pytest.raises(SystemExit):
 
-                 x.run(loglevel='GRIM_REAPER')
 
-     def test_no_loglevel(self):
 
-         self.app.Worker = Mock()
 
-         worker(app=self.app).run(loglevel=None)
 
-     def test_tasklist(self):
 
-         worker = self.app.Worker()
 
-         assert worker.app.tasks
 
-         assert worker.app.finalized
 
-         assert worker.tasklist(include_builtins=True)
 
-         worker.tasklist(include_builtins=False)
 
-     def test_extra_info(self):
 
-         worker = self.app.Worker()
 
-         worker.loglevel = logging.WARNING
 
-         assert not worker.extra_info()
 
-         worker.loglevel = logging.INFO
 
-         assert worker.extra_info()
 
-     def test_loglevel_string(self):
 
-         with mock.stdouts():
 
-             worker = self.Worker(app=self.app, loglevel='INFO')
 
-             assert worker.loglevel == logging.INFO
 
-     def test_run_worker(self, patching):
 
-         handlers = {}
 
-         class Signals(platforms.Signals):
 
-             def __setitem__(self, sig, handler):
 
-                 handlers[sig] = handler
 
-         patching.setattr('celery.platforms.signals', Signals())
 
-         with mock.stdouts():
 
-             w = self.Worker(app=self.app)
 
-             w._isatty = False
 
-             w.on_start()
 
-             for sig in 'SIGINT', 'SIGHUP', 'SIGTERM':
 
-                 assert sig in handlers
 
-             handlers.clear()
 
-             w = self.Worker(app=self.app)
 
-             w._isatty = True
 
-             w.on_start()
 
-             for sig in 'SIGINT', 'SIGTERM':
 
-                 assert sig in handlers
 
-             assert 'SIGHUP' not in handlers
 
-     def test_startup_info(self):
 
-         with mock.stdouts():
 
-             worker = self.Worker(app=self.app)
 
-             worker.on_start()
 
-             assert worker.startup_info()
 
-             worker.loglevel = logging.DEBUG
 
-             assert worker.startup_info()
 
-             worker.loglevel = logging.INFO
 
-             assert worker.startup_info()
 
-             worker.autoscale = 13, 10
 
-             assert worker.startup_info()
 
-             prev_loader = self.app.loader
 
-             worker = self.Worker(
 
-                 app=self.app,
 
-                 queues='foo,bar,baz,xuzzy,do,re,mi',
 
-             )
 
-             with patch('celery.apps.worker.qualname') as qualname:
 
-                 qualname.return_value = 'acme.backed_beans.Loader'
 
-                 assert worker.startup_info()
 
-             with patch('celery.apps.worker.qualname') as qualname:
 
-                 qualname.return_value = 'celery.loaders.Loader'
 
-                 assert worker.startup_info()
 
-             from celery.loaders.app import AppLoader
 
-             self.app.loader = AppLoader(app=self.app)
 
-             assert worker.startup_info()
 
-             self.app.loader = prev_loader
 
-             worker.task_events = True
 
-             assert worker.startup_info()
 
-             # test when there are too few output lines
 
-             # to draft the ascii art onto
 
-             prev, cd.ARTLINES = cd.ARTLINES, ['the quick brown fox']
 
-             try:
 
-                 assert worker.startup_info()
 
-             finally:
 
-                 cd.ARTLINES = prev
 
-     def test_run(self):
 
-         with mock.stdouts():
 
-             self.Worker(app=self.app).on_start()
 
-             self.Worker(app=self.app, purge=True).on_start()
 
-             worker = self.Worker(app=self.app)
 
-             worker.on_start()
 
-     def test_purge_messages(self):
 
-         with mock.stdouts():
 
-             self.Worker(app=self.app).purge_messages()
 
-     def test_init_queues(self):
 
-         with mock.stdouts():
 
-             app = self.app
 
-             c = app.conf
 
-             app.amqp.queues = app.amqp.Queues({
 
-                 'celery': {
 
-                     'exchange': 'celery',
 
-                     'routing_key': 'celery',
 
-                 },
 
-                 'video': {
 
-                     'exchange': 'video',
 
-                     'routing_key': 'video',
 
-                 },
 
-             })
 
-             worker = self.Worker(app=self.app)
 
-             worker.setup_queues(['video'])
 
-             assert 'video' in app.amqp.queues
 
-             assert 'video' in app.amqp.queues.consume_from
 
-             assert 'celery' in app.amqp.queues
 
-             assert 'celery' not in app.amqp.queues.consume_from
 
-             c.task_create_missing_queues = False
 
-             del(app.amqp.queues)
 
-             with pytest.raises(ImproperlyConfigured):
 
-                 self.Worker(app=self.app).setup_queues(['image'])
 
-             del(app.amqp.queues)
 
-             c.task_create_missing_queues = True
 
-             worker = self.Worker(app=self.app)
 
-             worker.setup_queues(['image'])
 
-             assert 'image' in app.amqp.queues.consume_from
 
-             assert app.amqp.queues['image'] == Queue(
 
-                 'image', Exchange('image'),
 
-                 routing_key='image',
 
-             )
 
-     def test_autoscale_argument(self):
 
-         with mock.stdouts():
 
-             worker1 = self.Worker(app=self.app, autoscale='10,3')
 
-             assert worker1.autoscale == [10, 3]
 
-             worker2 = self.Worker(app=self.app, autoscale='10')
 
-             assert worker2.autoscale == [10, 0]
 
-     def test_include_argument(self):
 
-         worker1 = self.Worker(app=self.app, include='os')
 
-         assert worker1.include == ['os']
 
-         worker2 = self.Worker(app=self.app,
 
-                               include='os,sys')
 
-         assert worker2.include == ['os', 'sys']
 
-         self.Worker(app=self.app, include=['os', 'sys'])
 
-     def test_unknown_loglevel(self):
 
-         with mock.stdouts():
 
-             with pytest.raises(SystemExit):
 
-                 worker(app=self.app).run(loglevel='ALIEN')
 
-             worker1 = self.Worker(app=self.app, loglevel=0xFFFF)
 
-             assert worker1.loglevel == 0xFFFF
 
-     @patch('os._exit')
 
-     @skip.if_win32()
 
-     def test_warns_if_running_as_privileged_user(self, _exit, patching):
 
-         getuid = patching('os.getuid')
 
-         with mock.stdouts() as (_, stderr):
 
-             getuid.return_value = 0
 
-             self.app.conf.accept_content = ['pickle']
 
-             worker = self.Worker(app=self.app)
 
-             worker.on_start()
 
-             _exit.assert_called_with(1)
 
-             patching.setattr('celery.platforms.C_FORCE_ROOT', True)
 
-             worker = self.Worker(app=self.app)
 
-             worker.on_start()
 
-             assert 'a very bad idea' in stderr.getvalue()
 
-             patching.setattr('celery.platforms.C_FORCE_ROOT', False)
 
-             self.app.conf.accept_content = ['json']
 
-             worker = self.Worker(app=self.app)
 
-             worker.on_start()
 
-             assert 'superuser' in stderr.getvalue()
 
-     def test_redirect_stdouts(self):
 
-         with mock.stdouts():
 
-             self.Worker(app=self.app, redirect_stdouts=False)
 
-             with pytest.raises(AttributeError):
 
-                 sys.stdout.logger
 
-     def test_on_start_custom_logging(self):
 
-         with mock.stdouts():
 
-             self.app.log.redirect_stdouts = Mock()
 
-             worker = self.Worker(app=self.app, redirect_stoutds=True)
 
-             worker._custom_logging = True
 
-             worker.on_start()
 
-             self.app.log.redirect_stdouts.assert_not_called()
 
-     def test_setup_logging_no_color(self):
 
-         worker = self.Worker(
 
-             app=self.app, redirect_stdouts=False, no_color=True,
 
-         )
 
-         prev, self.app.log.setup = self.app.log.setup, Mock()
 
-         try:
 
-             worker.setup_logging()
 
-             assert not self.app.log.setup.call_args[1]['colorize']
 
-         finally:
 
-             self.app.log.setup = prev
 
-     def test_startup_info_pool_is_str(self):
 
-         with mock.stdouts():
 
-             worker = self.Worker(app=self.app, redirect_stdouts=False)
 
-             worker.pool_cls = 'foo'
 
-             worker.startup_info()
 
-     def test_redirect_stdouts_already_handled(self):
 
-         logging_setup = [False]
 
-         @signals.setup_logging.connect
 
-         def on_logging_setup(**kwargs):
 
-             logging_setup[0] = True
 
-         try:
 
-             worker = self.Worker(app=self.app, redirect_stdouts=False)
 
-             worker.app.log.already_setup = False
 
-             worker.setup_logging()
 
-             assert logging_setup[0]
 
-             with pytest.raises(AttributeError):
 
-                 sys.stdout.logger
 
-         finally:
 
-             signals.setup_logging.disconnect(on_logging_setup)
 
-     def test_platform_tweaks_macOS(self):
 
-         class macOSWorker(Worker):
 
-             proxy_workaround_installed = False
 
-             def macOS_proxy_detection_workaround(self):
 
-                 self.proxy_workaround_installed = True
 
-         with mock.stdouts():
 
-             worker = macOSWorker(app=self.app, redirect_stdouts=False)
 
-             def install_HUP_nosupport(controller):
 
-                 controller.hup_not_supported_installed = True
 
-             class Controller(object):
 
-                 pass
 
-             prev = cd.install_HUP_not_supported_handler
 
-             cd.install_HUP_not_supported_handler = install_HUP_nosupport
 
-             try:
 
-                 worker.app.IS_macOS = True
 
-                 controller = Controller()
 
-                 worker.install_platform_tweaks(controller)
 
-                 assert controller.hup_not_supported_installed
 
-                 assert worker.proxy_workaround_installed
 
-             finally:
 
-                 cd.install_HUP_not_supported_handler = prev
 
-     def test_general_platform_tweaks(self):
 
-         restart_worker_handler_installed = [False]
 
-         def install_worker_restart_handler(worker):
 
-             restart_worker_handler_installed[0] = True
 
-         class Controller(object):
 
-             pass
 
-         with mock.stdouts():
 
-             prev = cd.install_worker_restart_handler
 
-             cd.install_worker_restart_handler = install_worker_restart_handler
 
-             try:
 
-                 worker = self.Worker(app=self.app)
 
-                 worker.app.IS_macOS = False
 
-                 worker.install_platform_tweaks(Controller())
 
-                 assert restart_worker_handler_installed[0]
 
-             finally:
 
-                 cd.install_worker_restart_handler = prev
 
-     def test_on_consumer_ready(self):
 
-         worker_ready_sent = [False]
 
-         @signals.worker_ready.connect
 
-         def on_worker_ready(**kwargs):
 
-             worker_ready_sent[0] = True
 
-         with mock.stdouts():
 
-             self.Worker(app=self.app).on_consumer_ready(object())
 
-             assert worker_ready_sent[0]
 
- @mock.stdouts
 
- class test_funs:
 
-     def test_active_thread_count(self):
 
-         assert cd.active_thread_count()
 
-     @skip.unless_module('setproctitle')
 
-     def test_set_process_status(self):
 
-         worker = Worker(app=self.app, hostname='xyzza')
 
-         prev1, sys.argv = sys.argv, ['Arg0']
 
-         try:
 
-             st = worker.set_process_status('Running')
 
-             assert 'celeryd' in st
 
-             assert 'xyzza' in st
 
-             assert 'Running' in st
 
-             prev2, sys.argv = sys.argv, ['Arg0', 'Arg1']
 
-             try:
 
-                 st = worker.set_process_status('Running')
 
-                 assert 'celeryd' in st
 
-                 assert 'xyzza' in st
 
-                 assert 'Running' in st
 
-                 assert 'Arg1' in st
 
-             finally:
 
-                 sys.argv = prev2
 
-         finally:
 
-             sys.argv = prev1
 
-     def test_parse_options(self):
 
-         cmd = worker()
 
-         cmd.app = self.app
 
-         opts, args = cmd.parse_options('worker', ['--concurrency=512',
 
-                                        '--heartbeat-interval=10'])
 
-         assert opts['concurrency'] == 512
 
-         assert opts['heartbeat_interval'] == 10
 
-     def test_main(self):
 
-         p, cd.Worker = cd.Worker, Worker
 
-         s, sys.argv = sys.argv, ['worker', '--discard']
 
-         try:
 
-             worker_main(app=self.app)
 
-         finally:
 
-             cd.Worker = p
 
-             sys.argv = s
 
- @mock.stdouts
 
- class test_signal_handlers:
 
-     class _Worker(object):
 
-         stopped = False
 
-         terminated = False
 
-         def stop(self, in_sighandler=False):
 
-             self.stopped = True
 
-         def terminate(self, in_sighandler=False):
 
-             self.terminated = True
 
-     def psig(self, fun, *args, **kwargs):
 
-         handlers = {}
 
-         class Signals(platforms.Signals):
 
-             def __setitem__(self, sig, handler):
 
-                 handlers[sig] = handler
 
-         p, platforms.signals = platforms.signals, Signals()
 
-         try:
 
-             fun(*args, **kwargs)
 
-             return handlers
 
-         finally:
 
-             platforms.signals = p
 
-     def test_worker_int_handler(self):
 
-         worker = self._Worker()
 
-         handlers = self.psig(cd.install_worker_int_handler, worker)
 
-         next_handlers = {}
 
-         state.should_stop = None
 
-         state.should_terminate = None
 
-         class Signals(platforms.Signals):
 
-             def __setitem__(self, sig, handler):
 
-                 next_handlers[sig] = handler
 
-         with patch('celery.apps.worker.active_thread_count') as c:
 
-             c.return_value = 3
 
-             p, platforms.signals = platforms.signals, Signals()
 
-             try:
 
-                 handlers['SIGINT']('SIGINT', object())
 
-                 assert state.should_stop
 
-                 assert state.should_stop == EX_FAILURE
 
-             finally:
 
-                 platforms.signals = p
 
-                 state.should_stop = None
 
-             try:
 
-                 next_handlers['SIGINT']('SIGINT', object())
 
-                 assert state.should_terminate
 
-                 assert state.should_terminate == EX_FAILURE
 
-             finally:
 
-                 state.should_terminate = None
 
-         with patch('celery.apps.worker.active_thread_count') as c:
 
-             c.return_value = 1
 
-             p, platforms.signals = platforms.signals, Signals()
 
-             try:
 
-                 with pytest.raises(WorkerShutdown):
 
-                     handlers['SIGINT']('SIGINT', object())
 
-             finally:
 
-                 platforms.signals = p
 
-             with pytest.raises(WorkerTerminate):
 
-                 next_handlers['SIGINT']('SIGINT', object())
 
-     @skip.unless_module('multiprocessing')
 
-     def test_worker_int_handler_only_stop_MainProcess(self):
 
-         process = current_process()
 
-         name, process.name = process.name, 'OtherProcess'
 
-         with patch('celery.apps.worker.active_thread_count') as c:
 
-             c.return_value = 3
 
-             try:
 
-                 worker = self._Worker()
 
-                 handlers = self.psig(cd.install_worker_int_handler, worker)
 
-                 handlers['SIGINT']('SIGINT', object())
 
-                 assert state.should_stop
 
-             finally:
 
-                 process.name = name
 
-                 state.should_stop = None
 
-         with patch('celery.apps.worker.active_thread_count') as c:
 
-             c.return_value = 1
 
-             try:
 
-                 worker = self._Worker()
 
-                 handlers = self.psig(cd.install_worker_int_handler, worker)
 
-                 with pytest.raises(WorkerShutdown):
 
-                     handlers['SIGINT']('SIGINT', object())
 
-             finally:
 
-                 process.name = name
 
-                 state.should_stop = None
 
-     def test_install_HUP_not_supported_handler(self):
 
-         worker = self._Worker()
 
-         handlers = self.psig(cd.install_HUP_not_supported_handler, worker)
 
-         handlers['SIGHUP']('SIGHUP', object())
 
-     @skip.unless_module('multiprocessing')
 
-     def test_worker_term_hard_handler_only_stop_MainProcess(self):
 
-         process = current_process()
 
-         name, process.name = process.name, 'OtherProcess'
 
-         try:
 
-             with patch('celery.apps.worker.active_thread_count') as c:
 
-                 c.return_value = 3
 
-                 worker = self._Worker()
 
-                 handlers = self.psig(
 
-                     cd.install_worker_term_hard_handler, worker)
 
-                 try:
 
-                     handlers['SIGQUIT']('SIGQUIT', object())
 
-                     assert state.should_terminate
 
-                 finally:
 
-                     state.should_terminate = None
 
-             with patch('celery.apps.worker.active_thread_count') as c:
 
-                 c.return_value = 1
 
-                 worker = self._Worker()
 
-                 handlers = self.psig(
 
-                     cd.install_worker_term_hard_handler, worker)
 
-                 try:
 
-                     with pytest.raises(WorkerTerminate):
 
-                         handlers['SIGQUIT']('SIGQUIT', object())
 
-                 finally:
 
-                     state.should_terminate = None
 
-         finally:
 
-             process.name = name
 
-     def test_worker_term_handler_when_threads(self):
 
-         with patch('celery.apps.worker.active_thread_count') as c:
 
-             c.return_value = 3
 
-             worker = self._Worker()
 
-             handlers = self.psig(cd.install_worker_term_handler, worker)
 
-             try:
 
-                 handlers['SIGTERM']('SIGTERM', object())
 
-                 assert state.should_stop == EX_OK
 
-             finally:
 
-                 state.should_stop = None
 
-     def test_worker_term_handler_when_single_thread(self):
 
-         with patch('celery.apps.worker.active_thread_count') as c:
 
-             c.return_value = 1
 
-             worker = self._Worker()
 
-             handlers = self.psig(cd.install_worker_term_handler, worker)
 
-             try:
 
-                 with pytest.raises(WorkerShutdown):
 
-                     handlers['SIGTERM']('SIGTERM', object())
 
-             finally:
 
-                 state.should_stop = None
 
-     @patch('sys.__stderr__')
 
-     @skip.if_pypy()
 
-     @skip.if_jython()
 
-     def test_worker_cry_handler(self, stderr):
 
-         handlers = self.psig(cd.install_cry_handler)
 
-         assert handlers['SIGUSR1']('SIGUSR1', object()) is None
 
-         stderr.write.assert_called()
 
-     @skip.unless_module('multiprocessing')
 
-     def test_worker_term_handler_only_stop_MainProcess(self):
 
-         process = current_process()
 
-         name, process.name = process.name, 'OtherProcess'
 
-         try:
 
-             with patch('celery.apps.worker.active_thread_count') as c:
 
-                 c.return_value = 3
 
-                 worker = self._Worker()
 
-                 handlers = self.psig(cd.install_worker_term_handler, worker)
 
-                 handlers['SIGTERM']('SIGTERM', object())
 
-                 assert state.should_stop == EX_OK
 
-             with patch('celery.apps.worker.active_thread_count') as c:
 
-                 c.return_value = 1
 
-                 worker = self._Worker()
 
-                 handlers = self.psig(cd.install_worker_term_handler, worker)
 
-                 with pytest.raises(WorkerShutdown):
 
-                     handlers['SIGTERM']('SIGTERM', object())
 
-         finally:
 
-             process.name = name
 
-             state.should_stop = None
 
-     @skip.unless_symbol('os.execv')
 
-     @patch('celery.platforms.close_open_fds')
 
-     @patch('atexit.register')
 
-     @patch('os.close')
 
-     def test_worker_restart_handler(self, _close, register, close_open):
 
-         argv = []
 
-         def _execv(*args):
 
-             argv.extend(args)
 
-         execv, os.execv = os.execv, _execv
 
-         try:
 
-             worker = self._Worker()
 
-             handlers = self.psig(cd.install_worker_restart_handler, worker)
 
-             handlers['SIGHUP']('SIGHUP', object())
 
-             assert state.should_stop == EX_OK
 
-             register.assert_called()
 
-             callback = register.call_args[0][0]
 
-             callback()
 
-             assert argv
 
-         finally:
 
-             os.execv = execv
 
-             state.should_stop = None
 
-     def test_worker_term_hard_handler_when_threaded(self):
 
-         with patch('celery.apps.worker.active_thread_count') as c:
 
-             c.return_value = 3
 
-             worker = self._Worker()
 
-             handlers = self.psig(cd.install_worker_term_hard_handler, worker)
 
-             try:
 
-                 handlers['SIGQUIT']('SIGQUIT', object())
 
-                 assert state.should_terminate
 
-             finally:
 
-                 state.should_terminate = None
 
-     def test_worker_term_hard_handler_when_single_threaded(self):
 
-         with patch('celery.apps.worker.active_thread_count') as c:
 
-             c.return_value = 1
 
-             worker = self._Worker()
 
-             handlers = self.psig(cd.install_worker_term_hard_handler, worker)
 
-             with pytest.raises(WorkerTerminate):
 
-                 handlers['SIGQUIT']('SIGQUIT', object())
 
 
  |