Browse Source

unittests: Don't leave threads running at teardown.

Ask Solem 14 năm trước cách đây
mục cha
commit
374be0fbea

+ 0 - 2
celery/concurrency/processes/__init__.py

@@ -151,8 +151,6 @@ class TaskPool(object):
             self.logger.error("Pool callback raised exception: %s" % (
                 traceback.format_exc(), ))
 
-
-
     @property
     def info(self):
         return {"max-concurrency": self.limit,

+ 0 - 1
celery/conf.py

@@ -1,4 +1,3 @@
-<<<<<<< HEAD
 """
 
 **DEPRECATED**

+ 8 - 0
celery/tests/__init__.py

@@ -1,4 +1,5 @@
 import os
+import sys
 
 config = os.environ.setdefault("CELERY_TEST_CONFIG_MODULE",
                                "celery.tests.config")
@@ -8,6 +9,13 @@ os.environ["CELERY_LOADER"] = "default"
 
 
 def teardown():
+    import threading
     import os
     if os.path.exists("test.db"):
         os.remove("test.db")
+    remaining_threads = [thread for thread in threading.enumerate()
+                            if thread.name != "MainThread"]
+    if remaining_threads:
+        sys.stderr.write(
+            "\n\n**WARNING**: Remaning threads at teardown: %r...\n" % (
+                remaining_threads))

+ 34 - 15
celery/tests/test_worker.py

@@ -26,6 +26,12 @@ class PlaceHolder(object):
         pass
 
 
+class MyCarrotListener(CarrotListener):
+
+    def restart_heartbeat(self):
+        self.heart = None
+
+
 class MockControlDispatch(object):
     commands = []
 
@@ -36,6 +42,7 @@ class MockControlDispatch(object):
 class MockEventDispatcher(object):
     sent = []
     closed = False
+    flushed = False
 
     def send(self, event, *args, **kwargs):
         self.sent.append(event)
@@ -43,6 +50,9 @@ class MockEventDispatcher(object):
     def close(self):
         self.closed = True
 
+    def flush(self):
+        self.flushed = True
+
 
 class MockHeart(object):
     closed = False
@@ -163,8 +173,11 @@ class test_CarrotListener(unittest.TestCase):
         self.logger = get_logger()
         self.logger.setLevel(0)
 
+    def tearDown(self):
+        self.eta_schedule.stop()
+
     def test_mainloop(self):
-        l = CarrotListener(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyCarrotListener(self.ready_queue, self.eta_schedule, self.logger,
                            send_events=False)
 
         class MockConnection(object):
@@ -205,7 +218,7 @@ class test_CarrotListener(unittest.TestCase):
         self.assertTrue(records.get("consumer_add"))
 
     def test_connection(self):
-        l = CarrotListener(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyCarrotListener(self.ready_queue, self.eta_schedule, self.logger,
                            send_events=False)
 
         l.reset_connection()
@@ -217,6 +230,7 @@ class test_CarrotListener(unittest.TestCase):
 
         l.reset_connection()
         self.assertIsInstance(l.connection, BrokerConnection)
+        l.stop_consumers()
 
         l.stop()
         l.close_connection()
@@ -224,7 +238,7 @@ class test_CarrotListener(unittest.TestCase):
         self.assertIsNone(l.task_consumer)
 
     def test_receive_message_control_command(self):
-        l = CarrotListener(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyCarrotListener(self.ready_queue, self.eta_schedule, self.logger,
                            send_events=False)
         backend = MockBackend()
         m = create_message(backend, control={"command": "shutdown"})
@@ -234,12 +248,12 @@ class test_CarrotListener(unittest.TestCase):
         self.assertIn("shutdown", l.control_dispatch.commands)
 
     def test_close_connection(self):
-        l = CarrotListener(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyCarrotListener(self.ready_queue, self.eta_schedule, self.logger,
                            send_events=False)
         l._state = RUN
         l.close_connection()
 
-        l = CarrotListener(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyCarrotListener(self.ready_queue, self.eta_schedule, self.logger,
                            send_events=False)
         eventer = l.event_dispatcher = MockEventDispatcher()
         heart = l.heart = MockHeart()
@@ -249,7 +263,7 @@ class test_CarrotListener(unittest.TestCase):
         self.assertTrue(heart.closed)
 
     def test_receive_message_unknown(self):
-        l = CarrotListener(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyCarrotListener(self.ready_queue, self.eta_schedule, self.logger,
                            send_events=False)
         backend = MockBackend()
         m = create_message(backend, unknown={"baz": "!!!"})
@@ -266,7 +280,7 @@ class test_CarrotListener(unittest.TestCase):
 
     def test_receive_message_InvalidTaskError(self):
         logger = MockLogger()
-        l = CarrotListener(self.ready_queue, self.eta_schedule, logger,
+        l = MyCarrotListener(self.ready_queue, self.eta_schedule, logger,
                            send_events=False)
         backend = MockBackend()
         m = create_message(backend, task=foo_task.name,
@@ -279,7 +293,7 @@ class test_CarrotListener(unittest.TestCase):
 
     def test_on_decode_error(self):
         logger = MockLogger()
-        l = CarrotListener(self.ready_queue, self.eta_schedule, logger,
+        l = MyCarrotListener(self.ready_queue, self.eta_schedule, logger,
                            send_events=False)
 
         class MockMessage(object):
@@ -297,7 +311,7 @@ class test_CarrotListener(unittest.TestCase):
         self.assertIn("Message decoding error", logger.logged[0])
 
     def test_receieve_message(self):
-        l = CarrotListener(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyCarrotListener(self.ready_queue, self.eta_schedule, self.logger,
                            send_events=False)
         backend = MockBackend()
         m = create_message(backend, task=foo_task.name,
@@ -320,16 +334,16 @@ class test_CarrotListener(unittest.TestCase):
             def qos(self, **kwargs):
                 self.prefetch_count_incremented = True
 
-        l = CarrotListener(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyCarrotListener(self.ready_queue, self.eta_schedule, self.logger,
                            send_events=False)
         backend = MockBackend()
         m = create_message(backend, task=foo_task.name,
                            eta=datetime.now().isoformat(),
                            args=[2, 4, 8], kwargs={})
 
-        l.event_dispatcher = MockEventDispatcher()
         l.task_consumer = MockConsumer()
         l.qos = QoS(l.task_consumer, l.initial_prefetch_count, l.logger)
+        l.event_dispatcher = MockEventDispatcher()
         l.receive_message(m.decode(), m)
 
         items = [entry[2] for entry in self.eta_schedule.queue]
@@ -339,10 +353,11 @@ class test_CarrotListener(unittest.TestCase):
                 found = True
         self.assertTrue(found)
         self.assertTrue(l.task_consumer.prefetch_count_incremented)
+        l.eta_schedule.stop()
 
     def test_revoke(self):
         ready_queue = FastQueue()
-        l = CarrotListener(ready_queue, self.eta_schedule, self.logger,
+        l = MyCarrotListener(ready_queue, self.eta_schedule, self.logger,
                            send_events=False)
         backend = MockBackend()
         id = gen_unique_id()
@@ -359,7 +374,7 @@ class test_CarrotListener(unittest.TestCase):
         self.assertTrue(ready_queue.empty())
 
     def test_receieve_message_not_registered(self):
-        l = CarrotListener(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyCarrotListener(self.ready_queue, self.eta_schedule, self.logger,
                           send_events=False)
         backend = MockBackend()
         m = create_message(backend, task="x.X.31x", args=[2, 4, 8], kwargs={})
@@ -370,8 +385,9 @@ class test_CarrotListener(unittest.TestCase):
         self.assertTrue(self.eta_schedule.empty())
 
     def test_receieve_message_eta(self):
-        l = CarrotListener(self.ready_queue, self.eta_schedule, self.logger,
+        l = MyCarrotListener(self.ready_queue, self.eta_schedule, self.logger,
                           send_events=False)
+        dispatcher = l.event_dispatcher = MockEventDispatcher()
         backend = MockBackend()
         m = create_message(backend, task=foo_task.name,
                            args=[2, 4, 8], kwargs={},
@@ -385,6 +401,9 @@ class test_CarrotListener(unittest.TestCase):
             l.reset_connection()
         finally:
             l.app.conf.BROKER_CONNECTION_RETRY = p
+        l.stop_consumers()
+        self.assertTrue(dispatcher.flushed)
+        l.event_dispatcher = MockEventDispatcher()
         l.receive_message(m.decode(), m)
 
         in_hold = self.eta_schedule.queue[0]
@@ -405,7 +424,7 @@ class test_CarrotListener(unittest.TestCase):
             def update(self):
                 self.prev = self.next
 
-        class _Listener(CarrotListener):
+        class _Listener(MyCarrotListener):
             iterations = 0
             wait_method = None
 

+ 0 - 2
celery/worker/controllers.py

@@ -56,8 +56,6 @@ class Mediator(threading.Thread):
             self.logger.error("Mediator callback raised exception %r\n%s" % (
                 exc, traceback.format_exc()))
 
-
-
     def run(self):
         while not self._shutdown.isSet():
             self.move()

+ 5 - 2
celery/worker/listener.py

@@ -402,11 +402,14 @@ class CarrotListener(object):
                                                 app=self.app,
                                                 hostname=self.hostname,
                                                 enabled=self.send_events)
-        self.heart = Heart(self.event_dispatcher)
-        self.heart.start()
+        self.restart_heartbeat()
 
         self._state = RUN
 
+    def restart_heartbeat(self):
+        self.heart = Heart(self.event_dispatcher)
+        self.heart.start()
+
     def _mainloop(self, **kwargs):
         while 1:
             yield self.connection.drain_events()