| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368 | from __future__ import absolute_import, unicode_literalsimport pytestimport signalimport sysfrom case import Mock, patchfrom celery.bin.multi import main, MultiTool, __doc__ as docfrom celery.five import WhateverIOclass test_MultiTool:    def setup(self):        self.fh = WhateverIO()        self.env = {}        self.t = MultiTool(env=self.env, fh=self.fh)        self.t.cluster_from_argv = Mock(name='cluster_from_argv')        self.t._cluster_from_argv = Mock(name='cluster_from_argv')        self.t.Cluster = Mock(name='Cluster')        self.t.carp = Mock(name='.carp')        self.t.usage = Mock(name='.usage')        self.t.splash = Mock(name='.splash')        self.t.say = Mock(name='.say')        self.t.ok = Mock(name='.ok')        self.cluster = self.t.Cluster.return_value        def _cluster_from_argv(argv):            p = self.t.OptionParser(argv)            p.parse()            return p, self.cluster        self.t.cluster_from_argv.return_value = self.cluster        self.t._cluster_from_argv.side_effect = _cluster_from_argv    def test_findsig(self):        self.assert_sig_argument(['a', 'b', 'c', '-1'], 1)        self.assert_sig_argument(['--foo=1', '-9'], 9)        self.assert_sig_argument(['-INT'], signal.SIGINT)        self.assert_sig_argument([], signal.SIGTERM)        self.assert_sig_argument(['-s'], signal.SIGTERM)        self.assert_sig_argument(['-log'], signal.SIGTERM)    def assert_sig_argument(self, args, expected):        p = self.t.OptionParser(args)        p.parse()        assert self.t._find_sig_argument(p) == expected    def test_execute_from_commandline(self):        self.t.call_command = Mock(name='call_command')        self.t.execute_from_commandline(            'multi start --verbose 10 --foo'.split(),            cmd='X',        )        assert self.t.cmd == 'X'        assert self.t.prog_name == 'multi'        self.t.call_command.assert_called_with('start', ['10', '--foo'])    def test_execute_from_commandline__arguments(self):        assert self.t.execute_from_commandline('multi'.split())        assert self.t.execute_from_commandline('multi -bar'.split())    def test_call_command(self):        cmd = self.t.commands['foo'] = Mock(name='foo')        self.t.retcode = 303        assert (self.t.call_command('foo', ['1', '2', '--foo=3']) is                cmd.return_value)        cmd.assert_called_with('1', '2', '--foo=3')    def test_call_command__error(self):        assert self.t.call_command('asdqwewqe', ['1', '2']) == 1        self.t.carp.assert_called()    def test_handle_reserved_options(self):        assert self.t._handle_reserved_options(            ['a', '-q', 'b', '--no-color', 'c']) == ['a', 'b', 'c']    def test_start(self):        self.cluster.start.return_value = [0, 0, 1, 0]        assert self.t.start('10', '-A', 'proj')        self.t.splash.assert_called_with()        self.t.cluster_from_argv.assert_called_with(('10', '-A', 'proj'))        self.cluster.start.assert_called_with()    def test_start__exitcodes(self):        self.cluster.start.return_value = [0, 0, 0]        assert not self.t.start('foo', 'bar', 'baz')        self.cluster.start.assert_called_with()        self.cluster.start.return_value = [0, 1, 0]        assert self.t.start('foo', 'bar', 'baz')    def test_stop(self):        self.t.stop('10', '-A', 'proj', retry=3)        self.t.splash.assert_called_with()        self.t._cluster_from_argv.assert_called_with(('10', '-A', 'proj'))        self.cluster.stop.assert_called_with(retry=3, sig=signal.SIGTERM)    def test_stopwait(self):        self.t.stopwait('10', '-A', 'proj', retry=3)        self.t.splash.assert_called_with()        self.t._cluster_from_argv.assert_called_with(('10', '-A', 'proj'))        self.cluster.stopwait.assert_called_with(retry=3, sig=signal.SIGTERM)    def test_restart(self):        self.cluster.restart.return_value = [0, 0, 1, 0]        self.t.restart('10', '-A', 'proj')        self.t.splash.assert_called_with()        self.t._cluster_from_argv.assert_called_with(('10', '-A', 'proj'))        self.cluster.restart.assert_called_with(sig=signal.SIGTERM)    def test_names(self):        self.t.cluster_from_argv.return_value = [Mock(), Mock()]        self.t.cluster_from_argv.return_value[0].name = 'x'        self.t.cluster_from_argv.return_value[1].name = 'y'        self.t.names('10', '-A', 'proj')        self.t.say.assert_called()    def test_get(self):        node = self.cluster.find.return_value = Mock(name='node')        node.argv = ['A', 'B', 'C']        assert (self.t.get('wanted', '10', '-A', 'proj') is                self.t.ok.return_value)        self.cluster.find.assert_called_with('wanted')        self.t.cluster_from_argv.assert_called_with(('10', '-A', 'proj'))        self.t.ok.assert_called_with(' '.join(node.argv))    def test_get__KeyError(self):        self.cluster.find.side_effect = KeyError()        assert self.t.get('wanted', '10', '-A', 'proj')    def test_show(self):        nodes = self.t.cluster_from_argv.return_value = [            Mock(name='n1'),            Mock(name='n2'),        ]        nodes[0].argv_with_executable = ['python', 'foo', 'bar']        nodes[1].argv_with_executable = ['python', 'xuzzy', 'baz']        assert self.t.show('10', '-A', 'proj') is self.t.ok.return_value        self.t.ok.assert_called_with(            '\n'.join(' '.join(node.argv_with_executable) for node in nodes))    def test_kill(self):        self.t.kill('10', '-A', 'proj')        self.t.splash.assert_called_with()        self.t.cluster_from_argv.assert_called_with(('10', '-A', 'proj'))        self.cluster.kill.assert_called_with()    def test_expand(self):        node1 = Mock(name='n1')        node2 = Mock(name='n2')        node1.expander.return_value = 'A'        node2.expander.return_value = 'B'        nodes = self.t.cluster_from_argv.return_value = [node1, node2]        assert self.t.expand('%p', '10') is self.t.ok.return_value        self.t.cluster_from_argv.assert_called_with(('10',))        for node in nodes:            node.expander.assert_called_with('%p')        self.t.ok.assert_called_with('A\nB')    def test_note(self):        self.t.quiet = True        self.t.note('foo')        self.t.say.assert_not_called()        self.t.quiet = False        self.t.note('foo')        self.t.say.assert_called_with('foo', newline=True)    def test_splash(self):        x = MultiTool()        x.note = Mock()        x.nosplash = True        x.splash()        x.note.assert_not_called()        x.nosplash = False        x.splash()        x.note.assert_called()    def test_Cluster(self):        m = MultiTool()        c = m.cluster_from_argv(['A', 'B', 'C'])        assert c.env is m.env        assert c.cmd == 'celery worker'        assert c.on_stopping_preamble == m.on_stopping_preamble        assert c.on_send_signal == m.on_send_signal        assert c.on_still_waiting_for == m.on_still_waiting_for        assert c.on_still_waiting_progress == m.on_still_waiting_progress        assert c.on_still_waiting_end == m.on_still_waiting_end        assert c.on_node_start == m.on_node_start        assert c.on_node_restart == m.on_node_restart        assert c.on_node_shutdown_ok == m.on_node_shutdown_ok        assert c.on_node_status == m.on_node_status        assert c.on_node_signal_dead == m.on_node_signal_dead        assert c.on_node_signal == m.on_node_signal        assert c.on_node_down == m.on_node_down        assert c.on_child_spawn == m.on_child_spawn        assert c.on_child_signalled == m.on_child_signalled        assert c.on_child_failure == m.on_child_failure    def test_on_stopping_preamble(self):        self.t.on_stopping_preamble([])    def test_on_send_signal(self):        self.t.on_send_signal(Mock(), Mock())    def test_on_still_waiting_for(self):        self.t.on_still_waiting_for([Mock(), Mock()])    def test_on_still_waiting_for__empty(self):        self.t.on_still_waiting_for([])    def test_on_still_waiting_progress(self):        self.t.on_still_waiting_progress([])    def test_on_still_waiting_end(self):        self.t.on_still_waiting_end()    def test_on_node_signal_dead(self):        self.t.on_node_signal_dead(Mock())    def test_on_node_start(self):        self.t.on_node_start(Mock())    def test_on_node_restart(self):        self.t.on_node_restart(Mock())    def test_on_node_down(self):        self.t.on_node_down(Mock())    def test_on_node_shutdown_ok(self):        self.t.on_node_shutdown_ok(Mock())    def test_on_node_status__FAIL(self):        self.t.on_node_status(Mock(), 1)        self.t.say.assert_called_with(self.t.FAILED, newline=True)    def test_on_node_status__OK(self):        self.t.on_node_status(Mock(), 0)        self.t.say.assert_called_with(self.t.OK, newline=True)    def test_on_node_signal(self):        self.t.on_node_signal(Mock(), Mock())    def test_on_child_spawn(self):        self.t.on_child_spawn(Mock(), Mock(), Mock())    def test_on_child_signalled(self):        self.t.on_child_signalled(Mock(), Mock())    def test_on_child_failure(self):        self.t.on_child_failure(Mock(), Mock())    def test_constant_strings(self):        assert self.t.OK        assert self.t.DOWN        assert self.t.FAILEDclass test_MultiTool_functional:    def setup(self):        self.fh = WhateverIO()        self.env = {}        self.t = MultiTool(env=self.env, fh=self.fh)    def test_note(self):        self.t.note('hello world')        assert self.fh.getvalue() == 'hello world\n'    def test_note_quiet(self):        self.t.quiet = True        self.t.note('hello world')        assert not self.fh.getvalue()    def test_carp(self):        self.t.say = Mock()        self.t.carp('foo')        self.t.say.assert_called_with('foo', True, self.t.stderr)    def test_info(self):        self.t.verbose = True        self.t.info('hello info')        assert self.fh.getvalue() == 'hello info\n'    def test_info_not_verbose(self):        self.t.verbose = False        self.t.info('hello info')        assert not self.fh.getvalue()    def test_error(self):        self.t.carp = Mock()        self.t.usage = Mock()        assert self.t.error('foo') == 1        self.t.carp.assert_called_with('foo')        self.t.usage.assert_called_with()        self.t.carp = Mock()        assert self.t.error() == 1        self.t.carp.assert_not_called()    def test_nosplash(self):        self.t.nosplash = True        self.t.splash()        assert not self.fh.getvalue()    def test_splash(self):        self.t.nosplash = False        self.t.splash()        assert 'celery multi' in self.fh.getvalue()    def test_usage(self):        self.t.usage()        assert self.fh.getvalue()    def test_help(self):        self.t.help([])        assert doc in self.fh.getvalue()    def test_expand(self):        self.t.expand('foo%n', 'ask', 'klask', 'dask')        assert self.fh.getvalue() == 'fooask\nfooklask\nfoodask\n'    @patch('celery.apps.multi.gethostname')    def test_get(self, gethostname):        gethostname.return_value = 'e.com'        self.t.get('xuzzy@e.com', 'foo', 'bar', 'baz')        assert not self.fh.getvalue()        self.t.get('foo@e.com', 'foo', 'bar', 'baz')        assert self.fh.getvalue()    @patch('celery.apps.multi.gethostname')    def test_names(self, gethostname):        gethostname.return_value = 'e.com'        self.t.names('foo', 'bar', 'baz')        assert 'foo@e.com\nbar@e.com\nbaz@e.com' in self.fh.getvalue()    def test_execute_from_commandline(self):        start = self.t.commands['start'] = Mock()        self.t.error = Mock()        self.t.execute_from_commandline(['multi', 'start', 'foo', 'bar'])        self.t.error.assert_not_called()        start.assert_called_with('foo', 'bar')        self.t.error = Mock()        self.t.execute_from_commandline(['multi', 'frob', 'foo', 'bar'])        self.t.error.assert_called_with('Invalid command: frob')        self.t.error = Mock()        self.t.execute_from_commandline(['multi'])        self.t.error.assert_called_with()        self.t.error = Mock()        self.t.execute_from_commandline(['multi', '-foo'])        self.t.error.assert_called_with()        self.t.execute_from_commandline(            ['multi', 'start', 'foo',             '--nosplash', '--quiet', '-q', '--verbose', '--no-color'],        )        assert self.t.nosplash        assert self.t.quiet        assert self.t.verbose        assert self.t.no_color    @patch('celery.bin.multi.MultiTool')    def test_main(self, MultiTool):        m = MultiTool.return_value = Mock()        with pytest.raises(SystemExit):            main()        m.execute_from_commandline.assert_called_with(sys.argv)
 |