| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536 | from __future__ import absolute_import, unicode_literalsimport pytestimport sysfrom datetime import datetimefrom case import Mock, patchfrom kombu.utils.json import dumpsfrom celery import __main__from celery.bin.base import Errorfrom celery.bin import celery as modfrom celery.bin.celery import (    Command,    list_,    call,    purge,    result,    inspect,    control,    status,    migrate,    help,    report,    CeleryCommand,    determine_exit_status,    multi,    main as mainfun,    _RemoteControl,)from celery.five import WhateverIOfrom celery.platforms import EX_FAILURE, EX_USAGE, EX_OKclass test__main__:    def test_main(self):        with patch('celery.__main__.maybe_patch_concurrency') as mpc:            with patch('celery.bin.celery.main') as main:                __main__.main()                mpc.assert_called_with()                main.assert_called_with()    def test_main__multi(self):        with patch('celery.__main__.maybe_patch_concurrency') as mpc:            with patch('celery.bin.celery.main') as main:                prev, sys.argv = sys.argv, ['foo', 'multi']                try:                    __main__.main()                    mpc.assert_not_called()                    main.assert_called_with()                finally:                    sys.argv = prevclass test_Command:    def test_Error_repr(self):        x = Error('something happened')        assert x.status is not None        assert x.reason        assert str(x)    def setup(self):        self.out = WhateverIO()        self.err = WhateverIO()        self.cmd = Command(self.app, stdout=self.out, stderr=self.err)    def test_error(self):        self.cmd.out = Mock()        self.cmd.error('FOO')        self.cmd.out.assert_called()    def test_out(self):        f = Mock()        self.cmd.out('foo', f)    def test_call(self):        def ok_run():            pass        self.cmd.run = ok_run        assert self.cmd() == EX_OK        def error_run():            raise Error('error', EX_FAILURE)        self.cmd.run = error_run        assert self.cmd() == EX_FAILURE    def test_run_from_argv(self):        with pytest.raises(NotImplementedError):            self.cmd.run_from_argv('prog', ['foo', 'bar'])    def test_pretty_list(self):        assert self.cmd.pretty([])[1] == '- empty -'        assert 'bar', self.cmd.pretty(['foo' in 'bar'][1])    def test_pretty_dict(self, text='the quick brown fox'):        assert 'OK' in str(self.cmd.pretty({'ok': text})[0])        assert 'ERROR' in str(self.cmd.pretty({'error': text})[0])    def test_pretty(self):        assert 'OK' in str(self.cmd.pretty('the quick brown'))        assert 'OK' in str(self.cmd.pretty(object()))        assert 'OK' in str(self.cmd.pretty({'foo': 'bar'}))class test_list:    def test_list_bindings_no_support(self):        l = list_(app=self.app, stderr=WhateverIO())        management = Mock()        management.get_bindings.side_effect = NotImplementedError()        with pytest.raises(Error):            l.list_bindings(management)    def test_run(self):        l = list_(app=self.app, stderr=WhateverIO())        l.run('bindings')        with pytest.raises(Error):            l.run(None)        with pytest.raises(Error):            l.run('foo')class test_call:    def setup(self):        @self.app.task(shared=False)        def add(x, y):            return x + y        self.add = add    @patch('celery.app.base.Celery.send_task')    def test_run(self, send_task):        a = call(app=self.app, stderr=WhateverIO(), stdout=WhateverIO())        a.run(self.add.name)        send_task.assert_called()        a.run(self.add.name,              args=dumps([4, 4]),              kwargs=dumps({'x': 2, 'y': 2}))        assert send_task.call_args[1]['args'], [4 == 4]        assert send_task.call_args[1]['kwargs'] == {'x': 2, 'y': 2}        a.run(self.add.name, expires=10, countdown=10)        assert send_task.call_args[1]['expires'] == 10        assert send_task.call_args[1]['countdown'] == 10        now = datetime.now()        iso = now.isoformat()        a.run(self.add.name, expires=iso)        assert send_task.call_args[1]['expires'] == now        with pytest.raises(ValueError):            a.run(self.add.name, expires='foobaribazibar')class test_purge:    def test_run(self):        out = WhateverIO()        a = purge(app=self.app, stdout=out)        a._purge = Mock(name='_purge')        a._purge.return_value = 0        a.run(force=True)        assert 'No messages purged' in out.getvalue()        a._purge.return_value = 100        a.run(force=True)        assert '100 messages' in out.getvalue()        a.out = Mock(name='out')        a.ask = Mock(name='ask')        a.run(force=False)        a.ask.assert_called_with(a.warn_prompt, ('yes', 'no'), 'no')        a.ask.return_value = 'yes'        a.run(force=False)class test_result:    def setup(self):        @self.app.task(shared=False)        def add(x, y):            return x + y        self.add = add    def test_run(self):        with patch('celery.result.AsyncResult.get') as get:            out = WhateverIO()            r = result(app=self.app, stdout=out)            get.return_value = 'Jerry'            r.run('id')            assert 'Jerry' in out.getvalue()            get.return_value = 'Elaine'            r.run('id', task=self.add.name)            assert 'Elaine' in out.getvalue()            with patch('celery.result.AsyncResult.traceback') as tb:                r.run('id', task=self.add.name, traceback=True)                assert str(tb) in out.getvalue()class test_status:    @patch('celery.bin.celery.inspect')    def test_run(self, inspect_):        out, err = WhateverIO(), WhateverIO()        ins = inspect_.return_value = Mock()        ins.run.return_value = []        s = status(self.app, stdout=out, stderr=err)        with pytest.raises(Error):            s.run()        ins.run.return_value = ['a', 'b', 'c']        s.run()        assert '3 nodes online' in out.getvalue()        s.run(quiet=True)class test_migrate:    @patch('celery.contrib.migrate.migrate_tasks')    def test_run(self, migrate_tasks):        out = WhateverIO()        m = migrate(app=self.app, stdout=out, stderr=WhateverIO())        with pytest.raises(TypeError):            m.run()        migrate_tasks.assert_not_called()        m.run('memory://foo', 'memory://bar')        migrate_tasks.assert_called()        state = Mock()        state.count = 10        state.strtotal = 30        m.on_migrate_task(state, {'task': 'tasks.add', 'id': 'ID'}, None)        assert '10/30' in out.getvalue()class test_report:    def test_run(self):        out = WhateverIO()        r = report(app=self.app, stdout=out)        assert r.run() == EX_OK        assert out.getvalue()class test_help:    def test_run(self):        out = WhateverIO()        h = help(app=self.app, stdout=out)        h.parser = Mock()        assert h.run() == EX_USAGE        assert out.getvalue()        assert h.usage('help')        h.parser.print_help.assert_called_with()class test_CeleryCommand:    def test_execute_from_commandline(self):        x = CeleryCommand(app=self.app)        x.handle_argv = Mock()        x.handle_argv.return_value = 1        with pytest.raises(SystemExit):            x.execute_from_commandline()        x.handle_argv.return_value = True        with pytest.raises(SystemExit):            x.execute_from_commandline()        x.handle_argv.side_effect = KeyboardInterrupt()        with pytest.raises(SystemExit):            x.execute_from_commandline()        x.respects_app_option = True        with pytest.raises(SystemExit):            x.execute_from_commandline(['celery', 'multi'])        assert not x.respects_app_option        x.respects_app_option = True        with pytest.raises(SystemExit):            x.execute_from_commandline(['manage.py', 'celery', 'multi'])        assert not x.respects_app_option    def test_with_pool_option(self):        x = CeleryCommand(app=self.app)        assert x.with_pool_option(['celery', 'events']) is None        assert x.with_pool_option(['celery', 'worker'])        assert x.with_pool_option(['manage.py', 'celery', 'worker'])    def test_load_extensions_no_commands(self):        with patch('celery.bin.celery.Extensions') as Ext:            ext = Ext.return_value = Mock(name='Extension')            ext.load.return_value = None            x = CeleryCommand(app=self.app)            x.load_extension_commands()    def test_load_extensions_commands(self):        with patch('celery.bin.celery.Extensions') as Ext:            prev, mod.command_classes = list(mod.command_classes), Mock()            try:                ext = Ext.return_value = Mock(name='Extension')                ext.load.return_value = ['foo', 'bar']                x = CeleryCommand(app=self.app)                x.load_extension_commands()                mod.command_classes.append.assert_called_with(                    ('Extensions', ['foo', 'bar'], 'magenta'),                )            finally:                mod.command_classes = prev    def test_determine_exit_status(self):        assert determine_exit_status('true') == EX_OK        assert determine_exit_status('') == EX_FAILURE    def test_relocate_args_from_start(self):        x = CeleryCommand(app=self.app)        assert x._relocate_args_from_start(None) == []        relargs1 = x._relocate_args_from_start([            '-l', 'debug', 'worker', '-c', '3', '--foo',        ])        assert relargs1 == ['worker', '-c', '3', '--foo', '-l', 'debug']        relargs2 = x._relocate_args_from_start([            '--pool=gevent', '-l', 'debug', 'worker', '--foo', '-c', '3',        ])        assert relargs2 == [            'worker', '--foo', '-c', '3',            '--pool=gevent', '-l', 'debug',        ]        assert x._relocate_args_from_start(['foo', '--foo=1']) == [            'foo', '--foo=1',        ]    def test_register_command(self):        prev, CeleryCommand.commands = dict(CeleryCommand.commands), {}        try:            fun = Mock(name='fun')            CeleryCommand.register_command(fun, name='foo')            assert CeleryCommand.commands['foo'] is fun        finally:            CeleryCommand.commands = prev    def test_handle_argv(self):        x = CeleryCommand(app=self.app)        x.execute = Mock()        x.handle_argv('celery', [])        x.execute.assert_called_with('help', ['help'])        x.handle_argv('celery', ['start', 'foo'])        x.execute.assert_called_with('start', ['start', 'foo'])    def test_execute(self):        x = CeleryCommand(app=self.app)        Help = x.commands['help'] = Mock()        help = Help.return_value = Mock()        x.execute('fooox', ['a'])        help.run_from_argv.assert_called_with(x.prog_name, [], command='help')        help.reset()        x.execute('help', ['help'])        help.run_from_argv.assert_called_with(x.prog_name, [], command='help')        Dummy = x.commands['dummy'] = Mock()        dummy = Dummy.return_value = Mock()        exc = dummy.run_from_argv.side_effect = Error(            'foo', status='EX_FAILURE',        )        x.on_error = Mock(name='on_error')        help.reset()        x.execute('dummy', ['dummy'])        x.on_error.assert_called_with(exc)        dummy.run_from_argv.assert_called_with(            x.prog_name, [], command='dummy',        )        help.run_from_argv.assert_called_with(            x.prog_name, [], command='help',        )        exc = dummy.run_from_argv.side_effect = x.UsageError('foo')        x.on_usage_error = Mock()        x.execute('dummy', ['dummy'])        x.on_usage_error.assert_called_with(exc)    def test_on_usage_error(self):        x = CeleryCommand(app=self.app)        x.error = Mock()        x.on_usage_error(x.UsageError('foo'), command=None)        x.error.assert_called()        x.on_usage_error(x.UsageError('foo'), command='dummy')    def test_prepare_prog_name(self):        x = CeleryCommand(app=self.app)        main = Mock(name='__main__')        main.__file__ = '/opt/foo.py'        with patch.dict(sys.modules, __main__=main):            assert x.prepare_prog_name('__main__.py') == '/opt/foo.py'            assert x.prepare_prog_name('celery') == 'celery'class test_RemoteControl:    def test_call_interface(self):        with pytest.raises(NotImplementedError):            _RemoteControl(app=self.app).call()class test_inspect:    def test_usage(self):        assert inspect(app=self.app).usage('foo')    def test_command_info(self):        i = inspect(app=self.app)        assert i.get_command_info(            'ping', help=True, color=i.colored.red, app=self.app,        )    def test_list_commands_color(self):        i = inspect(app=self.app)        assert i.list_commands(help=True, color=i.colored.red, app=self.app)        assert i.list_commands(help=False, color=None, app=self.app)    def test_epilog(self):        assert inspect(app=self.app).epilog    def test_do_call_method_sql_transport_type(self):        self.app.connection = Mock()        conn = self.app.connection.return_value = Mock(name='Connection')        conn.transport.driver_type = 'sql'        i = inspect(app=self.app)        with pytest.raises(i.Error):            i.do_call_method(['ping'])    def test_say_directions(self):        i = inspect(self.app)        i.out = Mock()        i.quiet = True        i.say_chat('<-', 'hello out')        i.out.assert_not_called()        i.say_chat('->', 'hello in')        i.out.assert_called()        i.quiet = False        i.out.reset_mock()        i.say_chat('<-', 'hello out', 'body')        i.out.assert_called()    @patch('celery.app.control.Control.inspect')    def test_run(self, real):        out = WhateverIO()        i = inspect(app=self.app, stdout=out)        with pytest.raises(Error):            i.run()        with pytest.raises(Error):            i.run('help')        with pytest.raises(Error):            i.run('xyzzybaz')        i.run('ping')        real.assert_called()        i.run('ping', destination='foo,bar')        assert real.call_args[1]['destination'], ['foo' == 'bar']        assert real.call_args[1]['timeout'] == 0.2        callback = real.call_args[1]['callback']        callback({'foo': {'ok': 'pong'}})        assert 'OK' in out.getvalue()        with patch('celery.bin.celery.dumps') as dumps:            i.run('ping', json=True)            dumps.assert_called()        instance = real.return_value = Mock()        instance._request.return_value = None        with pytest.raises(Error):            i.run('ping')        out.seek(0)        out.truncate()        i.quiet = True        i.say_chat('<-', 'hello')        assert not out.getvalue()class test_control:    def control(self, patch_call, *args, **kwargs):        kwargs.setdefault('app', Mock(name='app'))        c = control(*args, **kwargs)        if patch_call:            c.call = Mock(name='control.call')        return c    def test_call(self):        i = self.control(False)        i.call('foo', arguments={'kw': 2})        i.app.control.broadcast.assert_called_with(            'foo', arguments={'kw': 2}, reply=True)class test_multi:    def test_get_options(self):        assert multi(app=self.app).get_options() is None    def test_run_from_argv(self):        with patch('celery.bin.multi.MultiTool') as MultiTool:            m = MultiTool.return_value = Mock()            multi(self.app).run_from_argv('celery', ['arg'], command='multi')            m.execute_from_commandline.assert_called_with(['multi', 'arg'])class test_main:    @patch('celery.bin.celery.CeleryCommand')    def test_main(self, Command):        cmd = Command.return_value = Mock()        mainfun()        cmd.execute_from_commandline.assert_called_with(None)    @patch('celery.bin.celery.CeleryCommand')    def test_main_KeyboardInterrupt(self, Command):        cmd = Command.return_value = Mock()        cmd.execute_from_commandline.side_effect = KeyboardInterrupt()        mainfun()        cmd.execute_from_commandline.assert_called_with(None)
 |