"""Unit tests for ``celery.bin.base``: ``Command`` and ``Extensions``."""
from __future__ import absolute_import, unicode_literals

import os

import pytest
from case import Mock, mock, patch

from celery.five import bytes_if_py2
from celery.bin.base import (
    Command,
    Option,
    Extensions,
)


class MyApp(object):
    """Minimal app stand-in exposing only ``user_options``."""

    user_options = {'preload': None}


APP = MyApp()  # <-- Used by test_with_custom_app


class MockCommand(Command):
    """Command whose option parsing and ``run`` are canned for the tests."""

    mock_args = ('arg1', 'arg2', 'arg3')

    def parse_options(self, prog_name, arguments, command=None):
        """Return fixed options (plus ``prog_name``) and ``mock_args``."""
        options = {'foo': 'bar', 'prog_name': prog_name}
        return options, self.mock_args

    def run(self, *args, **kwargs):
        """Echo back the received arguments so tests can inspect them."""
        return args, kwargs


class test_Extensions:

    def test_load(self):
        """Entry points are resolved and registered; errors are handled."""
        with patch('pkg_resources.iter_entry_points') as iterep:
            with patch('celery.utils.imports.symbol_by_name') as symbyname:
                ep = Mock()
                ep.name = 'ep'
                ep.module_name = 'foo'
                ep.attrs = ['bar', 'baz']
                iterep.return_value = [ep]
                cls = symbyname.return_value = Mock()

                register = Mock()
                e = Extensions('unit', register)
                e.load()
                symbyname.assert_called_with('foo:bar')
                register.assert_called_with(cls, name='ep')

            # A SyntaxError while resolving the symbol must only warn.
            with patch('celery.utils.imports.symbol_by_name') as symbyname:
                symbyname.side_effect = SyntaxError()
                with patch('warnings.warn') as warn:
                    e.load()
                    warn.assert_called()

            # A KeyError must propagate to the caller.
            with patch('celery.utils.imports.symbol_by_name') as symbyname:
                symbyname.side_effect = KeyError('foo')
                with pytest.raises(KeyError):
                    e.load()


class test_Command:

    def test_get_options(self):
        cmd = Command()
        cmd.option_list = (1, 2, 3)
        assert cmd.get_options() == (1, 2, 3)

    def test_custom_description(self):

        class C(Command):
            description = 'foo'

        c = C()
        assert c.description == 'foo'

    def test_format_epilog(self):
        assert Command()._format_epilog('hello')
        assert not Command()._format_epilog('')

    def test_format_description(self):
        assert Command()._format_description('hello')

    def test_register_callbacks(self):
        c = Command(on_error=8, on_usage_error=9)
        assert c.on_error == 8
        assert c.on_usage_error == 9

    def test_run_raises_UsageError(self):
        """A UsageError from ``run`` goes to the callback; status returned."""
        cb = Mock()
        c = Command(on_usage_error=cb)
        c.verify_args = Mock()
        c.run = Mock()
        exc = c.run.side_effect = c.UsageError('foo', status=3)

        assert c() == exc.status
        cb.assert_called_with(exc)
        c.verify_args.assert_called_with(())

    def test_default_on_usage_error(self):
        cmd = Command()
        cmd.handle_error = Mock()
        exc = Exception()
        cmd.on_usage_error(exc)
        cmd.handle_error.assert_called_with(exc)

    def test_verify_args_missing(self):
        """Too few positional args for ``run`` raises ``UsageError``."""
        c = Command()

        def run(a, b, c):
            pass
        c.run = run

        with pytest.raises(c.UsageError):
            c.verify_args((1,))
        c.verify_args((1, 2, 3))

    def test_run_interface(self):
        with pytest.raises(NotImplementedError):
            Command().run()

    @patch('sys.stdout')
    def test_early_version(self, stdout):
        cmd = Command()
        with pytest.raises(SystemExit):
            cmd.early_version(['--version'])

    def test_execute_from_commandline(self, app):
        cmd = MockCommand(app=app)
        args1, kwargs1 = cmd.execute_from_commandline()     # sys.argv
        assert args1 == cmd.mock_args
        assert kwargs1['foo'] == 'bar'
        assert kwargs1.get('prog_name')
        args2, kwargs2 = cmd.execute_from_commandline(['foo'])   # pass list
        assert args2 == cmd.mock_args
        assert kwargs2['foo'] == 'bar'
        assert kwargs2['prog_name'] == 'foo'

    def test_with_bogus_args(self, app):
        with mock.stdouts() as (_, stderr):
            cmd = MockCommand(app=app)
            cmd.supports_args = False
            with pytest.raises(SystemExit):
                cmd.execute_from_commandline(argv=['--bogus'])
            assert stderr.getvalue()
            assert 'Unrecognized' in stderr.getvalue()

    def test_with_custom_config_module(self, app):
        """``--config`` is exported as ``CELERY_CONFIG_MODULE``."""
        prev = os.environ.pop('CELERY_CONFIG_MODULE', None)
        try:
            cmd = MockCommand(app=app)
            cmd.setup_app_from_commandline(['--config=foo.bar.baz'])
            assert os.environ.get('CELERY_CONFIG_MODULE') == 'foo.bar.baz'
        finally:
            # Restore the exact prior state, including an empty-string value.
            if prev is not None:
                os.environ['CELERY_CONFIG_MODULE'] = prev
            else:
                os.environ.pop('CELERY_CONFIG_MODULE', None)

    def test_with_custom_broker(self, app):
        """``--broker`` is exported as ``CELERY_BROKER_URL``."""
        prev = os.environ.pop('CELERY_BROKER_URL', None)
        try:
            cmd = MockCommand(app=app)
            cmd.setup_app_from_commandline(['--broker=xyzza://'])
            assert os.environ.get('CELERY_BROKER_URL') == 'xyzza://'
        finally:
            # Restore the exact prior state, including an empty-string value.
            if prev is not None:
                os.environ['CELERY_BROKER_URL'] = prev
            else:
                os.environ.pop('CELERY_BROKER_URL', None)

    def test_with_custom_app(self, app):
        """``--app`` / ``-A`` resolve to the module-level ``APP`` object."""
        cmd = MockCommand(app=app)
        appstr = '.'.join([__name__, 'APP'])
        cmd.setup_app_from_commandline(['--app=%s' % (appstr,),
                                        '--loglevel=INFO'])
        assert cmd.app is APP
        cmd.setup_app_from_commandline(['-A', appstr,
                                        '--loglevel=INFO'])
        assert cmd.app is APP

    def test_setup_app_sets_quiet(self, app):
        cmd = MockCommand(app=app)
        cmd.setup_app_from_commandline(['-q'])
        assert cmd.quiet
        cmd2 = MockCommand(app=app)
        cmd2.setup_app_from_commandline(['--quiet'])
        assert cmd2.quiet

    def test_setup_app_sets_chdir(self, app):
        with patch('os.chdir') as chdir:
            cmd = MockCommand(app=app)
            cmd.setup_app_from_commandline(['--workdir=/opt'])
            chdir.assert_called_with('/opt')

    def test_setup_app_sets_loader(self, app):
        """``--loader`` is exported as ``CELERY_LOADER``."""
        prev = os.environ.get('CELERY_LOADER')
        try:
            cmd = MockCommand(app=app)
            cmd.setup_app_from_commandline(['--loader=X.Y:Z'])
            assert os.environ['CELERY_LOADER'] == 'X.Y:Z'
        finally:
            if prev is not None:
                os.environ['CELERY_LOADER'] = prev
            else:
                # pop() rather than del: if setup failed before the variable
                # was set, cleanup must not raise KeyError and mask the
                # original failure.
                os.environ.pop('CELERY_LOADER', None)

    def test_setup_app_no_respect(self, app):
        cmd = MockCommand(app=app)
        cmd.respects_app_option = False
        with patch('celery.bin.base.Celery') as cp:
            cmd.setup_app_from_commandline(['--app=x.y:z'])
            cp.assert_called()

    def test_setup_app_custom_app(self, app):
        cmd = MockCommand(app=app)
        app = cmd.app = Mock()
        app.user_options = {'preload': None}
        cmd.setup_app_from_commandline([])
        assert cmd.app == app

    def test_find_app_suspects(self, app):
        cmd = MockCommand(app=app)
        assert cmd.find_app('t.unit.bin.proj.app')
        assert cmd.find_app('t.unit.bin.proj')
        assert cmd.find_app('t.unit.bin.proj:hello')
        assert cmd.find_app('t.unit.bin.proj.hello')
        assert cmd.find_app('t.unit.bin.proj.app:app')
        assert cmd.find_app('t.unit.bin.proj.app.app')
        with pytest.raises(AttributeError):
            cmd.find_app('t.unit.bin')

        with pytest.raises(AttributeError):
            cmd.find_app(__name__)

    def test_ask(self, app, patching):
        """An answer outside the choices falls back to the default."""
        # Patch whichever ``input`` name exists: the one in celery.bin.base
        # if present, otherwise the builtin.
        try:
            input_mock = patching('celery.bin.base.input')
        except AttributeError:
            input_mock = patching('builtins.input')
        cmd = MockCommand(app=app)
        input_mock.return_value = 'yes'
        assert cmd.ask('q', ('yes', 'no'), 'no') == 'yes'
        input_mock.return_value = 'nop'
        assert cmd.ask('q', ('yes', 'no'), 'no') == 'no'

    def test_host_format(self, app):
        cmd = MockCommand(app=app)
        with patch('celery.utils.nodenames.gethostname') as hn:
            hn.return_value = 'blacktron.example.com'
            assert cmd.host_format('') == ''
            assert (cmd.host_format('celery@%h') ==
                    'celery@blacktron.example.com')
            assert cmd.host_format('celery@%d') == 'celery@example.com'
            assert cmd.host_format('celery@%n') == 'celery@blacktron'

    def test_say_chat_quiet(self, app):
        cmd = MockCommand(app=app)
        cmd.quiet = True
        assert cmd.say_chat('<-', 'foo', 'foo') is None

    def test_say_chat_show_body(self, app):
        cmd = MockCommand(app=app)
        cmd.out = Mock()
        cmd.show_body = True
        cmd.say_chat('->', 'foo', 'body')
        cmd.out.assert_called_with('body')

    def test_say_chat_no_body(self, app):
        # Smoke test only: checks the call completes with show_body off.
        cmd = MockCommand(app=app)
        cmd.out = Mock()
        cmd.show_body = False
        cmd.say_chat('->', 'foo', 'body')

    @pytest.mark.usefixtures('depends_on_current_app')
    def test_with_cmdline_config(self, app):
        """Settings after ``--`` are applied to the app configuration."""
        cmd = MockCommand(app=app)
        cmd.enable_config_from_cmdline = True
        cmd.namespace = 'worker'
        rest = cmd.setup_app_from_commandline(argv=[
            '--loglevel=INFO', '--',
            'broker.url=amqp://broker.example.com',
            '.prefetch_multiplier=100'])
        assert cmd.app.conf.broker_url == 'amqp://broker.example.com'
        assert cmd.app.conf.worker_prefetch_multiplier == 100
        assert rest == ['--loglevel=INFO']

        # With no app set, the command must obtain one through get_app().
        cmd.app = None
        cmd.get_app = Mock(name='get_app')
        cmd.get_app.return_value = app
        app.user_options['preload'] = [
            Option('--foo', action='store_true'),
        ]
        cmd.setup_app_from_commandline(argv=[
            '--foo', '--loglevel=INFO', '--',
            'broker.url=amqp://broker.example.com',
            '.prefetch_multiplier=100'])
        assert cmd.app is cmd.get_app()

    def test_get_default_app(self, app, patching):
        patching('celery._state.get_current_app')
        cmd = MockCommand(app=app)
        from celery._state import get_current_app
        assert cmd._get_default_app() is get_current_app()

    def test_set_colored(self, app):
        cmd = MockCommand(app=app)
        cmd.colored = 'foo'
        assert cmd.colored == 'foo'

    def test_set_no_color(self, app):
        cmd = MockCommand(app=app)
        cmd.no_color = False
        _ = cmd.colored  # noqa
        cmd.no_color = True
        assert not cmd.colored.enabled

    def test_find_app(self, app):
        """``find_app`` returns the module's ``app`` set on a later lookup."""
        cmd = MockCommand(app=app)
        with patch('celery.utils.imports.symbol_by_name') as sbn:
            from types import ModuleType
            x = ModuleType(bytes_if_py2('proj'))

            def on_sbn(*args, **kwargs):
                # First lookup returns the bare module and re-arms the mock.

                def after(*args, **kwargs):
                    # Later lookups give the module its ``app`` attribute.
                    x.app = 'quick brown fox'
                    x.__path__ = None
                    return x
                sbn.side_effect = after
                return x
            sbn.side_effect = on_sbn
            x.__path__ = [True]
            assert cmd.find_app('proj') == 'quick brown fox'

    def test_parse_preload_options_shortopt(self):

        class TestCommand(Command):

            def add_preload_arguments(self, parser):
                parser.add_argument('-s', action='store', dest='silent')

        cmd = TestCommand()
        acc = cmd.parse_preload_options(['-s', 'yes'])
        assert acc.get('silent') == 'yes'

    def test_parse_preload_options_with_equals_and_append(self):

        class TestCommand(Command):

            def add_preload_arguments(self, parser):
                parser.add_argument('--zoom', action='append', default=[])

        # NOTE(review): TestCommand above is never instantiated; the test
        # runs against plain Command(). The assertion below only checks that
        # ``acc`` is truthy -- the dict after the comma is the assertion
        # *message* (and evaluates to {'zoom': [False]}). Presumably the
        # intent was ``acc['zoom'] == ['1', '2']``; confirm the return shape
        # of parse_preload_options before tightening.
        cmd = Command()
        acc = cmd.parse_preload_options(['--zoom=1', '--zoom=2'])

        assert acc, {'zoom': ['1' == '2']}

    def test_parse_preload_options_without_equals_and_append(self):
        cmd = Command()
        opt = Option('--zoom', action='append', default=[])
        cmd.preload_options = (opt,)
        acc = cmd.parse_preload_options(['--zoom', '1', '--zoom', '2'])

        # NOTE(review): only checks that ``acc`` is truthy; the dict is the
        # assertion message, not an expected value. Confirm what
        # parse_preload_options returns here before tightening.
        assert acc, {'zoom': ['1' == '2']}