| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409 | from __future__ import absolute_import, unicode_literalsimport errnoimport pytestimport signalimport sysfrom case import Mock, call, patch, skipfrom celery.apps.multi import (    Cluster, MultiParser, NamespacedOptionParser, Node, format_opt,)class test_functions:    def test_parse_ns_range(self):        m = MultiParser()        assert m._parse_ns_range('1-3', True), ['1', '2' == '3']        assert m._parse_ns_range('1-3', False) == ['1-3']        assert m._parse_ns_range('1-3,10,11,20', True) == [            '1', '2', '3', '10', '11', '20',        ]    def test_format_opt(self):        assert format_opt('--foo', None) == '--foo'        assert format_opt('-c', 1) == '-c 1'        assert format_opt('--log', 'foo') == '--log=foo'class test_NamespacedOptionParser:    def test_parse(self):        x = NamespacedOptionParser(['-c:1,3', '4'])        x.parse()        assert x.namespaces.get('1,3') == {'-c': '4'}        x = NamespacedOptionParser(['-c:jerry,elaine', '5',                                    '--loglevel:kramer=DEBUG',                                    '--flag',                                    '--logfile=foo', '-Q', 'bar', 'a', 'b',                                    '--', '.disable_rate_limits=1'])        x.parse()        assert x.options == {            '--logfile': 'foo',            '-Q': 'bar',            '--flag': None,        }        assert x.values, ['a' == 'b']        assert x.namespaces.get('jerry,elaine') == {'-c': '5'}        assert x.namespaces.get('kramer') == {'--loglevel': 'DEBUG'}        assert x.passthrough == '-- .disable_rate_limits=1'def multi_args(p, *args, **kwargs):    return MultiParser(*args, **kwargs).parse(p)class test_multi_args:    @patch('celery.apps.multi.gethostname')    def test_parse(self, gethostname):        gethostname.return_value = 'example.com'        p = NamespacedOptionParser([            '-c:jerry,elaine', '5',            '--loglevel:kramer=DEBUG',            '--flag',            '--logfile=foo', '-Q', 'bar', 'jerry',            'elaine', 'kramer',            '--', '.disable_rate_limits=1',        ])        p.parse()        it = multi_args(p, cmd='COMMAND', append='*AP*',                        prefix='*P*', suffix='*S*')        nodes = list(it)        def assert_line_in(name, args):            assert name in {n.name for n in nodes}            argv = None            for node in nodes:                if node.name == name:                    argv = node.argv            assert argv            for arg in args:                assert arg in argv        assert_line_in(            '*P*jerry@*S*',            ['COMMAND', '-n *P*jerry@*S*', '-Q bar',             '-c 5', '--flag', '--logfile=foo',             '-- .disable_rate_limits=1', '*AP*'],        )        assert_line_in(            '*P*elaine@*S*',            ['COMMAND', '-n *P*elaine@*S*', '-Q bar',             '-c 5', '--flag', '--logfile=foo',             '-- .disable_rate_limits=1', '*AP*'],        )        assert_line_in(            '*P*kramer@*S*',            ['COMMAND', '--loglevel=DEBUG', '-n *P*kramer@*S*',             '-Q bar', '--flag', '--logfile=foo',             '-- .disable_rate_limits=1', '*AP*'],        )        expand = nodes[0].expander        assert expand('%h') == '*P*jerry@*S*'        assert expand('%n') == '*P*jerry'        nodes2 = list(multi_args(p, cmd='COMMAND', append='',                      prefix='*P*', suffix='*S*'))        assert nodes2[0].argv[-1] == '-- .disable_rate_limits=1'        p2 = NamespacedOptionParser(['10', '-c:1', '5'])        p2.parse()        nodes3 = list(multi_args(p2, cmd='COMMAND'))        def _args(name, *args):            return args + (                '--pidfile={0}.pid'.format(name),                '--logfile={0}%I.log'.format(name),                '--executable={0}'.format(sys.executable),                '',            )        assert len(nodes3) == 10        assert nodes3[0].name == 'celery1@example.com'        assert nodes3[0].argv == (            'COMMAND', '-c 5', '-n celery1@example.com') + _args('celery1')        for i, worker in enumerate(nodes3[1:]):            assert worker.name == 'celery%s@example.com' % (i + 2)            node_i = 'celery%s' % (i + 2,)            assert worker.argv == (                'COMMAND',                '-n %s@example.com' % (node_i,)) + _args(node_i)        nodes4 = list(multi_args(p2, cmd='COMMAND', suffix='""'))        assert len(nodes4) == 10        assert nodes4[0].name == 'celery1@'        assert nodes4[0].argv == (            'COMMAND', '-c 5', '-n celery1@') + _args('celery1')        p3 = NamespacedOptionParser(['foo@', '-c:foo', '5'])        p3.parse()        nodes5 = list(multi_args(p3, cmd='COMMAND', suffix='""'))        assert nodes5[0].name == 'foo@'        assert nodes5[0].argv == (            'COMMAND', '-c 5', '-n foo@') + _args('foo')        p4 = NamespacedOptionParser(['foo', '-Q:1', 'test'])        p4.parse()        nodes6 = list(multi_args(p4, cmd='COMMAND', suffix='""'))        assert nodes6[0].name == 'foo@'        assert nodes6[0].argv == (            'COMMAND', '-Q test', '-n foo@') + _args('foo')        p5 = NamespacedOptionParser(['foo@bar', '-Q:1', 'test'])        p5.parse()        nodes7 = list(multi_args(p5, cmd='COMMAND', suffix='""'))        assert nodes7[0].name == 'foo@bar'        assert nodes7[0].argv == (            'COMMAND', '-Q test', '-n foo@bar') + _args('foo')        p6 = NamespacedOptionParser(['foo@bar', '-Q:0', 'test'])        p6.parse()        with pytest.raises(KeyError):            list(multi_args(p6))    def test_optmerge(self):        p = NamespacedOptionParser(['foo', 'test'])        p.parse()        p.options = {'x': 'y'}        r = p.optmerge('foo')        assert r['x'] == 'y'class test_Node:    def setup(self):        self.p = Mock(name='p')        self.p.options = {            '--executable': 'python',            '--logfile': 'foo.log',        }        self.p.namespaces = {}        self.node = Node('foo@bar.com', options={'-A': 'proj'})        self.expander = self.node.expander = Mock(name='expander')        self.node.pid = 303    def test_from_kwargs(self):        n = Node.from_kwargs(            'foo@bar.com',            max_tasks_per_child=30, A='foo', Q='q1,q2', O='fair',        )        assert sorted(n.argv) == sorted([            '-m celery worker --detach',            '-A foo',            '--executable={0}'.format(n.executable),            '-O fair',            '-n foo@bar.com',            '--logfile=foo%I.log',            '-Q q1,q2',            '--max-tasks-per-child=30',            '--pidfile=foo.pid',            '',        ])    @patch('os.kill')    def test_send(self, kill):        assert self.node.send(9)        kill.assert_called_with(self.node.pid, 9)    @patch('os.kill')    def test_send__ESRCH(self, kill):        kill.side_effect = OSError()        kill.side_effect.errno = errno.ESRCH        assert not self.node.send(9)        kill.assert_called_with(self.node.pid, 9)    @patch('os.kill')    def test_send__error(self, kill):        kill.side_effect = OSError()        kill.side_effect.errno = errno.ENOENT        with pytest.raises(OSError):            self.node.send(9)        kill.assert_called_with(self.node.pid, 9)    def test_alive(self):        self.node.send = Mock(name='send')        assert self.node.alive() is self.node.send.return_value        self.node.send.assert_called_with(0)    def test_start(self):        self.node._waitexec = Mock(name='_waitexec')        self.node.start(env={'foo': 'bar'}, kw=2)        self.node._waitexec.assert_called_with(            self.node.argv, path=self.node.executable,            env={'foo': 'bar'}, kw=2,        )    @patch('celery.apps.multi.Popen')    def test_waitexec(self, Popen, argv=['A', 'B']):        on_spawn = Mock(name='on_spawn')        on_signalled = Mock(name='on_signalled')        on_failure = Mock(name='on_failure')        env = Mock(name='env')        self.node.handle_process_exit = Mock(name='handle_process_exit')        self.node._waitexec(            argv,            path='python',            env=env,            on_spawn=on_spawn,            on_signalled=on_signalled,            on_failure=on_failure,        )        Popen.assert_called_with(            self.node.prepare_argv(argv, 'python'), env=env)        self.node.handle_process_exit.assert_called_with(            Popen().wait(),            on_signalled=on_signalled,            on_failure=on_failure,        )    def test_handle_process_exit(self):        assert self.node.handle_process_exit(0) == 0    def test_handle_process_exit__failure(self):        on_failure = Mock(name='on_failure')        assert self.node.handle_process_exit(9, on_failure=on_failure) == 9        on_failure.assert_called_with(self.node, 9)    def test_handle_process_exit__signalled(self):        on_signalled = Mock(name='on_signalled')        assert self.node.handle_process_exit(            -9, on_signalled=on_signalled) == 9        on_signalled.assert_called_with(self.node, 9)    def test_logfile(self):        assert self.node.logfile == self.expander.return_value        self.expander.assert_called_with('%n%I.log')class test_Cluster:    def setup(self):        self.Popen = self.patching('celery.apps.multi.Popen')        self.kill = self.patching('os.kill')        self.gethostname = self.patching('celery.apps.multi.gethostname')        self.gethostname.return_value = 'example.com'        self.Pidfile = self.patching('celery.apps.multi.Pidfile')        self.cluster = Cluster(            [Node('foo@example.com'),             Node('bar@example.com'),             Node('baz@example.com')],            on_stopping_preamble=Mock(name='on_stopping_preamble'),            on_send_signal=Mock(name='on_send_signal'),            on_still_waiting_for=Mock(name='on_still_waiting_for'),            on_still_waiting_progress=Mock(name='on_still_waiting_progress'),            on_still_waiting_end=Mock(name='on_still_waiting_end'),            on_node_start=Mock(name='on_node_start'),            on_node_restart=Mock(name='on_node_restart'),            on_node_shutdown_ok=Mock(name='on_node_shutdown_ok'),            on_node_status=Mock(name='on_node_status'),            on_node_signal=Mock(name='on_node_signal'),            on_node_signal_dead=Mock(name='on_node_signal_dead'),            on_node_down=Mock(name='on_node_down'),            on_child_spawn=Mock(name='on_child_spawn'),            on_child_signalled=Mock(name='on_child_signalled'),            on_child_failure=Mock(name='on_child_failure'),        )    def test_len(self):        assert len(self.cluster) == 3    def test_getitem(self):        assert self.cluster[0].name == 'foo@example.com'    def test_start(self):        self.cluster.start_node = Mock(name='start_node')        self.cluster.start()        self.cluster.start_node.assert_has_calls(            call(node) for node in self.cluster        )    def test_start_node(self):        self.cluster._start_node = Mock(name='_start_node')        node = self.cluster[0]        assert (self.cluster.start_node(node) is                self.cluster._start_node.return_value)        self.cluster.on_node_start.assert_called_with(node)        self.cluster._start_node.assert_called_with(node)        self.cluster.on_node_status.assert_called_with(            node, self.cluster._start_node(),        )    def test__start_node(self):        node = self.cluster[0]        node.start = Mock(name='node.start')        assert self.cluster._start_node(node) is node.start.return_value        node.start.assert_called_with(            self.cluster.env,            on_spawn=self.cluster.on_child_spawn,            on_signalled=self.cluster.on_child_signalled,            on_failure=self.cluster.on_child_failure,        )    def test_send_all(self):        nodes = [Mock(name='n1'), Mock(name='n2')]        self.cluster.getpids = Mock(name='getpids')        self.cluster.getpids.return_value = nodes        self.cluster.send_all(15)        self.cluster.on_node_signal.assert_has_calls(            call(node, 'TERM') for node in nodes        )        for node in nodes:            node.send.assert_called_with(15, self.cluster.on_node_signal_dead)    @skip.if_win32()    def test_kill(self):        self.cluster.send_all = Mock(name='.send_all')        self.cluster.kill()        self.cluster.send_all.assert_called_with(signal.SIGKILL)    def test_getpids(self):        self.gethostname.return_value = 'e.com'        self.prepare_pidfile_for_getpids(self.Pidfile)        callback = Mock()        p = Cluster([            Node('foo@e.com'),            Node('bar@e.com'),            Node('baz@e.com'),        ])        nodes = p.getpids(on_down=callback)        node_0, node_1 = nodes        assert node_0.name == 'foo@e.com'        assert sorted(node_0.argv) == sorted([            '',            '--executable={0}'.format(node_0.executable),            '--logfile=foo%I.log',            '--pidfile=foo.pid',            '-m celery worker --detach',            '-n foo@e.com',        ])        assert node_0.pid == 10        assert node_1.name == 'bar@e.com'        assert sorted(node_1.argv) == sorted([            '',            '--executable={0}'.format(node_1.executable),            '--logfile=bar%I.log',            '--pidfile=bar.pid',            '-m celery worker --detach',            '-n bar@e.com',        ])        assert node_1.pid == 11        # without callback, should work        nodes = p.getpids('celery worker')    def prepare_pidfile_for_getpids(self, Pidfile):        class pids(object):            def __init__(self, path):                self.path = path            def read_pid(self):                try:                    return {'foo.pid': 10,                            'bar.pid': 11}[self.path]                except KeyError:                    raise ValueError()        self.Pidfile.side_effect = pids
 |