123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449 |
- from __future__ import absolute_import
- import errno
- import signal
- import sys
- from mock import Mock, patch
- from celery.bin.multi import (
- main,
- MultiTool,
- findsig,
- abbreviations,
- parse_ns_range,
- format_opt,
- quote,
- NamespacedOptionParser,
- multi_args,
- __doc__ as doc,
- )
- from celery.tests.utils import Case, WhateverIO
- class test_functions(Case):
- def test_findsig(self):
- self.assertEqual(findsig(['a', 'b', 'c', '-1']), 1)
- self.assertEqual(findsig(['--foo=1', '-9']), 9)
- self.assertEqual(findsig(['-INT']), signal.SIGINT)
- self.assertEqual(findsig([]), signal.SIGTERM)
- self.assertEqual(findsig(['-s']), signal.SIGTERM)
- self.assertEqual(findsig(['-log']), signal.SIGTERM)
- def test_abbreviations(self):
- expander = abbreviations({'%s': 'START',
- '%x': 'STOP'})
- self.assertEqual(expander('foo%s'), 'fooSTART')
- self.assertEqual(expander('foo%x'), 'fooSTOP')
- self.assertEqual(expander('foo%y'), 'foo%y')
- self.assertIsNone(expander(None))
- def test_parse_ns_range(self):
- self.assertEqual(parse_ns_range('1-3', True), ['1', '2', '3'])
- self.assertEqual(parse_ns_range('1-3', False), ['1-3'])
- self.assertEqual(parse_ns_range('1-3,10,11,20', True),
- ['1', '2', '3', '10', '11', '20'])
- def test_format_opt(self):
- self.assertEqual(format_opt('--foo', None), '--foo')
- self.assertEqual(format_opt('-c', 1), '-c 1')
- self.assertEqual(format_opt('--log', 'foo'), '--log=foo')
- def test_quote(self):
- self.assertEqual(quote("the 'quick"), "'the '\\''quick'")
- class test_NamespacedOptionParser(Case):
- def test_parse(self):
- x = NamespacedOptionParser(['-c:1,3', '4'])
- self.assertEqual(x.namespaces.get('1,3'), {'-c': '4'})
- x = NamespacedOptionParser(['-c:jerry,elaine', '5',
- '--loglevel:kramer=DEBUG',
- '--flag',
- '--logfile=foo', '-Q', 'bar', 'a', 'b',
- '--', '.disable_rate_limits=1'])
- self.assertEqual(x.options, {'--logfile': 'foo',
- '-Q': 'bar',
- '--flag': None})
- self.assertEqual(x.values, ['a', 'b'])
- self.assertEqual(x.namespaces.get('jerry,elaine'), {'-c': '5'})
- self.assertEqual(x.namespaces.get('kramer'), {'--loglevel': 'DEBUG'})
- self.assertEqual(x.passthrough, '-- .disable_rate_limits=1')
- class test_multi_args(Case):
- @patch('socket.gethostname')
- def test_parse(self, gethostname):
- p = NamespacedOptionParser(['-c:jerry,elaine', '5',
- '--loglevel:kramer=DEBUG',
- '--flag',
- '--logfile=foo', '-Q', 'bar', 'jerry',
- 'elaine', 'kramer',
- '--', '.disable_rate_limits=1'])
- it = multi_args(p, cmd='COMMAND', append='*AP*',
- prefix='*P*', suffix='*S*')
- names = list(it)
- def assert_line_in(name, args):
- self.assertIn(name, [tup[0] for tup in names])
- argv = None
- for item in names:
- if item[0] == name:
- argv = item[1]
- self.assertTrue(argv)
- for arg in args:
- self.assertIn(arg, argv)
- assert_line_in('*P*jerry@*S*',
- [
- 'COMMAND', '-n *P*jerry@*S*', '-Q bar',
- '-c 5', '--flag', '--logfile=foo',
- '-- .disable_rate_limits=1', '*AP*',
- ]
- )
- assert_line_in('*P*elaine@*S*',
- [
- 'COMMAND', '-n *P*elaine@*S*', '-Q bar',
- '-c 5', '--flag', '--logfile=foo',
- '-- .disable_rate_limits=1', '*AP*',
- ]
- )
- assert_line_in('*P*kramer@*S*',
- [
- 'COMMAND', '--loglevel=DEBUG', '-n *P*kramer@*S*',
- '-Q bar', '--flag', '--logfile=foo',
- '-- .disable_rate_limits=1', '*AP*',
- ]
- )
- expand = names[0][2]
- self.assertEqual(expand('%h'), '*P*jerry@*S*')
- self.assertEqual(expand('%n'), 'jerry')
- names2 = list(multi_args(p, cmd='COMMAND', append='',
- prefix='*P*', suffix='*S*'))
- self.assertEqual(names2[0][1][-1], '-- .disable_rate_limits=1')
- gethostname.return_value = 'example.com'
- p2 = NamespacedOptionParser(['10', '-c:1', '5'])
- names3 = list(multi_args(p2, cmd='COMMAND'))
- self.assertEqual(len(names3), 10)
- self.assertEqual(names3[0][0:2], ('celery1@example.com',
- ['COMMAND', '-n celery1@example.com', '-c 5', '']))
- for i, worker in enumerate(names3[1:]):
- self.assertEqual(worker[0:2], ('celery%s@example.com' % (i + 2),
- ['COMMAND', '-n celery%s@example.com' % (i + 2), '']))
- names4 = list(multi_args(p2, cmd='COMMAND', suffix='""'))
- self.assertEqual(len(names4), 10)
- self.assertEqual(names4[0][0:2], ('celery1@',
- ['COMMAND', '-n celery1@', '-c 5', '']))
- p3 = NamespacedOptionParser(['foo@', '-c:foo', '5'])
- names5 = list(multi_args(p3, cmd='COMMAND', suffix='""'))
- self.assertEqual(names5[0][0:2], ('foo@',
- ['COMMAND', '-n foo@', '-c 5', '']))
- class test_MultiTool(Case):
- def setUp(self):
- self.fh = WhateverIO()
- self.env = {}
- self.t = MultiTool(env=self.env, fh=self.fh)
- def test_note(self):
- self.t.note('hello world')
- self.assertEqual(self.fh.getvalue(), 'hello world\n')
- def test_note_quiet(self):
- self.t.quiet = True
- self.t.note('hello world')
- self.assertFalse(self.fh.getvalue())
- def test_info(self):
- self.t.verbose = True
- self.t.info('hello info')
- self.assertEqual(self.fh.getvalue(), 'hello info\n')
- def test_info_not_verbose(self):
- self.t.verbose = False
- self.t.info('hello info')
- self.assertFalse(self.fh.getvalue())
- def test_error(self):
- self.t.say = Mock()
- self.t.usage = Mock()
- self.assertEqual(self.t.error('foo'), 1)
- self.t.say.assert_called_with('foo')
- self.t.usage.assert_called_with()
- self.t.say = Mock()
- self.assertEqual(self.t.error(), 1)
- self.assertFalse(self.t.say.called)
- self.assertEqual(self.t.retcode, 1)
- @patch('celery.bin.multi.Popen')
- def test_waitexec(self, Popen):
- self.t.note = Mock()
- pipe = Popen.return_value = Mock()
- pipe.wait.return_value = -10
- self.assertEqual(self.t.waitexec(['-m', 'foo'], 'path'), 10)
- Popen.assert_called_with(['path', '-m', 'foo'], env=self.t.env)
- self.t.note.assert_called_with('* Child was terminated by signal 10')
- pipe.wait.return_value = 2
- self.assertEqual(self.t.waitexec(['-m', 'foo'], 'path'), 2)
- self.t.note.assert_called_with(
- '* Child terminated with errorcode 2')
- pipe.wait.return_value = 0
- self.assertFalse(self.t.waitexec(['-m', 'foo', 'path']))
- def test_nosplash(self):
- self.t.nosplash = True
- self.t.splash()
- self.assertFalse(self.fh.getvalue())
- def test_splash(self):
- self.t.nosplash = False
- self.t.splash()
- self.assertIn('celery multi', self.fh.getvalue())
- def test_usage(self):
- self.t.usage()
- self.assertTrue(self.fh.getvalue())
- def test_help(self):
- self.t.help([])
- self.assertIn(doc, self.fh.getvalue())
- def test_expand(self):
- self.t.expand(['foo%n', 'ask', 'klask', 'dask'])
- self.assertEqual(self.fh.getvalue(),
- 'fooask\nfooklask\nfoodask\n')
- def test_restart(self):
- stop = self.t._stop_nodes = Mock()
- self.t.restart(['jerry', 'george'], 'celery worker')
- waitexec = self.t.waitexec = Mock()
- self.assertTrue(stop.called)
- callback = stop.call_args[1]['callback']
- self.assertTrue(callback)
- waitexec.return_value = 0
- callback('jerry', ['arg'], 13)
- waitexec.assert_called_with(['arg'])
- self.assertIn('OK', self.fh.getvalue())
- self.fh.seek(0)
- self.fh.truncate()
- waitexec.return_value = 1
- callback('jerry', ['arg'], 13)
- self.assertIn('FAILED', self.fh.getvalue())
- def test_stop(self):
- self.t.getpids = Mock()
- self.t.getpids.return_value = [2, 3, 4]
- self.t.shutdown_nodes = Mock()
- self.t.stop(['a', 'b', '-INT'], 'celery worker')
- self.t.shutdown_nodes.assert_called_with(
- [2, 3, 4], sig=signal.SIGINT, retry=None, callback=None,
- )
- def test_kill(self):
- self.t.getpids = Mock()
- self.t.getpids.return_value = [
- ('a', None, 10),
- ('b', None, 11),
- ('c', None, 12)
- ]
- sig = self.t.signal_node = Mock()
- self.t.kill(['a', 'b', 'c'], 'celery worker')
- sigs = sig.call_args_list
- self.assertEqual(len(sigs), 3)
- self.assertEqual(sigs[0][0], ('a', 10, signal.SIGKILL))
- self.assertEqual(sigs[1][0], ('b', 11, signal.SIGKILL))
- self.assertEqual(sigs[2][0], ('c', 12, signal.SIGKILL))
- def prepare_pidfile_for_getpids(self, Pidfile):
- class pids(object):
- def __init__(self, path):
- self.path = path
- def read_pid(self):
- try:
- return {'foo.pid': 10,
- 'bar.pid': 11}[self.path]
- except KeyError:
- raise ValueError()
- Pidfile.side_effect = pids
- @patch('celery.bin.multi.Pidfile')
- @patch('socket.gethostname')
- def test_getpids(self, gethostname, Pidfile):
- gethostname.return_value = 'e.com'
- self.prepare_pidfile_for_getpids(Pidfile)
- callback = Mock()
- p = NamespacedOptionParser(['foo', 'bar', 'baz'])
- nodes = self.t.getpids(p, 'celery worker', callback=callback)
- node_0, node_1 = nodes
- self.assertEqual(node_0[0], 'foo@e.com')
- self.assertEqual(sorted(node_0[1]),
- sorted(('celery worker', '--pidfile=foo.pid',
- '-n foo@e.com', '')))
- self.assertEqual(node_0[2], 10)
- self.assertEqual(node_1[0], 'bar@e.com')
- self.assertEqual(sorted(node_1[1]),
- sorted(('celery worker', '--pidfile=bar.pid',
- '-n bar@e.com', '')))
- self.assertEqual(node_1[2], 11)
- self.assertTrue(callback.called)
- cargs, _ = callback.call_args
- self.assertEqual(cargs[0], 'baz@e.com')
- self.assertItemsEqual(cargs[1],
- ['celery worker', '--pidfile=baz.pid', '-n baz@e.com', ''],
- )
- self.assertIsNone(cargs[2])
- self.assertIn('DOWN', self.fh.getvalue())
- # without callback, should work
- nodes = self.t.getpids(p, 'celery worker', callback=None)
- @patch('celery.bin.multi.Pidfile')
- @patch('socket.gethostname')
- @patch('celery.bin.multi.sleep')
- def test_shutdown_nodes(self, slepp, gethostname, Pidfile):
- gethostname.return_value = 'e.com'
- self.prepare_pidfile_for_getpids(Pidfile)
- self.assertIsNone(self.t.shutdown_nodes([]))
- self.t.signal_node = Mock()
- node_alive = self.t.node_alive = Mock()
- self.t.node_alive.return_value = False
- callback = Mock()
- self.t.stop(['foo', 'bar', 'baz'], 'celery worker', callback=callback)
- sigs = sorted(self.t.signal_node.call_args_list)
- self.assertEqual(len(sigs), 2)
- self.assertIn(('foo@e.com', 10, signal.SIGTERM),
- [tup[0] for tup in sigs])
- self.assertIn(('bar@e.com', 11, signal.SIGTERM),
- [tup[0] for tup in sigs])
- self.t.signal_node.return_value = False
- self.assertTrue(callback.called)
- self.t.stop(['foo', 'bar', 'baz'], 'celery worker', callback=None)
- def on_node_alive(pid):
- if node_alive.call_count > 4:
- return True
- return False
- self.t.signal_node.return_value = True
- self.t.node_alive.side_effect = on_node_alive
- self.t.stop(['foo', 'bar', 'baz'], 'celery worker', retry=True)
- @patch('os.kill')
- def test_node_alive(self, kill):
- kill.return_value = True
- self.assertTrue(self.t.node_alive(13))
- esrch = OSError()
- esrch.errno = errno.ESRCH
- kill.side_effect = esrch
- self.assertFalse(self.t.node_alive(13))
- kill.assert_called_with(13, 0)
- enoent = OSError()
- enoent.errno = errno.ENOENT
- kill.side_effect = enoent
- with self.assertRaises(OSError):
- self.t.node_alive(13)
- @patch('os.kill')
- def test_signal_node(self, kill):
- kill.return_value = True
- self.assertTrue(self.t.signal_node('foo', 13, 9))
- esrch = OSError()
- esrch.errno = errno.ESRCH
- kill.side_effect = esrch
- self.assertFalse(self.t.signal_node('foo', 13, 9))
- kill.assert_called_with(13, 9)
- self.assertIn('Could not signal foo', self.fh.getvalue())
- enoent = OSError()
- enoent.errno = errno.ENOENT
- kill.side_effect = enoent
- with self.assertRaises(OSError):
- self.t.signal_node('foo', 13, 9)
- def test_start(self):
- self.t.waitexec = Mock()
- self.t.waitexec.return_value = 0
- self.assertFalse(self.t.start(['foo', 'bar', 'baz'], 'celery worker'))
- self.t.waitexec.return_value = 1
- self.assertFalse(self.t.start(['foo', 'bar', 'baz'], 'celery worker'))
- def test_show(self):
- self.t.show(['foo', 'bar', 'baz'], 'celery worker')
- self.assertTrue(self.fh.getvalue())
- @patch('socket.gethostname')
- def test_get(self, gethostname):
- gethostname.return_value = 'e.com'
- self.t.get(['xuzzy@e.com', 'foo', 'bar', 'baz'], 'celery worker')
- self.assertFalse(self.fh.getvalue())
- self.t.get(['foo@e.com', 'foo', 'bar', 'baz'], 'celery worker')
- self.assertTrue(self.fh.getvalue())
- @patch('socket.gethostname')
- def test_names(self, gethostname):
- gethostname.return_value = 'e.com'
- self.t.names(['foo', 'bar', 'baz'], 'celery worker')
- self.assertIn('foo@e.com\nbar@e.com\nbaz@e.com', self.fh.getvalue())
- def test_execute_from_commandline(self):
- start = self.t.commands['start'] = Mock()
- self.t.error = Mock()
- self.t.execute_from_commandline(['multi', 'start', 'foo', 'bar'])
- self.assertFalse(self.t.error.called)
- start.assert_called_with(['foo', 'bar'], 'celery worker')
- self.t.error = Mock()
- self.t.execute_from_commandline(['multi', 'frob', 'foo', 'bar'])
- self.t.error.assert_called_with('Invalid command: frob')
- self.t.error = Mock()
- self.t.execute_from_commandline(['multi'])
- self.t.error.assert_called_with()
- self.t.error = Mock()
- self.t.execute_from_commandline(['multi', '-foo'])
- self.t.error.assert_called_with()
- self.t.execute_from_commandline(['multi', 'start', 'foo',
- '--nosplash', '--quiet', '-q', '--verbose', '--no-color'])
- self.assertTrue(self.t.nosplash)
- self.assertTrue(self.t.quiet)
- self.assertTrue(self.t.verbose)
- self.assertTrue(self.t.no_color)
- def test_stopwait(self):
- self.t._stop_nodes = Mock()
- self.t.stopwait(['foo', 'bar', 'baz'], 'celery worker')
- self.assertEqual(self.t._stop_nodes.call_args[1]['retry'], 2)
- @patch('celery.bin.multi.MultiTool')
- def test_main(self, MultiTool):
- m = MultiTool.return_value = Mock()
- with self.assertRaises(SystemExit):
- main()
- m.execute_from_commandline.assert_called_with(sys.argv)
|