Browse Source

Tests passing

Ask Solem 13 years ago
parent
commit
ca92152c82

+ 0 - 1
celery/concurrency/base.py

@@ -73,7 +73,6 @@ class BasePool(object):
                 "%s does not implement restart" % (self.__class__, ))
 
     def stop(self):
-        self.close()
         self.on_stop()
         self._state = self.TERMINATE
 

+ 37 - 23
celery/tests/bin/test_celeryd.py

@@ -19,8 +19,9 @@ from celery import signals
 from celery import current_app
 from celery.apps import worker as cd
 from celery.bin.celeryd import WorkerCommand, main as celeryd_main
-from celery.exceptions import ImproperlyConfigured, SystemTerminate
+from celery.exceptions import ImproperlyConfigured
 from celery.utils.log import ensure_process_aware_logger
+from celery.worker import state
 
 from celery.tests.utils import AppCase, WhateverIO
 
@@ -393,6 +394,8 @@ class test_signal_handlers(AppCase):
         worker = self._Worker()
         handlers = self.psig(cd.install_worker_int_handler, worker)
         next_handlers = {}
+        state.should_stop = False
+        state.should_terminate = False
 
         class Signals(platforms.Signals):
 
@@ -401,15 +404,17 @@ class test_signal_handlers(AppCase):
 
         p, platforms.signals = platforms.signals, Signals()
         try:
-            with self.assertRaises(SystemExit):
-                handlers["SIGINT"]("SIGINT", object())
-            self.assertTrue(worker.stopped)
+            handlers["SIGINT"]("SIGINT", object())
+            self.assertTrue(state.should_stop)
         finally:
             platforms.signals = p
+            state.should_stop = False
 
-        with self.assertRaises(SystemExit):
+        try:
             next_handlers["SIGINT"]("SIGINT", object())
-        self.assertTrue(worker.terminated)
+            self.assertTrue(state.should_terminate)
+        finally:
+            state.should_terminate = False
 
     @disable_stdouts
     def test_worker_int_handler_only_stop_MainProcess(self):
@@ -422,11 +427,11 @@ class test_signal_handlers(AppCase):
         try:
             worker = self._Worker()
             handlers = self.psig(cd.install_worker_int_handler, worker)
-            with self.assertRaises(SystemExit):
-                handlers["SIGINT"]("SIGINT", object())
-            self.assertFalse(worker.stopped)
+            handlers["SIGINT"]("SIGINT", object())
+            self.assertTrue(state.should_stop)
         finally:
             process.name = name
+            state.should_stop = False
 
     @disable_stdouts
     def test_install_HUP_not_supported_handler(self):
@@ -445,9 +450,11 @@ class test_signal_handlers(AppCase):
         try:
             worker = self._Worker()
             handlers = self.psig(cd.install_worker_term_hard_handler, worker)
-            with self.assertRaises(SystemExit):
+            try:
                 handlers["SIGQUIT"]("SIGQUIT", object())
-            self.assertFalse(worker.terminated)
+                self.assertTrue(state.should_terminate)
+            finally:
+                state.should_terminate = False
         finally:
             process.name = name
 
@@ -455,12 +462,14 @@ class test_signal_handlers(AppCase):
     def test_worker_term_handler(self):
         worker = self._Worker()
         handlers = self.psig(cd.install_worker_term_handler, worker)
-        with self.assertRaises(SystemExit):
+        try:
             handlers["SIGTERM"]("SIGTERM", object())
-        self.assertTrue(worker.stopped)
+            self.assertTrue(state.should_stop)
+        finally:
+            state.should_stop = False
 
-    @patch("celery.apps.worker.logger")
-    def test_worker_cry_handler(self, logger):
+    @patch("sys.__stderr__")
+    def test_worker_cry_handler(self, stderr):
         if sys.platform.startswith("java"):
             raise SkipTest("Cry handler does not work on Jython")
         if hasattr(sys, "pypy_version_info"):
@@ -468,7 +477,7 @@ class test_signal_handlers(AppCase):
         if sys.version_info > (2, 5):
             handlers = self.psig(cd.install_cry_handler)
             self.assertIsNone(handlers["SIGUSR1"]("SIGUSR1", object()))
-            self.assertTrue(logger.error.called)
+            self.assertTrue(stderr.write.called)
         else:
             raise SkipTest("Needs Python 2.5 or later")
 
@@ -483,14 +492,16 @@ class test_signal_handlers(AppCase):
         try:
             worker = self._Worker()
             handlers = self.psig(cd.install_worker_term_handler, worker)
-            with self.assertRaises(SystemExit):
-                handlers["SIGTERM"]("SIGTERM", object())
-            self.assertFalse(worker.stopped)
+            handlers["SIGTERM"]("SIGTERM", object())
+            self.assertTrue(state.should_stop)
         finally:
             process.name = name
+            state.should_stop = False
 
     @disable_stdouts
-    def test_worker_restart_handler(self):
+    @patch("os.fork")
+    def test_worker_restart_handler(self, fork):
+        fork.return_value = 0
         if getattr(os, "execv", None) is None:
             raise SkipTest("platform does not have excv")
         argv = []
@@ -503,15 +514,18 @@ class test_signal_handlers(AppCase):
             worker = self._Worker()
             handlers = self.psig(cd.install_worker_restart_handler, worker)
             handlers["SIGHUP"]("SIGHUP", object())
-            self.assertTrue(worker.stopped)
+            self.assertTrue(state.should_stop)
             self.assertTrue(argv)
         finally:
             os.execv = execv
+            state.should_stop = False
 
     @disable_stdouts
     def test_worker_term_hard_handler(self):
         worker = self._Worker()
         handlers = self.psig(cd.install_worker_term_hard_handler, worker)
-        with self.assertRaises(SystemTerminate):
+        try:
             handlers["SIGQUIT"]("SIGQUIT", object())
-        self.assertTrue(worker.terminated)
+            self.assertTrue(state.should_terminate)
+        finally:
+            state.should_terminate = False

+ 1 - 0
celery/tests/concurrency/test_processes.py

@@ -119,6 +119,7 @@ class test_TaskPool(Case):
         pool = TaskPool(10)
         pool.start()
         self.assertTrue(pool._pool.started)
+        self.assertTrue(pool._pool._state == mp.RUN)
 
         _pool = pool._pool
         pool.stop()

+ 17 - 12
celery/utils/threads.py

@@ -49,23 +49,28 @@ class bgThread(Thread):
             del(exc_info)
 
     def run(self):
-        shutdown = self._is_shutdown
+        body = self.body
+        shutdown_set = self._is_shutdown.is_set
         try:
-            while not shutdown.is_set():
+            while not shutdown_set():
                 try:
-                    self.body()
+                    body()
                 except Exception, exc:
-                    self.on_crash("%r crashed: %r", self.name, exc)
-                    # exiting by normal means does not work here, so force exit.
-                    os._exit(1)
-            try:
-                self._is_stopped.set()
-            except TypeError:  # pragma: no cover
-                # we lost the race at interpreter shutdown,
-                # so gc collected built-in modules.
-                pass
+                    try:
+                        self.on_crash("%r crashed: %r", self.name, exc)
+                        self._set_stopped()
+                    finally:
+                        os._exit(1)  # exiting by normal means won't work
         finally:
+            self._set_stopped()
+
+    def _set_stopped(self):
+        try:
             self._is_stopped.set()
+        except TypeError:  # pragma: no cover
+            # we lost the race at interpreter shutdown,
+            # so gc collected built-in modules.
+            pass
 
     def stop(self):
         """Graceful shutdown."""

+ 3 - 0
celery/worker/hub.py

@@ -1,5 +1,8 @@
 from __future__ import absolute_import
 
+import errno
+import socket
+
 from time import sleep
 
 from kombu.utils.eventio import poll, POLL_READ, POLL_ERR