123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610 |
- import sys
- from datetime import datetime
- from kombu.utils.json import dumps
- from celery import __main__
- from celery.bin.base import Error
- from celery.bin import celery as mod
- from celery.bin.celery import (
- Command,
- list_,
- call,
- purge,
- result,
- inspect,
- control,
- status,
- migrate,
- help,
- report,
- CeleryCommand,
- determine_exit_status,
- multi,
- main as mainfun,
- _RemoteControl,
- command,
- )
- from celery.five import WhateverIO
- from celery.platforms import EX_FAILURE, EX_USAGE, EX_OK
- from celery.tests.case import AppCase, Mock, patch
class test__main__(AppCase):
    """Tests for the ``main()`` function of the ``celery.__main__`` module."""

    def test_main(self):
        # Both collaborators are replaced by mocks, so only the dispatch
        # performed by ``__main__.main`` is observed.
        with patch('celery.__main__.maybe_patch_concurrency') as mpc, \
                patch('celery.bin.celery.main') as main:
            __main__.main()
            mpc.assert_called_with()
            main.assert_called_with()

    def test_main__multi(self):
        # With 'multi' on the command line the concurrency patching hook
        # is expected not to be invoked at all.
        with patch('celery.__main__.maybe_patch_concurrency') as mpc, \
                patch('celery.bin.celery.main') as main:
            saved_argv = sys.argv
            sys.argv = ['foo', 'multi']
            try:
                __main__.main()
                mpc.assert_not_called()
                main.assert_called_with()
            finally:
                # Always restore the interpreter-wide argv.
                sys.argv = saved_argv
class test_Command(AppCase):
    """Tests for the base ``Command`` class and the ``Error`` exception."""

    def setup(self):
        # In-memory streams are handed to the command as stdout/stderr.
        self.out, self.err = WhateverIO(), WhateverIO()
        self.cmd = Command(self.app, stdout=self.out, stderr=self.err)

    def test_Error_repr(self):
        exc = Error('something happened')
        self.assertIsNotNone(exc.status)
        self.assertTrue(exc.reason)
        self.assertTrue(str(exc))

    def test_error(self):
        # ``error()`` is expected to go through ``out``.
        self.cmd.out = Mock()
        self.cmd.error('FOO')
        self.cmd.out.assert_called()

    def test_out(self):
        # Smoke test: writing to an explicit file-like object must not raise.
        fh = Mock()
        self.cmd.out('foo', fh)

    def test_call(self):
        def run_ok():
            pass

        def run_failing():
            raise Error('error', EX_FAILURE)

        # A run() that returns normally yields EX_OK ...
        self.cmd.run = run_ok
        self.assertEqual(self.cmd(), EX_OK)

        # ... while an Error carrying a status yields that status.
        self.cmd.run = run_failing
        self.assertEqual(self.cmd(), EX_FAILURE)

    def test_run_from_argv(self):
        with self.assertRaises(NotImplementedError):
            self.cmd.run_from_argv('prog', ['foo', 'bar'])

    def test_pretty_list(self):
        empty = self.cmd.pretty([])
        self.assertEqual(empty[1], '- empty -')
        filled = self.cmd.pretty(['foo', 'bar'])
        self.assertIn('bar', filled[1])

    def test_pretty_dict(self):
        # The dict key selects the label found in the first element.
        for key, label in (('ok', 'OK'), ('error', 'ERROR')):
            rendered = self.cmd.pretty({key: 'the quick brown fox'})
            self.assertIn(label, str(rendered[0]))

    def test_pretty(self):
        # Strings, arbitrary objects and plain dicts are all rendered as OK.
        for value in ('the quick brown', object(), {'foo': 'bar'}):
            self.assertIn('OK', str(self.cmd.pretty(value)))
class test_list(AppCase):
    """Tests for the ``list_`` command."""

    def test_list_bindings_no_support(self):
        lister = list_(app=self.app, stderr=WhateverIO())
        # A management object that cannot list bindings.
        management = Mock()
        management.get_bindings.side_effect = NotImplementedError()
        with self.assertRaises(Error):
            lister.list_bindings(management)

    def test_run(self):
        lister = list_(app=self.app, stderr=WhateverIO())
        lister.run('bindings')

        # A missing topic and an unknown topic both raise Error.
        for bad_topic in (None, 'foo'):
            with self.assertRaises(Error):
                lister.run(bad_topic)
class test_call(AppCase):
    """Tests for the ``call`` command."""

    def setup(self):
        # Register a throw-away task on the test app; the command is
        # given its name in the test below.
        @self.app.task(shared=False)
        def add(x, y):
            return x + y

        self.add = add

    @patch('celery.app.base.Celery.send_task')
    def test_run(self, send_task):
        cmd = call(app=self.app, stderr=WhateverIO(), stdout=WhateverIO())
        task_name = self.add.name

        cmd.run(task_name)
        send_task.assert_called()

        # JSON-encoded args/kwargs are decoded before being sent.
        cmd.run(
            task_name,
            args=dumps([4, 4]),
            kwargs=dumps({'x': 2, 'y': 2}),
        )
        sent = send_task.call_args[1]
        self.assertEqual(sent['args'], [4, 4])
        self.assertEqual(sent['kwargs'], {'x': 2, 'y': 2})

        # Numeric expires/countdown are forwarded unchanged.
        cmd.run(task_name, expires=10, countdown=10)
        sent = send_task.call_args[1]
        self.assertEqual(sent['expires'], 10)
        self.assertEqual(sent['countdown'], 10)

        # An ISO-8601 string is turned back into the same datetime.
        now = datetime.now()
        cmd.run(task_name, expires=now.isoformat())
        self.assertEqual(send_task.call_args[1]['expires'], now)

        # Anything else is rejected.
        with self.assertRaises(ValueError):
            cmd.run(task_name, expires='foobaribazibar')
class test_purge(AppCase):
    """Tests for the ``purge`` command."""

    def test_run(self):
        stdout = WhateverIO()
        cmd = purge(app=self.app, stdout=stdout)

        # Nothing purged.
        cmd._purge = Mock(name='_purge', return_value=0)
        cmd.run(force=True)
        self.assertIn('No messages purged', stdout.getvalue())

        # Some messages purged.
        cmd._purge.return_value = 100
        cmd.run(force=True)
        self.assertIn('100 messages', stdout.getvalue())

        # Without force the user is asked for confirmation first.
        cmd.out = Mock(name='out')
        cmd.ask = Mock(name='ask')
        cmd.run(force=False)
        cmd.ask.assert_called_with(cmd.warn_prompt, ('yes', 'no'), 'no')

        # Run once more with the prompt answered 'yes'.
        cmd.ask.return_value = 'yes'
        cmd.run(force=False)
class test_result(AppCase):
    """Tests for the ``result`` command."""

    def setup(self):
        # Task whose name is passed via ``task=`` in the test below.
        @self.app.task(shared=False)
        def add(x, y):
            return x + y

        self.add = add

    @patch('celery.result.AsyncResult.get')
    def test_run(self, get):
        stdout = WhateverIO()
        cmd = result(app=self.app, stdout=stdout)

        # Plain lookup by id.
        get.return_value = 'Jerry'
        cmd.run('id')
        self.assertIn('Jerry', stdout.getvalue())

        # Lookup with an explicit task name.
        get.return_value = 'Elaine'
        cmd.run('id', task=self.add.name)
        self.assertIn('Elaine', stdout.getvalue())

        # With traceback=True the (patched) traceback attribute is printed.
        with patch('celery.result.AsyncResult.traceback') as tb:
            cmd.run('id', task=self.add.name, traceback=True)
            self.assertIn(str(tb), stdout.getvalue())
class test_status(AppCase):
    """Tests for the ``status`` command."""

    @patch('celery.bin.celery.inspect')
    def test_run(self, inspect_):
        stdout = WhateverIO()
        stderr = WhateverIO()
        inspector = Mock()
        inspect_.return_value = inspector

        # No replies: the command raises Error.
        inspector.run.return_value = []
        cmd = status(self.app, stdout=stdout, stderr=stderr)
        with self.assertRaises(Error):
            cmd.run()

        # Three replies: the node count is reported.
        inspector.run.return_value = ['a', 'b', 'c']
        cmd.run()
        self.assertIn('3 nodes online', stdout.getvalue())

        # Quiet mode must run without raising.
        cmd.run(quiet=True)
class test_migrate(AppCase):
    """Tests for the ``migrate`` command."""

    @patch('celery.contrib.migrate.migrate_tasks')
    def test_run(self, migrate_tasks):
        stdout = WhateverIO()
        cmd = migrate(app=self.app, stdout=stdout, stderr=WhateverIO())

        # Called without the two broker arguments: TypeError, no migration.
        with self.assertRaises(TypeError):
            cmd.run()
        migrate_tasks.assert_not_called()

        cmd.run('memory://foo', 'memory://bar')
        migrate_tasks.assert_called()

        # The progress callback prints "<count>/<strtotal>".
        state = Mock(count=10, strtotal=30)
        body = {'task': 'tasks.add', 'id': 'ID'}
        cmd.on_migrate_task(state, body, None)
        self.assertIn('10/30', stdout.getvalue())
class test_report(AppCase):
    """Tests for the ``report`` command."""

    def test_run(self):
        stdout = WhateverIO()
        cmd = report(app=self.app, stdout=stdout)
        status_code = cmd.run()
        self.assertEqual(status_code, EX_OK)
        # Something must have been written to stdout.
        self.assertTrue(stdout.getvalue())
class test_help(AppCase):
    """Tests for the ``help`` command."""

    def test_run(self):
        stdout = WhateverIO()
        cmd = help(app=self.app, stdout=stdout)
        # Mock the parser so the print_help() call can be asserted.
        cmd.parser = Mock()

        status_code = cmd.run()
        self.assertEqual(status_code, EX_USAGE)
        self.assertTrue(stdout.getvalue())
        self.assertTrue(cmd.usage('help'))
        cmd.parser.print_help.assert_called_with()
class test_CeleryCommand(AppCase):
    """Tests for the ``CeleryCommand`` umbrella command."""

    def test_execute_from_commandline(self):
        umbrella = CeleryCommand(app=self.app)
        umbrella.handle_argv = Mock()

        # Whatever handle_argv returns, the call ends in SystemExit.
        for retval in (1, True):
            umbrella.handle_argv.return_value = retval
            with self.assertRaises(SystemExit):
                umbrella.execute_from_commandline()

        # A KeyboardInterrupt from handle_argv also ends in SystemExit.
        umbrella.handle_argv.side_effect = KeyboardInterrupt()
        with self.assertRaises(SystemExit):
            umbrella.execute_from_commandline()

        # For 'multi' the app flags are flipped, with or without a
        # leading 'manage.py'.
        multi_argvs = (
            ['celery', 'multi'],
            ['manage.py', 'celery', 'multi'],
        )
        for argv in multi_argvs:
            umbrella.requires_app = True
            umbrella.fake_app = False
            with self.assertRaises(SystemExit):
                umbrella.execute_from_commandline(argv)
            self.assertFalse(umbrella.requires_app)
            self.assertTrue(umbrella.fake_app)

    def test_with_pool_option(self):
        umbrella = CeleryCommand(app=self.app)
        self.assertIsNone(umbrella.with_pool_option(['celery', 'events']))
        self.assertTrue(umbrella.with_pool_option(['celery', 'worker']))
        self.assertTrue(
            umbrella.with_pool_option(['manage.py', 'celery', 'worker']),
        )

    def test_load_extensions_no_commands(self):
        with patch('celery.bin.celery.Extensions') as Ext:
            loader = Mock(name='Extension')
            Ext.return_value = loader
            loader.load.return_value = None
            # Smoke test: loading with no extension commands must not raise.
            CeleryCommand(app=self.app).load_extension_commands()

    def test_load_extensions_commands(self):
        with patch('celery.bin.celery.Extensions') as Ext:
            # Swap the module-level registry for a mock and keep a copy of
            # the original contents to put back afterwards.
            saved_classes = list(mod.command_classes)
            mod.command_classes = Mock()
            try:
                loader = Mock(name='Extension')
                Ext.return_value = loader
                loader.load.return_value = ['foo', 'bar']
                CeleryCommand(app=self.app).load_extension_commands()
                mod.command_classes.append.assert_called_with(
                    ('Extensions', ['foo', 'bar'], 'magenta'),
                )
            finally:
                mod.command_classes = saved_classes

    def test_determine_exit_status(self):
        for value, expected in (('true', EX_OK), ('', EX_FAILURE)):
            self.assertEqual(determine_exit_status(value), expected)

    def test_relocate_args_from_start(self):
        umbrella = CeleryCommand(app=self.app)
        # (input argv, expected argv): options that precede the command
        # name are expected to end up after the command's own arguments.
        cases = (
            (None, []),
            (
                ['-l', 'debug', 'worker', '-c', '3', '--foo'],
                ['worker', '-c', '3', '--foo', '-l', 'debug'],
            ),
            (
                ['--pool=gevent', '-l', 'debug', 'worker', '--foo', '-c', '3'],
                ['worker', '--foo', '-c', '3', '--pool=gevent', '-l', 'debug'],
            ),
            (
                ['foo', '--foo=1'],
                ['foo', '--foo=1'],
            ),
        )
        for argv, expected in cases:
            self.assertEqual(
                umbrella._relocate_args_from_start(argv), expected,
            )

    def test_register_command(self):
        # Work against an empty registry and restore a copy of the
        # original afterwards.
        saved_commands = dict(CeleryCommand.commands)
        CeleryCommand.commands = {}
        try:
            fun = Mock(name='fun')
            CeleryCommand.register_command(fun, name='foo')
            self.assertIs(CeleryCommand.commands['foo'], fun)
        finally:
            CeleryCommand.commands = saved_commands

    def test_handle_argv(self):
        umbrella = CeleryCommand(app=self.app)
        umbrella.execute = Mock()

        # An empty argv executes 'help'.
        umbrella.handle_argv('celery', [])
        umbrella.execute.assert_called_with('help', ['help'])

        umbrella.handle_argv('celery', ['start', 'foo'])
        umbrella.execute.assert_called_with('start', ['start', 'foo'])

    def test_execute(self):
        umbrella = CeleryCommand(app=self.app)
        help_cls = umbrella.commands['help'] = Mock()
        help_cmd = help_cls.return_value = Mock()

        # An unknown command is run as 'help'.
        umbrella.execute('fooox', ['a'])
        help_cmd.run_from_argv.assert_called_with(
            umbrella.prog_name, [], command='help',
        )
        # NOTE(review): ``reset`` is not the mock reset API (that is
        # ``reset_mock``); if ``Mock`` here is the standard mock class this
        # only calls an auto-created attribute and the recorded calls
        # survive -- confirm the intent before changing it.
        help_cmd.reset()
        umbrella.execute('help', ['help'])
        help_cmd.run_from_argv.assert_called_with(
            umbrella.prog_name, [], command='help',
        )

        # A command raising Error is reported through on_error.
        dummy_cls = umbrella.commands['dummy'] = Mock()
        dummy_cmd = dummy_cls.return_value = Mock()
        exc = dummy_cmd.run_from_argv.side_effect = Error(
            'foo', status='EX_FAILURE',
        )
        umbrella.on_error = Mock(name='on_error')
        help_cmd.reset()
        umbrella.execute('dummy', ['dummy'])
        umbrella.on_error.assert_called_with(exc)
        dummy_cmd.run_from_argv.assert_called_with(
            umbrella.prog_name, [], command='dummy',
        )
        # NOTE(review): this may be satisfied by the call recorded earlier
        # in this test if the ``reset()`` above is a no-op -- confirm.
        help_cmd.run_from_argv.assert_called_with(
            umbrella.prog_name, [], command='help',
        )

        # A command raising UsageError is reported through on_usage_error.
        exc = dummy_cmd.run_from_argv.side_effect = umbrella.UsageError('foo')
        umbrella.on_usage_error = Mock()
        umbrella.execute('dummy', ['dummy'])
        umbrella.on_usage_error.assert_called_with(exc)

    def test_on_usage_error(self):
        umbrella = CeleryCommand(app=self.app)
        umbrella.error = Mock()
        umbrella.on_usage_error(umbrella.UsageError('foo'), command=None)
        umbrella.error.assert_called()
        # Same call with a command name; only checked for not raising.
        umbrella.on_usage_error(umbrella.UsageError('foo'), command='dummy')

    def test_prepare_prog_name(self):
        umbrella = CeleryCommand(app=self.app)
        fake_main = Mock(name='__main__')
        fake_main.__file__ = '/opt/foo.py'
        # Install the fake as sys.modules['__main__'] for the duration.
        with patch.dict(sys.modules, __main__=fake_main):
            self.assertEqual(
                umbrella.prepare_prog_name('__main__.py'), '/opt/foo.py',
            )
            self.assertEqual(umbrella.prepare_prog_name('celery'), 'celery')
class test_RemoteControl(AppCase):
    """Tests for the ``_RemoteControl`` base command."""

    def test_call_interface(self):
        # ``call`` raises NotImplementedError on the base class.
        base = _RemoteControl(app=self.app)
        with self.assertRaises(NotImplementedError):
            base.call()
class test_inspect(AppCase):
    """Tests for the ``inspect`` command."""

    def test_usage(self):
        cmd = inspect(app=self.app)
        self.assertTrue(cmd.usage('foo'))

    def test_command_info(self):
        cmd = inspect(app=self.app)
        info = cmd.get_command_info(
            'ping', help=True, color=cmd.colored.red,
        )
        self.assertTrue(info)

    def test_list_commands_color(self):
        cmd = inspect(app=self.app)
        # With help text and a colour, then with neither.
        self.assertTrue(
            cmd.list_commands(help=True, color=cmd.colored.red),
        )
        self.assertTrue(
            cmd.list_commands(help=False, color=None),
        )

    def test_epilog(self):
        cmd = inspect(app=self.app)
        self.assertTrue(cmd.epilog)

    def test_do_call_method_sql_transport_type(self):
        # A connection whose transport reports driver type 'sql'.
        self.app.connection = Mock()
        conn = Mock(name='Connection')
        self.app.connection.return_value = conn
        conn.transport.driver_type = 'sql'

        cmd = inspect(app=self.app)
        with self.assertRaises(cmd.Error):
            cmd.do_call_method(['ping'])

    def test_say_directions(self):
        cmd = inspect(self.app)
        cmd.out = Mock()

        # Quiet: '<-' chatter is suppressed, '->' is still printed.
        cmd.quiet = True
        cmd.say_chat('<-', 'hello out')
        cmd.out.assert_not_called()
        cmd.say_chat('->', 'hello in')
        cmd.out.assert_called()

        # Not quiet: '<-' chatter is printed.
        cmd.quiet = False
        cmd.out.reset_mock()
        cmd.say_chat('<-', 'hello out', 'body')
        cmd.out.assert_called()

    @patch('celery.app.control.Control.inspect')
    def test_run(self, real):
        stdout = WhateverIO()
        cmd = inspect(app=self.app, stdout=stdout)

        # No method, 'help' and an unknown method all raise Error.
        for bad_args in ((), ('help',), ('xyzzybaz',)):
            with self.assertRaises(Error):
                cmd.run(*bad_args)

        cmd.run('ping')
        real.assert_called()

        # The destination string is split on commas; timeout is 0.2.
        cmd.run('ping', destination='foo,bar')
        passed = real.call_args[1]
        self.assertEqual(passed['destination'], ['foo', 'bar'])
        self.assertEqual(passed['timeout'], 0.2)

        # Feeding a reply to the callback writes it to stdout.
        on_reply = passed['callback']
        on_reply({'foo': {'ok': 'pong'}})
        self.assertIn('OK', stdout.getvalue())

        # json=True goes through json.dumps.
        with patch('celery.bin.celery.json.dumps') as json_dumps:
            cmd.run('ping', json=True)
            json_dumps.assert_called()

        # A ping that returns None raises Error.
        inspector = Mock()
        real.return_value = inspector
        inspector.ping.return_value = None
        with self.assertRaises(Error):
            cmd.run('ping')

        # Clear the buffer, then check that quiet mode prints nothing.
        stdout.seek(0)
        stdout.truncate()
        cmd.quiet = True
        cmd.say_chat('<-', 'hello')
        self.assertFalse(stdout.getvalue())

    def test_objgraph(self):
        cmd = inspect(app=self.app)
        cmd.call = Mock(name='call')
        cmd.objgraph('Message', foo=1)
        cmd.call.assert_called_with('objgraph', 'Message', foo=1)

    def test_conf(self):
        cmd = inspect(app=self.app)
        cmd.call = Mock(name='call')
        # ``with_defaults`` is forwarded positionally.
        cmd.conf(with_defaults=True, foo=1)
        cmd.call.assert_called_with('conf', True, foo=1)
class test_control(AppCase):
    """Tests for the ``control`` command."""

    def control(self, patch_call, *args, **kwargs):
        # Build a ``control`` command (the module-level class, not this
        # helper) with a mock app unless one is supplied; when
        # ``patch_call`` is true its ``call`` method is replaced by a mock.
        kwargs.setdefault('app', Mock(name='app'))
        instance = control(*args, **kwargs)
        if not patch_call:
            return instance
        instance.call = Mock(name='control.call')
        return instance

    def test_call(self):
        ctl = self.control(False)
        ctl.call('foo', 1, kw=2)
        # The call is forwarded to app.control.<method> with reply=True.
        ctl.app.control.foo.assert_called_with(1, kw=2, reply=True)

    def test_pool_grow(self):
        ctl = self.control(True)
        ctl.pool_grow('pool_grow', n=2)
        ctl.call.assert_called_with('pool_grow', 2)

    def test_pool_shrink(self):
        ctl = self.control(True)
        ctl.pool_shrink('pool_shrink', n=2)
        ctl.call.assert_called_with('pool_shrink', 2)

    def test_rate_limit(self):
        ctl = self.control(True)
        ctl.rate_limit('rate_limit', 'proj.add', '1/s')
        ctl.call.assert_called_with('rate_limit', 'proj.add', '1/s')

    def test_time_limit(self):
        ctl = self.control(True)
        ctl.time_limit('time_limit', 'proj.add', 10, 30)
        ctl.call.assert_called_with('time_limit', 'proj.add', 10, 30)

    def test_add_consumer(self):
        ctl = self.control(True)
        positional = ('add_consumer', 'queue', 'exchange', 'topic', 'rkey')
        ctl.add_consumer(*positional, durable=True)
        ctl.call.assert_called_with(*positional, durable=True)

    def test_cancel_consumer(self):
        ctl = self.control(True)
        ctl.cancel_consumer('cancel_consumer', 'queue')
        ctl.call.assert_called_with('cancel_consumer', 'queue')
class test_multi(AppCase):
    """Tests for the ``multi`` command."""

    def test_get_options(self):
        cmd = multi(app=self.app)
        self.assertIsNone(cmd.get_options())

    def test_run_from_argv(self):
        with patch('celery.bin.multi.MultiTool') as MultiTool:
            tool = Mock()
            MultiTool.return_value = tool
            cmd = multi(self.app)
            cmd.run_from_argv('celery', ['arg'], command='multi')
            # The command name is prepended to argv for the tool.
            tool.execute_from_commandline.assert_called_with(
                ['multi', 'arg'], 'celery',
            )
class test_main(AppCase):
    """Tests for the ``main`` function of ``celery.bin.celery``."""

    @patch('celery.bin.celery.CeleryCommand')
    def test_main(self, Command):
        instance = Mock()
        Command.return_value = instance
        mainfun()
        instance.execute_from_commandline.assert_called_with(None)

    @patch('celery.bin.celery.CeleryCommand')
    def test_main_KeyboardInterrupt(self, Command):
        instance = Mock()
        Command.return_value = instance
        # The interrupt must not propagate out of main().
        instance.execute_from_commandline.side_effect = KeyboardInterrupt()
        mainfun()
        instance.execute_from_commandline.assert_called_with(None)
class test_compat(AppCase):
    """Tests for the ``command`` decorator."""

    def test_compat_command_decorator(self):
        with patch('celery.bin.celery.CeleryCommand') as CC:
            register = CC.register_command
            # Called with no arguments it returns the registration hook ...
            self.assertEqual(command(), register)

            # ... and called with a function it registers that function.
            fun = Mock(name='fun')
            command(fun)
            register.assert_called_with(fun)
|