Browse Source

Tests passing

Ask Solem 13 years ago
parent
commit
af79860464
3 changed files with 30 additions and 6 deletions
  1. 2 0
      Changelog
  2. 26 5
      celery/tests/test_worker/__init__.py
  3. 2 1
      celery/worker/__init__.py

+ 2 - 0
Changelog

@@ -49,6 +49,8 @@ Important Notes
 
         CELERY_TASK_RESULT_EXPIRES = None
 
+* Eventlet: Fixed problem with shutdown (Issue #457).
+
 * Broker transports can be now be specified using URLs
 
     The broker can now be specified as an URL instead.

+ 26 - 5
celery/tests/test_worker/__init__.py

@@ -598,6 +598,18 @@ class test_Consumer(unittest.TestCase):
                           send_events=False)
         l.pidbox_node = Mock()
 
+        class BConsumer(Mock):
+
+            def __enter__(self):
+                self.consume()
+                return self
+
+            def __exit__(self, *exc_info):
+                self.cancel()
+
+
+        l.pidbox_node.listen = BConsumer()
+
         connections = []
 
         class Connection(object):
@@ -605,13 +617,21 @@ class test_Consumer(unittest.TestCase):
             def __init__(self, obj):
                 connections.append(self)
                 self.obj = obj
+                self.default_channel = self.channel()
                 self.closed = False
 
+            def __enter__(self):
+                return self
+
+            def __exit__(self, *exc_info):
+                self.close()
+
             def channel(self):
                 return Mock()
 
-            def drain_events(self):
+            def drain_events(self, **kwargs):
                 self.obj.connection = None
+                self.obj._pidbox_node_shutdown.set()
 
             def close(self):
                 self.closed = True
@@ -693,6 +713,7 @@ class test_WorkController(AppCase):
 
     def create_worker(self, **kw):
         worker = WorkController(concurrency=1, loglevel=0, **kw)
+        worker._shutdown_complete.set()
         worker.logger = Mock()
         return worker
 
@@ -841,8 +862,7 @@ class test_WorkController(AppCase):
         stc = Mock()
         stc.start.side_effect = SystemTerminate()
         worker1.components = [stc]
-        with self.assertRaises(SystemExit):
-            worker1.start()
+        worker1.start()
         self.assertTrue(stc.terminate.call_count)
 
         worker2 = self.create_worker()
@@ -850,8 +870,7 @@ class test_WorkController(AppCase):
         sec.start.side_effect = SystemExit()
         sec.terminate = None
         worker2.components = [sec]
-        with self.assertRaises(SystemExit):
-            worker2.start()
+        worker2.start()
         self.assertTrue(sec.stop.call_count)
 
     def test_state_db(self):
@@ -874,6 +893,7 @@ class test_WorkController(AppCase):
 
     def test_start__stop(self):
         worker = self.worker
+        worker._shutdown_complete.set()
         worker.components = [Mock(), Mock(), Mock(), Mock()]
 
         worker.start()
@@ -885,6 +905,7 @@ class test_WorkController(AppCase):
 
     def test_start__terminate(self):
         worker = self.worker
+        worker._shutdown_complete.set()
         worker.components = [Mock(), Mock(), Mock(), Mock(), Mock()]
         for component in worker.components[:3]:
             component.terminate = None

+ 2 - 1
celery/worker/__init__.py

@@ -267,6 +267,7 @@ class WorkController(object):
                 self._running = i + 1
                 blocking(component.start)
         except SystemTerminate:
+            print("GOT TERMINATE")
             self.terminate()
         except:
             self.stop()
@@ -286,7 +287,7 @@ class WorkController(object):
                                  exc_info=True)
         except SystemTerminate:
             self.terminate()
-            sys.exit()
+            raise
         except BaseException, exc:
             self.stop()
             raise exc