Browse Source

100% Coverage for celery.apps.worker

Ask Solem 14 years ago
parent
commit
96729a39f3
1 changed files with 148 additions and 0 deletions
  1. 148 0
      celery/tests/test_bin/test_celeryd.py

+ 148 - 0
celery/tests/test_bin/test_celeryd.py

@@ -152,6 +152,148 @@ class test_Worker(unittest.TestCase):
         finally:
         finally:
             app.amqp.queues = p
             app.amqp.queues = p
 
 
+    @disable_stdouts
+    def test_autoscale_argument(self):
+        worker1 = self.Worker(autoscale="10,3")
+        self.assertListEqual(worker1.autoscale, [10, 3])
+        worker2 = self.Worker(autoscale="10")
+        self.assertListEqual(worker2.autoscale, [10, 0])
+
+    @disable_stdouts
+    def test_include_argument(self):
+        worker1 = self.Worker(include="some.module")
+        self.assertListEqual(worker1.include, ["some.module"])
+        worker2 = self.Worker(include="some.module,another.package")
+        self.assertListEqual(worker2.include, ["some.module",
+                                               "another.package"])
+        worker3 = self.Worker(include="os,sys")
+        worker3.init_loader()
+
+    @disable_stdouts
+    def test_unknown_loglevel(self):
+        self.assertRaises(SystemExit, self.Worker, loglevel="ALIEN")
+        worker1 = self.Worker(loglevel=0xFFFF)
+        self.assertEqual(worker1.loglevel, 0xFFFF)
+
+    @disable_stdouts
+    def test_warns_if_running_as_privileged_user(self):
+
+        def geteuid():
+            return 0
+
+        prev, os.geteuid = os.geteuid, geteuid
+        try:
+            def with_catch_warnings(log):
+                worker = self.Worker()
+                worker.run()
+                self.assertTrue(log)
+                self.assertIn("supervisor privileges is not encouraged",
+                              log[0].message.args[0])
+            context = catch_warnings(record=True)
+            execute_context(context, with_catch_warnings)
+        finally:
+            os.geteuid = prev
+
+    @disable_stdouts
+    def test_use_pidfile(self):
+        from celery import platforms
+
+
+        class create_pidlock(object):
+            instance = [None]
+
+            def __init__(self, file):
+                self.file = file
+                self.instance[0] = self
+
+            def acquire(self):
+                self.acquired = True
+
+                class Object(object):
+                    def release(self):
+                        pass
+
+                return Object()
+
+        prev, platforms.create_pidlock = platforms.create_pidlock, \
+                                         create_pidlock
+        try:
+            worker = self.Worker(pidfile="pidfilelockfilepid")
+            worker.run_worker()
+            self.assertTrue(create_pidlock.instance[0].acquired)
+        finally:
+            platforms.create_pidlock = prev
+
+    @disable_stdouts
+    def test_redirect_stdouts(self):
+        worker = self.Worker()
+        worker.redirect_stdouts = False
+        worker.redirect_stdouts_to_logger()
+        self.assertRaises(AttributeError, getattr, sys.stdout, "logger")
+
+    def test_redirect_stdouts_already_handled(self):
+        from celery import signals
+
+        logging_setup = [False]
+        def on_logging_setup(**kwargs):
+            logging_setup[0] = True
+
+        signals.setup_logging.connect(on_logging_setup)
+        try:
+            worker = self.Worker()
+            worker.app.log.__class__._setup = False
+            worker.redirect_stdouts_to_logger()
+            self.assertTrue(logging_setup[0])
+            self.assertRaises(AttributeError, getattr, sys.stdout, "logger")
+        finally:
+            signals.setup_logging.disconnect(on_logging_setup)
+
+    @disable_stdouts
+    def test_platform_tweaks_osx(self):
+
+        class OSXWorker(self.Worker):
+            proxy_workaround_installed = False
+
+            def osx_proxy_detection_workaround(self):
+                self.proxy_workaround_installed = True
+
+        worker = OSXWorker()
+
+        def install_HUP_nosupport(controller):
+            controller.hup_not_supported_installed = True
+
+        class Controller(object):
+            pass
+
+        prev = cd.install_HUP_not_supported_handler
+        cd.install_HUP_not_supported_handler = install_HUP_nosupport
+        try:
+            worker.app.IS_OSX = True
+            controller = Controller()
+            worker.install_platform_tweaks(controller)
+            self.assertTrue(controller.hup_not_supported_installed)
+            self.assertTrue(worker.proxy_workaround_installed)
+        finally:
+            cd.install_HUP_not_supported_handler = prev
+
+    @disable_stdouts
+    def test_general_platform_tweaks(self):
+
+        restart_worker_handler_installed = [False]
+
+        def install_worker_restart_handler(worker):
+            restart_worker_handler_installed[0] = True
+
+        prev = cd.install_worker_restart_handler
+        cd.install_worker_restart_handler = install_worker_restart_handler
+        try:
+            worker = self.Worker()
+            worker.app.IS_OSX = False
+            worker.install_platform_tweaks(object())
+            self.assertTrue(restart_worker_handler_installed[0])
+        finally:
+            cd.install_worker_restart_handler = prev
+
     @disable_stdouts
     @disable_stdouts
     def test_on_consumer_ready(self):
     def test_on_consumer_ready(self):
         worker_ready_sent = [False]
         worker_ready_sent = [False]
@@ -268,6 +410,12 @@ class test_signal_handlers(unittest.TestCase):
         finally:
         finally:
             process.name = name
             process.name = name
 
 
+    @disable_stdouts
+    def test_install_HUP_not_supported_handler(self):
+        worker = self._Worker()
+        handlers = self.psig(cd.install_HUP_not_supported_handler, worker)
+        handlers["SIGHUP"]("SIGHUP", object())
+
     @disable_stdouts
     @disable_stdouts
     def test_worker_int_again_handler_only_stop_MainProcess(self):
     def test_worker_int_again_handler_only_stop_MainProcess(self):
         process = current_process()
         process = current_process()