Browse Source

Coverage for celery.backends.* + celery.bootsteps

Ask Solem 12 years ago
parent
commit
b049ef7cbe

+ 9 - 16
celery/bin/amqp.py

@@ -96,9 +96,7 @@ class Spec(object):
     def format_response(self, response):
         """Format the return value of this command in a human-friendly way."""
         if not self.returns:
-            if response is None:
-                return 'ok.'
-            return response
+            return 'ok.' if response is None else response
         if isinstance(self.returns, Callable):
             return self.returns(response)
         return self.returns.format(response)
@@ -296,20 +294,15 @@ class AMQShell(cmd.Cmd):
         cmd, arg, line = self.parseline(line)
         if not line:
             return self.emptyline()
-        if cmd is None:
-            return self.default(line)
         self.lastcmd = line
-        if cmd == '':
-            return self.default(line)
-        else:
-            self.counter = next(self.inc_counter)
-            try:
-                self.respond(self.dispatch(cmd, arg))
-            except (AttributeError, KeyError) as exc:
-                self.default(line)
-            except Exception as exc:
-                self.say(exc)
-                self.needs_reconnect = True
+        self.counter = next(self.inc_counter)
+        try:
+            self.respond(self.dispatch(cmd, arg))
+        except (AttributeError, KeyError) as exc:
+            self.default(line)
+        except Exception as exc:
+            self.say(exc)
+            self.needs_reconnect = True
 
     def respond(self, retval):
         """What to do with the return value of a command."""

+ 20 - 11
celery/bin/base.py

@@ -136,7 +136,7 @@ class Extensions(object):
     def load(self):
         try:
             from pkg_resources import iter_entry_points
-        except ImportError:
+        except ImportError:  # pragma: no cover
             return
 
         for ep in iter_entry_points(self.namespace):
@@ -470,21 +470,29 @@ class Command(object):
         opts = {}
         for opt in self.preload_options:
             for t in (opt._long_opts, opt._short_opts):
-                opts.update(dict(zip(t, [opt.dest] * len(t))))
+                opts.update(dict(zip(t, [opt] * len(t))))
         index = 0
         length = len(args)
         while index < length:
             arg = args[index]
-            if arg.startswith('--') and '=' in arg:
-                key, value = arg.split('=', 1)
-                dest = opts.get(key)
-                if dest:
-                    acc[dest] = value
+            if arg.startswith('--'):
+                if '=' in arg:
+                    key, value = arg.split('=', 1)
+                    opt = opts.get(key)
+                    if opt:
+                        acc[opt.dest] = value
+                else:
+                    opt = opts.get(arg)
+                    if opt and opt.action == 'store_true':
+                        acc[opt.dest] = True
             elif arg.startswith('-'):
-                dest = opts.get(arg)
-                if dest:
-                    acc[dest] = args[index + 1]
-                    index += 1
+                opt = opts.get(arg)
+                if opt:
+                    if opt.takes_value():
+                        acc[opt.dest] = args[index + 1]
+                        index += 1
+                    elif opt.action == 'store_true':
+                        acc[opt.dest] = True
             index += 1
         return acc
 
@@ -517,6 +525,7 @@ class Command(object):
             name, _, domain = host.partition('.')
             keys = dict({'%': '%', 'h': host, 'n': name, 'd': domain}, **keys)
             return match.sub(lambda m: keys[m.expand(expand)], s)
+        return s
 
     def _get_default_app(self, *args, **kwargs):
         from celery._state import get_current_app

+ 1 - 1
celery/bin/celery.py

@@ -52,7 +52,7 @@ command_classes = [
     ('Remote Control', ['status', 'inspect', 'control'], 'blue'),
     ('Utils', ['purge', 'list', 'migrate', 'call', 'result', 'report'], None),
 ]
-if DEBUG:
+if DEBUG:  # pragma: no cover
     command_classes.append(
         ('Debug', ['graph'], 'red'),
     )

+ 3 - 0
celery/tests/bin/proj/__init__.py

@@ -0,0 +1,3 @@
+from celery import Celery
+
+hello = Celery(set_as_current=False)

+ 3 - 0
celery/tests/bin/proj/app.py

@@ -0,0 +1,3 @@
+from celery import Celery
+
+app = Celery(set_as_current=False)

+ 14 - 0
celery/tests/bin/test_amqp.py

@@ -36,6 +36,11 @@ class test_AMQShell(AppCase):
     def RV(self):
         raise Exception(self.fh.getvalue())
 
+    def test_spec_format_response(self):
+        spec = self.shell.amqp['exchange.declare']
+        self.assertEqual(spec.format_response(None), 'ok.')
+        self.assertEqual(spec.format_response('NO'), 'NO')
+
     def test_missing_namespace(self):
         self.shell.onecmd('ns.cmd arg')
         self.assertIn('unknown syntax', self.fh.getvalue())
@@ -52,6 +57,15 @@ class test_AMQShell(AppCase):
         self.shell.onecmd('help foo.baz')
         self.assertIn('unknown syntax', self.fh.getvalue())
 
+    def test_onecmd_error(self):
+        self.shell.dispatch = Mock()
+        self.shell.dispatch.side_effect = MemoryError()
+        self.shell.say = Mock()
+        self.assertFalse(self.shell.needs_reconnect)
+        self.shell.onecmd('hello')
+        self.assertTrue(self.shell.say.called)
+        self.assertTrue(self.shell.needs_reconnect)
+
     def test_exit(self):
         with self.assertRaises(SystemExit):
             self.shell.onecmd('exit')

+ 177 - 3
celery/tests/bin/test_base.py

@@ -2,10 +2,15 @@ from __future__ import absolute_import
 
 import os
 
-from mock import patch
+from mock import Mock, patch
 
-from celery.bin.base import Command, Option
-from celery.tests.utils import AppCase, override_stdouts
+from celery.bin.base import (
+    Command,
+    Option,
+    Extensions,
+    HelpFormatter,
+)
+from celery.tests.utils import AppCase, Case, override_stdouts
 
 
 class Object(object):
@@ -31,6 +36,47 @@ class MockCommand(Command):
         return args, kwargs
 
 
+class test_Extensions(Case):
+
+    def test_load(self):
+        with patch('pkg_resources.iter_entry_points') as iterep:
+            with patch('celery.bin.base.symbol_by_name') as symbyname:
+                ep = Mock()
+                ep.name = 'ep'
+                ep.module_name = 'foo'
+                ep.attrs = ['bar', 'baz']
+                iterep.return_value = [ep]
+                cls = symbyname.return_value = Mock()
+                register = Mock()
+                e = Extensions('unit', register)
+                e.load()
+                symbyname.assert_called_with('foo:bar')
+                register.assert_called_with(cls, name='ep')
+
+            with patch('celery.bin.base.symbol_by_name') as symbyname:
+                symbyname.side_effect = SyntaxError()
+                with patch('warnings.warn') as warn:
+                    e.load()
+                    self.assertTrue(warn.called)
+
+            with patch('celery.bin.base.symbol_by_name') as symbyname:
+                symbyname.side_effect = KeyError('foo')
+                with self.assertRaises(KeyError):
+                    e.load()
+
+
+class test_HelpFormatter(Case):
+
+    def test_format_epilog(self):
+        f = HelpFormatter()
+        self.assertTrue(f.format_epilog('hello'))
+        self.assertFalse(f.format_epilog(''))
+
+    def test_format_description(self):
+        f = HelpFormatter()
+        self.assertTrue(f.format_description('hello'))
+
+
 class test_Command(AppCase):
 
     def test_get_options(self):
@@ -38,6 +84,48 @@ class test_Command(AppCase):
         cmd.option_list = (1, 2, 3)
         self.assertTupleEqual(cmd.get_options(), (1, 2, 3))
 
+    def test_custom_description(self):
+
+        class C(Command):
+            description = 'foo'
+
+        c = C()
+        self.assertEqual(c.description, 'foo')
+
+    def test_register_callbacks(self):
+        c = Command(on_error=8, on_usage_error=9)
+        self.assertEqual(c.on_error, 8)
+        self.assertEqual(c.on_usage_error, 9)
+
+    def test_run_raises_UsageError(self):
+        cb = Mock()
+        c = Command(on_usage_error=cb)
+        c.verify_args = Mock()
+        c.run = Mock()
+        exc = c.run.side_effect = c.UsageError('foo', status=3)
+
+        self.assertEqual(c(), exc.status)
+        cb.assert_called_with(exc)
+        c.verify_args.assert_called_with(())
+
+    def test_default_on_usage_error(self):
+        cmd = Command()
+        cmd.handle_error = Mock()
+        exc = Exception()
+        cmd.on_usage_error(exc)
+        cmd.handle_error.assert_called_with(exc)
+
+    def test_verify_args_missing(self):
+        c = Command()
+
+        def run(a, b, c):
+            pass
+        c.run = run
+
+        with self.assertRaises(c.UsageError):
+            c.verify_args((1, ))
+        c.verify_args((1, 2, 3))
+
     def test_run_interface(self):
         with self.assertRaises(NotImplementedError):
             Command().run()
@@ -101,6 +189,92 @@ class test_Command(AppCase):
         cmd.setup_app_from_commandline(['--app=%s' % (app, ),
                                         '--loglevel=INFO'])
         self.assertIs(cmd.app, APP)
+        cmd.setup_app_from_commandline(['-A', app,
+                                        '--loglevel=INFO'])
+        self.assertIs(cmd.app, APP)
+
+    def test_setup_app_sets_quiet(self):
+        cmd = MockCommand()
+        cmd.setup_app_from_commandline(['-q'])
+        self.assertTrue(cmd.quiet)
+        cmd2 = MockCommand()
+        cmd2.setup_app_from_commandline(['--quiet'])
+        self.assertTrue(cmd2.quiet)
+
+    def test_setup_app_sets_chdir(self):
+        with patch('os.chdir') as chdir:
+            cmd = MockCommand()
+            cmd.setup_app_from_commandline(['--workdir=/opt'])
+            chdir.assert_called_with('/opt')
+
+    def test_setup_app_sets_loader(self):
+        prev = os.environ.get('CELERY_LOADER')
+        try:
+            cmd = MockCommand()
+            cmd.setup_app_from_commandline(['--loader=X.Y:Z'])
+            self.assertEqual(os.environ['CELERY_LOADER'], 'X.Y:Z')
+        finally:
+            if prev is not None:
+                os.environ['CELERY_LOADER'] = prev
+
+    def test_setup_app_no_respect(self):
+        cmd = MockCommand()
+        cmd.respects_app_option = False
+        with patch('celery.Celery') as cp:
+            cmd.setup_app_from_commandline(['--app=x.y:z'])
+            self.assertTrue(cp.called)
+
+    def test_setup_app_custom_app(self):
+        cmd = MockCommand()
+        app = cmd.app = Mock()
+        cmd.setup_app_from_commandline([])
+        self.assertEqual(cmd.app, app)
+
+    def test_find_app_suspects(self):
+        cmd = MockCommand()
+        self.assertTrue(cmd.find_app('celery.tests.bin.proj.app'))
+        self.assertTrue(cmd.find_app('celery.tests.bin.proj'))
+        self.assertTrue(cmd.find_app('celery.tests.bin.proj:hello'))
+        self.assertTrue(cmd.find_app('celery.tests.bin.proj.app:app'))
+
+        with self.assertRaises(AttributeError):
+            cmd.find_app(__name__)
+
+    def test_simple_format(self):
+        cmd = MockCommand()
+        with patch('socket.gethostname') as hn:
+            hn.return_value = 'blacktron.example.com'
+            self.assertEqual(cmd.simple_format(''), '')
+            self.assertEqual(
+                cmd.simple_format('celery@%h'),
+                'celery@blacktron.example.com',
+            )
+            self.assertEqual(
+                cmd.simple_format('celery@%d'),
+                'celery@example.com',
+            )
+            self.assertEqual(
+                cmd.simple_format('celery@%n'),
+                'celery@blacktron',
+            )
+
+    def test_say_chat_quiet(self):
+        cmd = MockCommand()
+        cmd.quiet = True
+        self.assertIsNone(cmd.say_chat('<-', 'foo', 'foo'))
+
+    def test_say_chat_show_body(self):
+        cmd = MockCommand()
+        cmd.out = Mock()
+        cmd.show_body = True
+        cmd.say_chat('->', 'foo', 'body')
+        cmd.out.assert_called_with('body')
+
+    def test_say_chat_no_body(self):
+        cmd = MockCommand()
+        cmd.out = Mock()
+        cmd.show_body = False
+        cmd.say_chat('->', 'foo', 'body')
 
     def test_with_cmdline_config(self):
         cmd = MockCommand()

+ 38 - 10
celery/tests/bin/test_celery.py

@@ -23,6 +23,7 @@ from celery.bin.celery import (
     report,
     CeleryCommand,
     determine_exit_status,
+    multi,
     main as mainfun,
 )
 
@@ -204,17 +205,22 @@ class test_purge(AppCase):
 
 class test_result(AppCase):
 
-    @patch('celery.result.AsyncResult.get')
-    def test_run(self, get):
-        out = WhateverIO()
-        r = result(app=self.app, stdout=out)
-        get.return_value = 'Jerry'
-        r.run('id')
-        self.assertIn('Jerry', out.getvalue())
+    def test_run(self):
+        with patch('celery.result.AsyncResult.get') as get:
+            out = WhateverIO()
+            r = result(app=self.app, stdout=out)
+            get.return_value = 'Jerry'
+            r.run('id')
+            self.assertIn('Jerry', out.getvalue())
+
+            get.return_value = 'Elaine'
+            r.run('id', task=add.name)
+            self.assertIn('Elaine', out.getvalue())
+
+            with patch('celery.result.AsyncResult.traceback') as tb:
+                r.run('id', task=add.name, traceback=True)
+                self.assertIn(str(tb), out.getvalue())
 
-        get.return_value = 'Elaine'
-        r.run('id', task=add.name)
-        self.assertIn('Elaine', out.getvalue())
 
 
 class test_status(AppCase):
@@ -374,6 +380,20 @@ class test_inspect(AppCase):
         self.assertFalse(out.getvalue())
 
 
+class test_multi(AppCase):
+
+    def test_get_options(self):
+        self.assertTupleEqual(multi(app=self.app).get_options(), ())
+
+    def test_run_from_argv(self):
+        with patch('celery.bin.multi.MultiTool') as MultiTool:
+            m = MultiTool.return_value = Mock()
+            multi(self.app).run_from_argv('celery', ['arg'], command='multi')
+            m.execute_from_commandline.assert_called_with(
+                ['multi', 'arg'], 'celery',
+            )
+
+
 class test_main(AppCase):
 
     @patch('celery.bin.celery.CeleryCommand')
@@ -381,3 +401,11 @@ class test_main(AppCase):
         command = Command.return_value = Mock()
         mainfun()
         command.execute_from_commandline.assert_called_with(None)
+
+
+    @patch('celery.bin.celery.CeleryCommand')
+    def test_main_KeyboardInterrupt(self, Command):
+        command = Command.return_value = Mock()
+        command.execute_from_commandline.side_effect = KeyboardInterrupt()
+        mainfun()
+        command.execute_from_commandline.assert_called_with(None)

+ 168 - 1
celery/tests/worker/test_bootsteps.py

@@ -1,12 +1,58 @@
 from __future__ import absolute_import
 
-from mock import Mock
+from mock import Mock, patch
 
 from celery import bootsteps
 
 from celery.tests.utils import AppCase, Case
 
 
+class test_StepFormatter(Case):
+
+    def test_get_prefix(self):
+        f = bootsteps.StepFormatter()
+        s = Mock()
+        s.last = True
+        self.assertEqual(f._get_prefix(s), f.blueprint_prefix)
+
+        s2 = Mock()
+        s2.last = False
+        s2.conditional = True
+        self.assertEqual(f._get_prefix(s2), f.conditional_prefix)
+
+        s3 = Mock()
+        s3.last = s3.conditional = False
+        self.assertEqual(f._get_prefix(s3), '')
+
+    def test_node(self):
+        f = bootsteps.StepFormatter()
+        f.draw_node = Mock()
+        step = Mock()
+        step.last = False
+        f.node(step, x=3)
+        f.draw_node.assert_called_with(step, f.node_scheme, {'x': 3})
+
+        step.last = True
+        f.node(step, x=3)
+        f.draw_node.assert_called_with(step, f.blueprint_scheme, {'x': 3})
+
+    def test_edge(self):
+        f = bootsteps.StepFormatter()
+        f.draw_edge = Mock()
+        a, b = Mock(), Mock()
+        a.last = True
+        f.edge(a, b, x=6)
+        f.draw_edge.assert_called_with(a, b, f.edge_scheme, {
+            'x': 6, 'arrowhead': 'none', 'color': 'darkseagreen3',
+        })
+
+        a.last = False
+        f.edge(a, b, x=6)
+        f.draw_edge.assert_called_with(a, b, f.edge_scheme, {
+            'x': 6,
+        })
+
+
 class test_Step(Case):
 
     class Def(bootsteps.StartStopStep):
@@ -61,6 +107,51 @@ class test_Step(Case):
         self.assertFalse(x.include(self))
         self.assertFalse(x.create.call_count)
 
+    def test_repr(self):
+        x = self.Def(self)
+        self.assertTrue(repr(x))
+
+
+class test_ConsumerStep(AppCase):
+
+    def test_interface(self):
+        step = bootsteps.ConsumerStep(self)
+        with self.assertRaises(NotImplementedError):
+            step.get_consumers(self)
+
+    def test_start_stop_shutdown(self):
+        consumer = Mock()
+        conn = self.connection = Mock()
+
+        class Step(bootsteps.ConsumerStep):
+
+            def get_consumers(self, c):
+                return [consumer]
+
+        step = Step(self)
+        self.assertEqual(step.get_consumers(self), [consumer])
+
+        step.start(self)
+        consumer.consume.assert_called_with()
+        step.stop(self)
+        consumer.cancel.assert_called_with()
+
+        step.shutdown(self)
+        consumer.channel.close.assert_called_with()
+
+    def test_start_no_consumers(self):
+        self.connection = Mock()
+
+        class Step(bootsteps.ConsumerStep):
+
+            def get_consumers(self, c):
+                return ()
+
+        step = Step(self)
+        step.start(self)
+
+
+
 
 class test_StartStopStep(Case):
 
@@ -87,6 +178,9 @@ class test_StartStopStep(Case):
         x.stop(self)
         x.obj.stop.assert_called_with()
 
+        x.obj = None
+        self.assertIsNone(x.start(self))
+
     def test_include_when_disabled(self):
         x = self.Def(self)
         x.enabled = False
@@ -132,6 +226,79 @@ class test_Blueprint(AppCase):
         self.assertIs(blueprint.app, self.app)
         self.assertEqual(blueprint.name, 'test_Blueprint')
 
+    def test_close__on_close_is_None(self):
+        blueprint = self.Blueprint(app=self.app)
+        blueprint.on_close = None
+        blueprint.send_all = Mock()
+        blueprint.close(1)
+        blueprint.send_all.assert_called_with(
+            1, 'close', 'Closing', reverse=False,
+        )
+
+    def test_send_all_with_None_steps(self):
+        parent = Mock()
+        blueprint = self.Blueprint(app=self.app)
+        parent.steps = [None, None, None]
+        blueprint.send_all(parent, 'close', 'Closing', reverse=False)
+
+    def test_join_raises_IGNORE_ERRORS(self):
+        prev, bootsteps.IGNORE_ERRORS = bootsteps.IGNORE_ERRORS, (KeyError, )
+        try:
+            blueprint = self.Blueprint(app=self.app)
+            blueprint.shutdown_complete = Mock()
+            blueprint.shutdown_complete.wait.side_effect = KeyError('luke')
+            blueprint.join(timeout=10)
+            blueprint.shutdown_complete.wait.assert_called_with(timeout=10)
+        finally:
+            bootsteps.IGNORE_ERRORS = prev
+
+    def test_connect_with(self):
+
+        class b1s1(bootsteps.Step):
+            pass
+
+        class b1s2(bootsteps.Step):
+            last = True
+
+        class b2s1(bootsteps.Step):
+            pass
+
+        class b2s2(bootsteps.Step):
+            last = True
+
+        b1 = self.Blueprint([b1s1, b1s2], app=self.app)
+        b2 = self.Blueprint([b2s1, b2s2], app=self.app)
+        b1.apply(Mock())
+        b2.apply(Mock())
+        b1.connect_with(b2)
+
+        self.assertIn(b1s1, b1.graph)
+        self.assertIn(b2s1, b1.graph)
+        self.assertIn(b2s2, b1.graph)
+
+        self.assertTrue(repr(b1s1))
+        self.assertTrue(str(b1s1))
+
+    def test_topsort_raises_KeyError(self):
+
+        class Step(bootsteps.Step):
+            requires = ('xyxxx.fsdasewe.Unknown', )
+
+        b = self.Blueprint([Step], app=self.app)
+        b.steps = b.claim_steps()
+        with self.assertRaises(ImportError):
+            b._finalize_steps(b.steps)
+        Step.requires = ()
+
+        b.steps = b.claim_steps()
+        b._finalize_steps(b.steps)
+
+        with patch('celery.bootsteps.DependencyGraph') as Dep:
+            g = Dep.return_value = Mock()
+            g.topsort.side_effect = KeyError('foo')
+            with self.assertRaises(KeyError):
+                b._finalize_steps(b.steps)
+
     def test_apply(self):
 
         class MyBlueprint(bootsteps.Blueprint):

+ 1 - 1
setup.cfg

@@ -3,7 +3,7 @@ where = celery/tests
 cover3-branch = 1
 cover3-html = 1
 cover3-package = celery
-cover3-exclude = celery.utils.debug,celery.tests.*
+cover3-exclude = celery.utils.debug,celery.tests.*,celery.bin.graph
 
 [build_sphinx]
 source-dir = docs/