Browse Source

Tests passing

Ask Solem 12 years ago
parent
commit
c427dab107

+ 1 - 1
celery/app/base.py

@@ -147,7 +147,7 @@ class Celery(object):
                     .execute_from_commandline(argv)
 
     def worker_main(self, argv=None):
-        return instantiate('celery.bin.worker:WorkerCommand', app=self) \
+        return instantiate('celery.bin.worker:worker', app=self) \
                     .execute_from_commandline(argv)
 
     def task(self, *args, **opts):

+ 12 - 5
celery/app/log.py

@@ -67,15 +67,22 @@ class Logging(object):
             loglevel, logfile, colorize=colorize,
         )
         if not handled:
-            logger = get_logger('celery.redirected')
             if redirect_stdouts:
-                self.redirect_stdouts_to_logger(logger,
-                                loglevel=redirect_level)
+                self.redirect_stdouts(redirect_level)
         os.environ.update(
             CELERY_LOG_LEVEL=str(loglevel) if loglevel else '',
             CELERY_LOG_FILE=str(logfile) if logfile else '',
-            CELERY_LOG_REDIRECT='1' if redirect_stdouts else '',
-            CELERY_LOG_REDIRECT_LEVEL=str(redirect_level))
+        )
+        return handled
+
+    def redirect_stdouts(self, loglevel=None, name='celery.redirected'):
+        self.redirect_stdouts_to_logger(
+            get_logger(name), loglevel=loglevel
+        )
+        os.environ.update(
+            CELERY_LOG_REDIRECT='1',
+            CELERY_LOG_REDIRECT_LEVEL=str(loglevel or ''),
+        )
 
     def setup_logging_subsystem(self, loglevel=None, logfile=None,
             format=None, colorize=None, **kwargs):

+ 6 - 4
celery/apps/worker.py

@@ -103,11 +103,14 @@ class Worker(WorkController):
         )
 
     def on_init_namespace(self):
-        self.setup_logging()
+        self._custom_logging = self.setup_logging()
         # apply task execution optimizations
         trace.setup_worker_optimizations(self.app)
 
     def on_start(self):
+        if not self._custom_logging and self.redirect_stdouts:
+            self.app.log.redirect_stdouts(self.redirect_stdouts_level)
+
         WorkController.on_start(self)
 
         # this signal can be used to e.g. change queues after
@@ -140,9 +143,8 @@ class Worker(WorkController):
     def setup_logging(self, colorize=None):
         if colorize is None and self.no_color is not None:
             colorize = not self.no_color
-        self.app.log.setup(self.loglevel, self.logfile,
-                           self.redirect_stdouts, self.redirect_stdouts_level,
-                           colorize=colorize)
+        return self.app.log.setup(self.loglevel, self.logfile,
+                   redirect_stdouts=False, colorize=colorize)
 
     def purge_messages(self):
         count = self.app.control.purge()

+ 4 - 4
celery/tests/app/test_app.py

@@ -270,18 +270,18 @@ class test_App(Case):
     def test_worker_main(self):
         from celery.bin import worker as worker_bin
 
-        class WorkerCommand(worker_bin.WorkerCommand):
+        class worker(worker_bin.worker):
 
             def execute_from_commandline(self, argv):
                 return argv
 
-        prev, worker_bin.WorkerCommand = \
-                worker_bin.WorkerCommand, WorkerCommand
+        prev, worker_bin.worker = \
+                worker_bin.worker, worker
         try:
             ret = self.app.worker_main(argv=['--version'])
             self.assertListEqual(ret, ['--version'])
         finally:
-            worker_bin.WorkerCommand = prev
+            worker_bin.worker = prev
 
     def test_config_from_envvar(self):
         os.environ['CELERYTEST_CONFIG_OBJECT'] = 'celery.tests.app.test_app'

+ 4 - 16
celery/tests/bin/test_amqp.py

@@ -7,8 +7,7 @@ from celery.bin.amqp import (
     AMQPAdmin,
     AMQShell,
     dump_message,
-    AMQPAdminCommand,
-    run,
+    amqp,
     main,
 )
 
@@ -127,25 +126,14 @@ class test_AMQShell(AppCase):
         a.run()
         self.assertIn('bibi', self.fh.getvalue())
 
-    @patch('celery.bin.amqp.AMQPAdminCommand')
+    @patch('celery.bin.amqp.amqp')
     def test_main(self, Command):
         c = Command.return_value = Mock()
         main()
         c.execute_from_commandline.assert_called_with()
 
     @patch('celery.bin.amqp.AMQPAdmin')
-    def test_amqp(self, cls):
-        c = cls.return_value = Mock()
-        run()
-        c.run.assert_called_with()
-
-    @patch('celery.bin.amqp.AMQPAdmin')
-    def test_AMQPAdminCommand(self, cls):
-        c = cls.return_value = Mock()
-        run()
-        c.run.assert_called_with()
-
-        x = AMQPAdminCommand(app=self.app)
+    def test_command(self, cls):
+        x = amqp(app=self.app)
         x.run()
         self.assertIs(cls.call_args[1]['app'], self.app)
-        c.run.assert_called_with()

+ 5 - 5
celery/tests/bin/test_base.py

@@ -21,7 +21,7 @@ APP = MyApp()  # <-- Used by test_with_custom_app
 class MockCommand(Command):
     mock_args = ('arg1', 'arg2', 'arg3')
 
-    def parse_options(self, prog_name, arguments):
+    def parse_options(self, prog_name, arguments, command=None):
         options = Object()
         options.foo = 'bar'
         options.prog_name = prog_name
@@ -60,13 +60,13 @@ class test_Command(AppCase):
                                       kwargs2)
 
     def test_with_bogus_args(self):
-        cmd = MockCommand()
-        cmd.supports_args = False
         with override_stdouts() as (_, stderr):
+            cmd = MockCommand()
+            cmd.supports_args = False
             with self.assertRaises(SystemExit):
                 cmd.execute_from_commandline(argv=['--bogus'])
-        self.assertTrue(stderr.getvalue())
-        self.assertIn('Unrecognized', stderr.getvalue())
+            self.assertTrue(stderr.getvalue())
+            self.assertIn('Unrecognized', stderr.getvalue())
 
     def test_with_custom_config_module(self):
         prev = os.environ.pop('CELERY_CONFIG_MODULE', None)

+ 5 - 2
celery/tests/bin/test_beat.py

@@ -39,6 +39,7 @@ class MockService(beat.Service):
 
 class MockBeat(beatapp.Beat):
     running = False
+    redirect_stdouts = False
 
     def run(self):
         MockBeat.running = True
@@ -46,6 +47,7 @@ class MockBeat(beatapp.Beat):
 
 class MockBeat2(beatapp.Beat):
     Service = MockService
+    redirect_stdouts = False
 
     def install_sync_handler(self, b):
         pass
@@ -53,6 +55,7 @@ class MockBeat2(beatapp.Beat):
 
 class MockBeat3(beatapp.Beat):
     Service = MockService
+    redirect_stdouts = False
 
     def install_sync_handler(self, b):
         raise TypeError('xxx')
@@ -170,14 +173,14 @@ class test_div(AppCase):
             MockBeat.running = False
 
     def test_detach(self):
-        cmd = beat_bin.BeatCommand()
+        cmd = beat_bin.beat()
         cmd.app = app_or_default()
         cmd.run(detach=True)
         self.assertTrue(MockDaemonContext.opened)
         self.assertTrue(MockDaemonContext.closed)
 
     def test_parse_options(self):
-        cmd = beat_bin.BeatCommand()
+        cmd = beat_bin.beat()
         cmd.app = app_or_default()
         options, args = cmd.parse_options('celery beat', ['-s', 'foo'])
         self.assertEqual(options.schedule, 'foo')

+ 18 - 36
celery/tests/bin/test_celery.py

@@ -9,7 +9,6 @@ from celery.platforms import EX_FAILURE, EX_USAGE, EX_OK
 from celery.bin.celery import (
     Command,
     Error,
-    worker,
     list_,
     call,
     purge,
@@ -45,14 +44,6 @@ class test_Command(AppCase):
         self.err = WhateverIO()
         self.cmd = Command(self.app, stdout=self.out, stderr=self.err)
 
-    def test_exit_help(self):
-        self.cmd.run_from_argv = Mock()
-        with self.assertRaises(SystemExit):
-            self.cmd.exit_help('foo')
-        self.cmd.run_from_argv.assert_called_with(
-                self.cmd.prog_name, ['foo', '--help']
-        )
-
     def test_error(self):
         self.cmd.out = Mock()
         self.cmd.error('FOO')
@@ -73,34 +64,21 @@ class test_Command(AppCase):
     def test_run_from_argv(self):
         with self.assertRaises(NotImplementedError):
             self.cmd.run_from_argv('prog', ['foo', 'bar'])
-        self.assertEqual(self.cmd.prog_name, 'prog')
 
-    def test_prettify_list(self):
-        self.assertEqual(self.cmd.prettify([])[1], '- empty -')
-        self.assertIn('bar', self.cmd.prettify(['foo', 'bar'])[1])
+    def test_pretty_list(self):
+        self.assertEqual(self.cmd.pretty([])[1], '- empty -')
+        self.assertIn('bar', self.cmd.pretty(['foo', 'bar'])[1])
 
-    def test_prettify_dict(self):
+    def test_pretty_dict(self):
         self.assertIn('OK',
-            str(self.cmd.prettify({'ok': 'the quick brown fox'})[0]))
+            str(self.cmd.pretty({'ok': 'the quick brown fox'})[0]))
         self.assertIn('ERROR',
-            str(self.cmd.prettify({'error': 'the quick brown fox'})[0]))
-
-    def test_prettify(self):
-        self.assertIn('OK', str(self.cmd.prettify('the quick brown')))
-        self.assertIn('OK', str(self.cmd.prettify(object())))
-        self.assertIn('OK', str(self.cmd.prettify({'foo': 'bar'})))
-
+            str(self.cmd.pretty({'error': 'the quick brown fox'})[0]))
 
-class test_Delegate(AppCase):
-
-    def test_get_options(self):
-        self.assertTrue(worker(app=self.app).get_options())
-
-    def test_run(self):
-        w = worker()
-        w.target.run = Mock()
-        w.run()
-        w.target.run.assert_called_with()
+    def test_pretty(self):
+        self.assertIn('OK', str(self.cmd.pretty('the quick brown')))
+        self.assertIn('OK', str(self.cmd.pretty(object())))
+        self.assertIn('OK', str(self.cmd.pretty({'foo': 'bar'})))
 
 
 class test_list(AppCase):
@@ -280,18 +258,22 @@ class test_CeleryCommand(AppCase):
         Help = x.commands['help'] = Mock()
         help = Help.return_value = Mock()
         x.execute('fooox', ['a'])
-        help.run_from_argv.assert_called_with(x.prog_name, ['help'])
+        help.run_from_argv.assert_called_with(x.prog_name, [], command='help')
         help.reset()
         x.execute('help', ['help'])
-        help.run_from_argv.assert_called_with(x.prog_name, ['help'])
+        help.run_from_argv.assert_called_with(x.prog_name, [], command='help')
 
         Dummy = x.commands['dummy'] = Mock()
         dummy = Dummy.return_value = Mock()
         dummy.run_from_argv.side_effect = Error('foo', status='EX_FAILURE')
         help.reset()
         x.execute('dummy', ['dummy'])
-        dummy.run_from_argv.assert_called_with(x.prog_name, ['dummy'])
-        help.run_from_argv.assert_called_with(x.prog_name, ['dummy'])
+        dummy.run_from_argv.assert_called_with(
+            x.prog_name, [], command='dummy',
+        )
+        help.run_from_argv.assert_called_with(
+            x.prog_name, [], command='dummy',
+        )
 
 
 class test_inspect(AppCase):

+ 3 - 3
celery/tests/bin/test_events.py

@@ -21,11 +21,11 @@ def proctitle(prog, info=None):
 proctitle.last = ()
 
 
-class test_EvCommand(Case):
+class test_events(Case):
 
     def setUp(self):
         self.app = app_or_default()
-        self.ev = events.EvCommand(app=self.app)
+        self.ev = events.events(app=self.app)
 
     @patch('celery.events.dumper', 'evdump', lambda **kw: 'me dumper, you?')
     @patch('celery.bin.events', 'set_process_title', proctitle)
@@ -68,7 +68,7 @@ class test_EvCommand(Case):
     def test_get_options(self):
         self.assertTrue(self.ev.get_options())
 
-    @patch('celery.bin.events', 'EvCommand', MockCommand)
+    @patch('celery.bin.events', 'events', MockCommand)
     def test_main(self):
         MockCommand.executed = []
         events.main()

+ 14 - 14
celery/tests/bin/test_worker.py

@@ -17,7 +17,7 @@ from celery import platforms
 from celery import signals
 from celery import current_app
 from celery.apps import worker as cd
-from celery.bin.worker import WorkerCommand, main as worker_main
+from celery.bin.worker import worker, main as worker_main
 from celery.exceptions import ImproperlyConfigured, SystemTerminate
 from celery.task import trace
 from celery.utils.log import ensure_process_aware_logger
@@ -75,9 +75,9 @@ class test_Worker(WorkerAppCase):
     @disable_stdouts
     def test_queues_string(self):
         celery = Celery(set_as_current=False)
-        worker = celery.Worker()
-        worker.setup_queues('foo,bar,baz')
-        self.assertEqual(worker.queues, ['foo', 'bar', 'baz'])
+        w = celery.Worker()
+        w.setup_queues('foo,bar,baz')
+        self.assertEqual(w.queues, ['foo', 'bar', 'baz'])
         self.assertTrue('foo' in celery.amqp.queues)
 
     @disable_stdouts
@@ -85,34 +85,34 @@ class test_Worker(WorkerAppCase):
         celery = Celery(set_as_current=False)
         with patch('celery.worker.cpu_count') as cpu_count:
             cpu_count.side_effect = NotImplementedError()
-            worker = celery.Worker(concurrency=None)
-            self.assertEqual(worker.concurrency, 2)
-        worker = celery.Worker(concurrency=5)
-        self.assertEqual(worker.concurrency, 5)
+            w = celery.Worker(concurrency=None)
+            self.assertEqual(w.concurrency, 2)
+        w = celery.Worker(concurrency=5)
+        self.assertEqual(w.concurrency, 5)
 
     @disable_stdouts
     def test_windows_B_option(self):
         celery = Celery(set_as_current=False)
         celery.IS_WINDOWS = True
         with self.assertRaises(SystemExit):
-            WorkerCommand(app=celery).run(beat=True)
+            worker(app=celery).run(beat=True)
 
     def test_setup_concurrency_very_early(self):
-        x = WorkerCommand()
+        x = worker()
         x.run = Mock()
         with self.assertRaises(ImportError):
             x.execute_from_commandline(['worker', '-P', 'xyzybox'])
 
     @disable_stdouts
     def test_invalid_loglevel_gives_error(self):
-        x = WorkerCommand(app=Celery(set_as_current=False))
+        x = worker(app=Celery(set_as_current=False))
         with self.assertRaises(SystemExit):
             x.run(loglevel='GRIM_REAPER')
 
     def test_no_loglevel(self):
         app = Celery(set_as_current=False)
         app.Worker = Mock()
-        WorkerCommand(app=app).run(loglevel=None)
+        worker(app=app).run(loglevel=None)
 
     def test_tasklist(self):
         celery = Celery(set_as_current=False)
@@ -266,7 +266,7 @@ class test_Worker(WorkerAppCase):
     @disable_stdouts
     def test_unknown_loglevel(self):
         with self.assertRaises(SystemExit):
-            WorkerCommand(app=self.app).run(loglevel='ALIEN')
+            worker(app=self.app).run(loglevel='ALIEN')
         worker1 = self.Worker(loglevel=0xFFFF)
         self.assertEqual(worker1.loglevel, 0xFFFF)
 
@@ -403,7 +403,7 @@ class test_funs(WorkerAppCase):
 
     @disable_stdouts
     def test_parse_options(self):
-        cmd = WorkerCommand()
+        cmd = worker()
         cmd.app = current_app
         opts, args = cmd.parse_options('worker', ['--concurrency=512'])
         self.assertEqual(opts.concurrency, 512)

+ 0 - 16
celery/tests/utilities/test_imports.py

@@ -4,7 +4,6 @@ from mock import Mock, patch
 
 from celery.utils.imports import (
     qualname,
-    symbol_by_name,
     reload_from_cwd,
     module_file,
     find_module,
@@ -28,21 +27,6 @@ class test_import_utils(Case):
         self.assertEqual(qualname(Class), 'quick.brown.Fox')
         self.assertEqual(qualname(Class()), 'quick.brown.Fox')
 
-    def test_symbol_by_name__instance_returns_instance(self):
-        instance = object()
-        self.assertIs(symbol_by_name(instance), instance)
-
-    def test_symbol_by_name_returns_default(self):
-        default = object()
-        self.assertIs(symbol_by_name('xyz.ryx.qedoa.weq:foz',
-                        default=default), default)
-
-    def test_symbol_by_name_package(self):
-        from celery.worker import WorkController
-        self.assertIs(symbol_by_name('.worker:WorkController',
-                    package='celery'), WorkController)
-        self.assertTrue(symbol_by_name(':group', package='celery'))
-
     @patch('celery.utils.imports.reload')
     def test_reload_from_cwd(self, reload):
         reload_from_cwd('foo')

+ 4 - 10
celery/tests/worker/test_control.py

@@ -34,6 +34,9 @@ def mytask():
 class WorkController(object):
     autoscaler = None
 
+    def stats(self):
+        return {'total': state.total_count}
+
 
 class Consumer(consumer.Consumer):
 
@@ -48,10 +51,6 @@ class Consumer(consumer.Consumer):
         from celery.concurrency.base import BasePool
         self.pool = BasePool(10)
 
-    @property
-    def info(self):
-        return {'xyz': 'XYZ'}
-
 
 class test_ControlPanel(Case):
 
@@ -139,13 +138,8 @@ class test_ControlPanel(Case):
     def test_stats(self):
         prev_count, state.total_count = state.total_count, 100
         try:
-            self.assertDictContainsSubset({'total': 100,
-                                           'consumer': {'xyz': 'XYZ'}},
+            self.assertDictContainsSubset({'total': 100},
                                           self.panel.handle('stats'))
-            self.panel.state.consumer = Mock()
-            self.panel.handle('stats')
-            self.assertTrue(
-                self.panel.state.consumer.controller.autoscaler.info.called)
         finally:
             state.total_count = prev_count
 

+ 7 - 5
celery/tests/worker/test_worker.py

@@ -246,12 +246,14 @@ class test_Consumer(Case):
         l = MyKombuConsumer(self.ready_queue, timer=self.timer)
         l.task_consumer = Mock()
         l.qos = QoS(l.task_consumer.qos, 10)
-        info = l.info
+        l.connection = Mock()
+        l.connection.info.return_value = {'foo': 'bar'}
+        l.controller = l.app.WorkController()
+        l.controller.pool = Mock()
+        l.controller.pool.info.return_value = [Mock(), Mock()]
+        l.controller.consumer = l
+        info = l.controller.stats()
         self.assertEqual(info['prefetch_count'], 10)
-        self.assertFalse(info['broker'])
-
-        l.connection = current_app.connection()
-        info = l.info
         self.assertTrue(info['broker'])
 
     def test_start_when_closed(self):

+ 1 - 1
celery/worker/consumer.py

@@ -391,7 +391,7 @@ class Connection(bootsteps.StartStopStep):
     def info(self, c):
         info = c.connection.info()
         info.pop('password', None)  # don't send password.
-        return info
+        return {'broker': info}
 
 
 class Events(bootsteps.StartStopStep):