| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412 | 
							- from __future__ import absolute_import, unicode_literals
 
- import errno
 
- import pytest
 
- import signal
 
- import sys
 
- from case import Mock, call, patch, skip
 
- from celery.apps.multi import (
 
-     Cluster, MultiParser, NamespacedOptionParser, Node, format_opt,
 
- )
 
class test_functions:
    """Tests for the helper functions exposed by ``celery.apps.multi``."""

    def test_parse_ns_range(self):
        """``_parse_ns_range`` expands ``a-b`` ranges only when asked to."""
        m = MultiParser()
        # This used to read ``assert value, ['1', '2' == '3']``, which only
        # checked truthiness and used the list as the failure message.
        # Compare against the expanded range explicitly.
        assert m._parse_ns_range('1-3', True) == ['1', '2', '3']
        # With range expansion disabled the spec is returned untouched.
        assert m._parse_ns_range('1-3', False) == ['1-3']
        assert m._parse_ns_range('1-3,10,11,20', True) == [
            '1', '2', '3', '10', '11', '20',
        ]

    def test_format_opt(self):
        """``format_opt`` renders bare flags, short and long options."""
        assert format_opt('--foo', None) == '--foo'
        assert format_opt('-c', 1) == '-c 1'
        assert format_opt('--log', 'foo') == '--log=foo'
 
class test_NamespacedOptionParser:
    """Tests for ``NamespacedOptionParser`` argument parsing."""

    def test_parse(self):
        """Parsing separates options, namespaces, values and passthrough."""
        x = NamespacedOptionParser(['-c:1,3', '4'])
        x.parse()
        assert x.namespaces.get('1,3') == {'-c': '4'}

        x = NamespacedOptionParser(['-c:jerry,elaine', '5',
                                    '--loglevel:kramer=DEBUG',
                                    '--flag',
                                    '--logfile=foo', '-Q', 'bar', 'a', 'b',
                                    '--', '.disable_rate_limits=1'])
        x.parse()
        # Options without a ``:namespace`` suffix end up in ``options``.
        assert x.options == {
            '--logfile': 'foo',
            '-Q': 'bar',
            '--flag': None,
        }
        # This used to read ``assert x.values, ['a' == 'b']``, which only
        # checked truthiness and used the list as the failure message.
        # Compare the positional values explicitly.
        assert x.values == ['a', 'b']
        assert x.namespaces.get('jerry,elaine') == {'-c': '5'}
        assert x.namespaces.get('kramer') == {'--loglevel': 'DEBUG'}
        # Everything from ``--`` onwards is kept as one passthrough string.
        assert x.passthrough == '-- .disable_rate_limits=1'
 
def multi_args(p, *args, **kwargs):
    """Parse ``p`` with a ``MultiParser`` built from the remaining arguments."""
    parser = MultiParser(*args, **kwargs)
    return parser.parse(p)
 
class test_multi_args:
    """Tests for turning parsed multi options into worker nodes."""

    @patch('celery.apps.multi.gethostname')
    def test_parse(self, gethostname):
        """Check node names and argv for several command-line layouts."""
        gethostname.return_value = 'example.com'

        p = NamespacedOptionParser([
            '-c:jerry,elaine', '5',
            '--loglevel:kramer=DEBUG',
            '--flag',
            '--logfile=foo', '-Q', 'bar', 'jerry',
            'elaine', 'kramer',
            '--', '.disable_rate_limits=1',
        ])
        p.parse()
        nodes = list(multi_args(
            p, cmd='COMMAND', append='*AP*', prefix='*P*', suffix='*S*',
        ))

        def check_node_args(name, expected):
            # The node must exist, and its argv must hold every expected item.
            assert any(candidate.name == name for candidate in nodes)
            matching = [
                candidate.argv for candidate in nodes
                if candidate.name == name
            ]
            # If several nodes share the name, the last one is inspected.
            argv = matching[-1]
            assert argv
            missing = [item for item in expected if item not in argv]
            assert not missing

        # Arguments every node is expected to carry.
        shared = [
            'COMMAND', '-Q bar', '--flag', '--logfile=foo',
            '-- .disable_rate_limits=1', '*AP*',
        ]
        per_node = [
            ('*P*jerry@*S*', ['-n *P*jerry@*S*', '-c 5']),
            ('*P*elaine@*S*', ['-n *P*elaine@*S*', '-c 5']),
            ('*P*kramer@*S*', ['--loglevel=DEBUG', '-n *P*kramer@*S*']),
        ]
        for node_name, specific in per_node:
            check_node_args(node_name, shared + specific)

        expand = nodes[0].expander
        assert expand('%h') == '*P*jerry@*S*'
        assert expand('%n') == '*P*jerry'

        # With an empty ``append`` the passthrough is the final argument.
        no_append = list(multi_args(
            p, cmd='COMMAND', append='', prefix='*P*', suffix='*S*',
        ))
        assert no_append[0].argv[-1] == '-- .disable_rate_limits=1'

        def _args(name, *args):
            # Default trailing arguments expected for a node called ``name``.
            return args + (
                '--pidfile={0}.pid'.format(name),
                '--logfile={0}%I.log'.format(name),
                '--executable={0}'.format(sys.executable),
                '',
            )

        # A numeric first value yields that many numbered nodes.
        p2 = NamespacedOptionParser(['10', '-c:1', '5'])
        p2.parse()
        numbered = list(multi_args(p2, cmd='COMMAND'))
        assert len(numbered) == 10
        head = numbered[0]
        assert head.name == 'celery1@example.com'
        assert head.argv == (
            'COMMAND', '-c 5', '-n celery1@example.com') + _args('celery1')
        for number, worker in enumerate(numbered[1:], 2):
            short_name = 'celery%s' % (number,)
            assert worker.name == 'celery%s@example.com' % (number,)
            assert worker.argv == (
                'COMMAND',
                '-n %s@example.com' % (short_name,)) + _args(short_name)

        # The same parser again, this time with an empty-string suffix.
        no_suffix = list(multi_args(p2, cmd='COMMAND', suffix='""'))
        assert len(no_suffix) == 10
        assert no_suffix[0].name == 'celery1@'
        assert no_suffix[0].argv == (
            'COMMAND', '-c 5', '-n celery1@') + _args('celery1')

        # Explicitly named nodes: (command line, node name, namespaced option).
        named_cases = [
            (['foo@', '-c:foo', '5'], 'foo@', '-c 5'),
            (['foo', '-Q:1', 'test'], 'foo@', '-Q test'),
            (['foo@bar', '-Q:1', 'test'], 'foo@bar', '-Q test'),
        ]
        for cli, node_name, option in named_cases:
            parser = NamespacedOptionParser(cli)
            parser.parse()
            first = list(multi_args(parser, cmd='COMMAND', suffix='""'))[0]
            assert first.name == node_name
            assert first.argv == (
                'COMMAND', option, '-n ' + node_name) + _args('foo')

        # Namespace index 0 is expected to be rejected with KeyError.
        invalid = NamespacedOptionParser(['foo@bar', '-Q:0', 'test'])
        invalid.parse()
        with pytest.raises(KeyError):
            list(multi_args(invalid))

    def test_optmerge(self):
        """``optmerge`` includes the parser's global options in its result."""
        parser = NamespacedOptionParser(['foo', 'test'])
        parser.parse()
        parser.options = {'x': 'y'}
        merged = parser.optmerge('foo')
        assert merged['x'] == 'y'
 
class test_Node:
    """Tests for ``Node``: argv construction, signalling and spawning."""

    def setup(self):
        """Create a node with a mocked expander and a fixed pid."""
        self.p = Mock(name='p')
        self.p.options = {
            '--executable': 'python',
            '--logfile': 'foo.log',
        }
        self.p.namespaces = {}
        self.node = Node('foo@bar.com', options={'-A': 'proj'})
        self.expander = self.node.expander = Mock(name='expander')
        self.node.pid = 303

    def test_from_kwargs(self):
        """``Node.from_kwargs`` maps keyword arguments to argv entries."""
        n = Node.from_kwargs(
            'foo@bar.com',
            max_tasks_per_child=30, A='foo', Q='q1,q2', O='fair',
        )
        # Sorted on both sides: only the set of arguments is checked.
        assert sorted(n.argv) == sorted([
            '-m celery worker --detach',
            '-A foo',
            '--executable={0}'.format(n.executable),
            '-O fair',
            '-n foo@bar.com',
            '--logfile=foo%I.log',
            '-Q q1,q2',
            '--max-tasks-per-child=30',
            '--pidfile=foo.pid',
            '',
        ])

    @patch('os.kill')
    def test_send(self, kill):
        """``send`` is truthy and signals the node's pid via ``os.kill``."""
        assert self.node.send(9)
        kill.assert_called_with(self.node.pid, 9)

    @patch('os.kill')
    def test_send__ESRCH(self, kill):
        """An ``ESRCH`` error from ``os.kill`` gives a falsy result."""
        kill.side_effect = OSError()
        kill.side_effect.errno = errno.ESRCH
        assert not self.node.send(9)
        kill.assert_called_with(self.node.pid, 9)

    @patch('os.kill')
    def test_send__error(self, kill):
        """An ``OSError`` with another errno (``ENOENT``) is re-raised."""
        kill.side_effect = OSError()
        kill.side_effect.errno = errno.ENOENT
        with pytest.raises(OSError):
            self.node.send(9)
        kill.assert_called_with(self.node.pid, 9)

    def test_alive(self):
        """``alive`` returns the result of sending signal 0."""
        self.node.send = Mock(name='send')
        assert self.node.alive() is self.node.send.return_value
        self.node.send.assert_called_with(0)

    def test_start(self):
        """``start`` forwards argv, executable, env and kwargs."""
        self.node._waitexec = Mock(name='_waitexec')
        self.node.start(env={'foo': 'bar'}, kw=2)
        self.node._waitexec.assert_called_with(
            self.node.argv, path=self.node.executable,
            env={'foo': 'bar'}, kw=2,
        )

    @patch('celery.apps.multi.Popen')
    def test_waitexec(self, Popen, argv=None):
        """``_waitexec`` spawns via ``Popen`` and hands over the exit code."""
        # ``None`` sentinel instead of a mutable list default, so every
        # call gets its own fresh argv list.
        if argv is None:
            argv = ['A', 'B']
        on_spawn = Mock(name='on_spawn')
        on_signalled = Mock(name='on_signalled')
        on_failure = Mock(name='on_failure')
        env = Mock(name='env')
        self.node.handle_process_exit = Mock(name='handle_process_exit')

        self.node._waitexec(
            argv,
            path='python',
            env=env,
            on_spawn=on_spawn,
            on_signalled=on_signalled,
            on_failure=on_failure,
        )

        Popen.assert_called_with(
            self.node.prepare_argv(argv, 'python'), env=env)
        # ``Popen`` is a mock, so ``Popen().wait()`` is the same return
        # value object that ``_waitexec`` received.
        self.node.handle_process_exit.assert_called_with(
            Popen().wait(),
            on_signalled=on_signalled,
            on_failure=on_failure,
        )

    def test_handle_process_exit(self):
        """Exit code 0 is returned as-is."""
        assert self.node.handle_process_exit(0) == 0

    def test_handle_process_exit__failure(self):
        """A positive exit code triggers ``on_failure`` and is returned."""
        on_failure = Mock(name='on_failure')
        assert self.node.handle_process_exit(9, on_failure=on_failure) == 9
        on_failure.assert_called_with(self.node, 9)

    def test_handle_process_exit__signalled(self):
        """A negative exit code triggers ``on_signalled`` with its negation."""
        on_signalled = Mock(name='on_signalled')
        assert self.node.handle_process_exit(
            -9, on_signalled=on_signalled) == 9
        on_signalled.assert_called_with(self.node, 9)

    def test_logfile(self):
        """``logfile`` is the expander applied to ``'%n%I.log'``."""
        assert self.node.logfile == self.expander.return_value
        self.expander.assert_called_with('%n%I.log')
 
class test_Cluster:
    """Tests for ``Cluster``: node start-up, signalling and pid discovery."""

    def setup(self):
        """Patch process and pidfile helpers; build a three-node cluster."""
        self.Popen = self.patching('celery.apps.multi.Popen')
        self.kill = self.patching('os.kill')
        self.gethostname = self.patching('celery.apps.multi.gethostname')
        self.gethostname.return_value = 'example.com'
        self.Pidfile = self.patching('celery.apps.multi.Pidfile')
        self.cluster = Cluster(
            [Node('foo@example.com'),
             Node('bar@example.com'),
             Node('baz@example.com')],
            on_stopping_preamble=Mock(name='on_stopping_preamble'),
            on_send_signal=Mock(name='on_send_signal'),
            on_still_waiting_for=Mock(name='on_still_waiting_for'),
            on_still_waiting_progress=Mock(name='on_still_waiting_progress'),
            on_still_waiting_end=Mock(name='on_still_waiting_end'),
            on_node_start=Mock(name='on_node_start'),
            on_node_restart=Mock(name='on_node_restart'),
            on_node_shutdown_ok=Mock(name='on_node_shutdown_ok'),
            on_node_status=Mock(name='on_node_status'),
            on_node_signal=Mock(name='on_node_signal'),
            on_node_signal_dead=Mock(name='on_node_signal_dead'),
            on_node_down=Mock(name='on_node_down'),
            on_child_spawn=Mock(name='on_child_spawn'),
            on_child_signalled=Mock(name='on_child_signalled'),
            on_child_failure=Mock(name='on_child_failure'),
        )

    def test_len(self):
        """``len(cluster)`` is the number of nodes."""
        assert len(self.cluster) == 3

    def test_getitem(self):
        """Indexing the cluster returns nodes in construction order."""
        assert self.cluster[0].name == 'foo@example.com'

    def test_start(self):
        """``start`` calls ``start_node`` for every node, in order."""
        self.cluster.start_node = Mock(name='start_node')
        self.cluster.start()
        # ``assert_has_calls`` expects a list of calls, so build one
        # rather than passing a one-shot generator.
        self.cluster.start_node.assert_has_calls(
            [call(node) for node in self.cluster]
        )

    def test_start_node(self):
        """``start_node`` fires the start/status hooks around ``_start_node``."""
        self.cluster._start_node = Mock(name='_start_node')
        node = self.cluster[0]
        assert (self.cluster.start_node(node) is
                self.cluster._start_node.return_value)
        self.cluster.on_node_start.assert_called_with(node)
        self.cluster._start_node.assert_called_with(node)
        self.cluster.on_node_status.assert_called_with(
            node, self.cluster._start_node(),
        )

    def test__start_node(self):
        """``_start_node`` calls ``node.start`` with env and child hooks."""
        node = self.cluster[0]
        node.start = Mock(name='node.start')
        assert self.cluster._start_node(node) is node.start.return_value
        node.start.assert_called_with(
            self.cluster.env,
            on_spawn=self.cluster.on_child_spawn,
            on_signalled=self.cluster.on_child_signalled,
            on_failure=self.cluster.on_child_failure,
        )

    def test_send_all(self):
        """``send_all`` signals every node that ``getpids`` returns."""
        nodes = [Mock(name='n1'), Mock(name='n2')]
        self.cluster.getpids = Mock(name='getpids')
        self.cluster.getpids.return_value = nodes
        self.cluster.send_all(15)
        # Signal 15 is reported to the hook under the name 'TERM'.
        self.cluster.on_node_signal.assert_has_calls(
            [call(node, 'TERM') for node in nodes]
        )
        for node in nodes:
            node.send.assert_called_with(15, self.cluster.on_node_signal_dead)

    @skip.if_win32()
    def test_kill(self):
        """``kill`` sends ``SIGKILL`` to all nodes."""
        self.cluster.send_all = Mock(name='.send_all')
        self.cluster.kill()
        self.cluster.send_all.assert_called_with(signal.SIGKILL)

    def test_getpids(self):
        """``getpids`` yields only the nodes whose pidfile can be read."""
        self.gethostname.return_value = 'e.com'
        self.prepare_pidfile_for_getpids(self.Pidfile)
        callback = Mock()

        p = Cluster([
            Node('foo@e.com'),
            Node('bar@e.com'),
            Node('baz@e.com'),
        ])
        nodes = p.getpids(on_down=callback)
        # Only foo and bar have pid entries in the fake pidfile, so
        # unpacking into exactly two names also checks baz is left out.
        node_0, node_1 = nodes
        assert node_0.name == 'foo@e.com'
        assert sorted(node_0.argv) == sorted([
            '',
            '--executable={0}'.format(node_0.executable),
            '--logfile=foo%I.log',
            '--pidfile=foo.pid',
            '-m celery worker --detach',
            '-n foo@e.com',
        ])
        assert node_0.pid == 10

        assert node_1.name == 'bar@e.com'
        assert sorted(node_1.argv) == sorted([
            '',
            '--executable={0}'.format(node_1.executable),
            '--logfile=bar%I.log',
            '--pidfile=bar.pid',
            '-m celery worker --detach',
            '-n bar@e.com',
        ])
        assert node_1.pid == 11

        # without callback, should work
        # NOTE(review): the result is never inspected, and the string is
        # passed positionally -- confirm this exercises the intended path.
        nodes = p.getpids('celery worker')

    def prepare_pidfile_for_getpids(self, Pidfile):
        """Make the ``Pidfile`` mock return pids for ``foo`` and ``bar``."""

        class pids(object):
            """Fake pidfile that looks pids up by path."""

            def __init__(self, path):
                self.path = path

            def read_pid(self):
                try:
                    return {'foo.pid': 10,
                            'bar.pid': 11}[self.path]
                except KeyError:
                    # Unknown paths are reported as ``ValueError``.
                    raise ValueError()

        # Configure the mock that was passed in; the caller in this class
        # passes ``self.Pidfile``.
        Pidfile.side_effect = pids
 
 
  |