Bläddra i källkod

100% coverage for celery.bin.celery

Ask Solem 12 år sedan
förälder
incheckning
929a565d61
2 ändrade filer med 169 tillägg och 16 borttagningar
  1. 1 10
      celery/bin/celery.py
  2. 168 6
      celery/tests/bin/test_celery.py

+ 1 - 10
celery/bin/celery.py

@@ -330,15 +330,6 @@ class _RemoteControl(Command):
                              status=EX_UNAVAILABLE)
         return replies
 
-    def say(self, direction, title, body=''):
-        c = self.colored
-        if direction == '<-' and self.quiet:
-            return
-        dirstr = not self.quiet and c.bold(c.white(direction), ' ') or ''
-        self.out(c.reset(dirstr, title))
-        if body and self.show_body:
-            self.out(body)
-
 
 class inspect(_RemoteControl):
     """Inspect the worker at runtime.
@@ -763,7 +754,7 @@ class CeleryCommand(Command):
         return '\n'.join(ret).strip()
 
     def with_pool_option(self, argv):
-        if len(argv) > 1 and argv[1] == 'worker':
+        if len(argv) > 1 and 'worker' in argv[0:3]:
             # this command supports custom pools
             # that may have to be loaded as early as possible.
             return (['-P'], ['--pool'])

+ 168 - 6
celery/tests/bin/test_celery.py

@@ -17,6 +17,7 @@ from celery.bin.celery import (
     purge,
     result,
     inspect,
+    control,
     status,
     migrate,
     help,
@@ -25,6 +26,8 @@ from celery.bin.celery import (
     determine_exit_status,
     multi,
     main as mainfun,
+    _RemoteControl,
+    command,
 )
 
 from celery.tests.case import AppCase, Case, WhateverIO, override_stdouts
@@ -297,6 +300,28 @@ class test_CeleryCommand(AppCase):
         with self.assertRaises(SystemExit):
             x.execute_from_commandline()
 
+        x.respects_app_option = True
+        with self.assertRaises(SystemExit):
+            x.execute_from_commandline(['celery', 'multi'])
+        self.assertFalse(x.respects_app_option)
+        x.respects_app_option = True
+        with self.assertRaises(SystemExit):
+            x.execute_from_commandline(['manage.py', 'celery', 'multi'])
+        self.assertFalse(x.respects_app_option)
+
+    def test_with_pool_option(self):
+        x = CeleryCommand(app=self.app)
+        self.assertIsNone(x.with_pool_option(['celery', 'events']))
+        self.assertTrue(x.with_pool_option(['celery', 'worker']))
+        self.assertTrue(x.with_pool_option(['manage.py', 'celery', 'worker']))
+
+    def test_load_extensions_no_commands(self):
+        with patch('celery.bin.celery.Extensions') as Ext:
+            ext = Ext.return_value = Mock(name='Extension')
+            ext.load.return_value = None
+            x = CeleryCommand(app=self.app)
+            x.load_extension_commands()
+
     def test_determine_exit_status(self):
         self.assertEqual(determine_exit_status('true'), EX_OK)
         self.assertEqual(determine_exit_status(''), EX_FAILURE)
@@ -340,12 +365,83 @@ class test_CeleryCommand(AppCase):
             x.prog_name, [], command='help',
         )
 
+        exc = dummy.run_from_argv.side_effect = x.UsageError('foo')
+        x.on_usage_error = Mock()
+        x.execute('dummy', ['dummy'])
+        x.on_usage_error.assert_called_with(exc)
+
+    def test_on_usage_error(self):
+        x = CeleryCommand(app=self.app)
+        x.error = Mock()
+        x.on_usage_error(x.UsageError('foo'), command=None)
+        self.assertTrue(x.error.called)
+        x.on_usage_error(x.UsageError('foo'), command='dummy')
+
+    def test_prepare_prog_name(self):
+        x = CeleryCommand(app=self.app)
+        main = Mock(name='__main__')
+        main.__file__ = '/opt/foo.py'
+        with patch.dict(sys.modules, __main__=main):
+            self.assertEqual(x.prepare_prog_name('__main__.py'), '/opt/foo.py')
+            self.assertEqual(x.prepare_prog_name('celery'), 'celery')
+
+
+class test_RemoteControl(AppCase):
+
+    def test_call_interface(self):
+        with self.assertRaises(NotImplementedError):
+            _RemoteControl(app=self.app).call()
+
 
 class test_inspect(AppCase):
 
     def test_usage(self):
         self.assertTrue(inspect(app=self.app).usage('foo'))
 
+    def test_command_info(self):
+        i = inspect(app=self.app)
+        self.assertTrue(i.get_command_info(
+            'ping', help=True, color=i.colored.red,
+        ))
+
+    def test_list_commands_color(self):
+        i = inspect(app=self.app)
+        self.assertTrue(i.list_commands(
+            help=True, color=i.colored.red,
+        ))
+        self.assertTrue(i.list_commands(
+            help=False, color=None,
+        ))
+
+    def test_epilog(self):
+        self.assertTrue(inspect(app=self.app).epilog)
+
+    def test_do_call_method_sql_transport_type(self):
+        prev, self.app.connection = self.app.connection, Mock()
+        try:
+            conn = self.app.connection.return_value = Mock(name='Connection')
+            conn.transport.driver_type = 'sql'
+            i = inspect(app=self.app)
+            with self.assertRaises(i.Error):
+                i.do_call_method(['ping'])
+        finally:
+            self.app.connection = prev
+
+    def test_say_directions(self):
+        i = inspect(self.app)
+        i.out = Mock()
+        i.quiet = True
+        i.say_chat('<-', 'hello out')
+        self.assertFalse(i.out.called)
+
+        i.say_chat('->', 'hello in')
+        self.assertTrue(i.out.called)
+
+        i.quiet = False
+        i.out.reset_mock()
+        i.say_chat('<-', 'hello out', 'body')
+        self.assertTrue(i.out.called)
+
     @patch('celery.app.control.Control.inspect')
     def test_run(self, real):
         out = WhateverIO()
@@ -375,10 +471,66 @@ class test_inspect(AppCase):
         out.seek(0)
         out.truncate()
         i.quiet = True
-        i.say('<-', 'hello')
+        i.say_chat('<-', 'hello')
         self.assertFalse(out.getvalue())
 
 
+class test_control(AppCase):
+
+    def control(self, patch_call, *args, **kwargs):
+        kwargs.setdefault('app', Mock(name='app'))
+        c = control(*args, **kwargs)
+        if patch_call:
+            c.call = Mock(name='control.call')
+        return c
+
+    def test_call(self):
+        i = self.control(False)
+        i.call('foo', 1, kw=2)
+        i.app.control.foo.assert_called_with(1, kw=2, retry=True)
+
+    def test_pool_grow(self):
+        i = self.control(True)
+        i.pool_grow('pool_grow', n=2)
+        i.call.assert_called_with('pool_grow', 2)
+
+    def test_pool_shrink(self):
+        i = self.control(True)
+        i.pool_shrink('pool_shrink', n=2)
+        i.call.assert_called_with('pool_shrink', 2)
+
+    def test_autoscale(self):
+        i = self.control(True)
+        i.autoscale('autoscale', max=3, min=2)
+        i.call.assert_called_with('autoscale', 3, 2)
+
+    def test_rate_limit(self):
+        i = self.control(True)
+        i.rate_limit('rate_limit', 'proj.add', '1/s')
+        i.call.assert_called_with('rate_limit', 'proj.add', '1/s', reply=True)
+
+    def test_time_limit(self):
+        i = self.control(True)
+        i.time_limit('time_limit', 'proj.add', 10, 30)
+        i.call.assert_called_with('time_limit', 'proj.add', 10, 30, reply=True)
+
+    def test_add_consumer(self):
+        i = self.control(True)
+        i.add_consumer(
+            'add_consumer', 'queue', 'exchange', 'topic', 'rkey',
+            durable=True,
+        )
+        i.call.assert_called_with(
+            'add_consumer', 'queue', 'exchange', 'topic', 'rkey',
+            durable=True, reply=True,
+        )
+
+    def test_cancel_consumer(self):
+        i = self.control(True)
+        i.cancel_consumer('cancel_consumer', 'queue')
+        i.call.assert_called_with('cancel_consumer', 'queue', reply=True)
+
+
 class test_multi(AppCase):
 
     def test_get_options(self):
@@ -397,13 +549,23 @@ class test_main(AppCase):
 
     @patch('celery.bin.celery.CeleryCommand')
     def test_main(self, Command):
-        command = Command.return_value = Mock()
+        cmd = Command.return_value = Mock()
         mainfun()
-        command.execute_from_commandline.assert_called_with(None)
+        cmd.execute_from_commandline.assert_called_with(None)
 
     @patch('celery.bin.celery.CeleryCommand')
     def test_main_KeyboardInterrupt(self, Command):
-        command = Command.return_value = Mock()
-        command.execute_from_commandline.side_effect = KeyboardInterrupt()
+        cmd = Command.return_value = Mock()
+        cmd.execute_from_commandline.side_effect = KeyboardInterrupt()
         mainfun()
-        command.execute_from_commandline.assert_called_with(None)
+        cmd.execute_from_commandline.assert_called_with(None)
+
+
+class test_compat(Case):
+
+    def test_compat_command_decorator(self):
+        with patch('celery.bin.celery.CeleryCommand') as CC:
+            self.assertEqual(command(), CC.register_command)
+            fun = Mock(name='fun')
+            command(fun)
+            CC.register_command.assert_called_with(fun)