Forráskód Böngészése

Tests: Use number of threads at startup to verify that tests join threads

Ask Solem 9 éve
szülő
commit
b3e5ebe6e7
1 módosított fájl, 12 hozzáadás és 4 törlés
  1. 12 4
      celery/tests/case.py

+ 12 - 4
celery/tests/case.py

@@ -303,6 +303,10 @@ class _AssertWarnsContext(_AssertRaisesBaseContext):
             raise self.failureException('%s not triggered' % exc_name)
 
 
+def alive_threads():
+    return [thread for thread in threading.enumerate() if thread.is_alive()]
+
+
 class Case(unittest.TestCase):
 
     def assertWarns(self, expected_warning):
@@ -391,6 +395,7 @@ def depends_on_current_app(fun):
 
 class AppCase(Case):
     contained = True
+    _threads_at_startup = [None]
 
     def __init__(self, *args, **kwargs):
         super(AppCase, self).__init__(*args, **kwargs)
@@ -406,8 +411,13 @@ class AppCase(Case):
     def Celery(self, *args, **kwargs):
         return UnitApp(*args, **kwargs)
 
+    def threads_at_startup(self):
+        if self._threads_at_startup[0] is None:
+            self._threads_at_startup[0] = alive_threads()
+        return self._threads_at_startup[0]
+
     def setUp(self):
-        self._threads_at_setup = list(threading.enumerate())
+        self._threads_at_setup = self.threads_at_startup()
         from celery import _state
         from celery import result
         result.task_join_will_block = \
@@ -463,9 +473,7 @@ class AppCase(Case):
         if self.app is not self._current_app:
             self.app.close()
         self.app = None
-        self.assertEqual(
-            self._threads_at_setup, list(threading.enumerate()),
-        )
+        self.assertEqual(self._threads_at_setup, alive_threads())
 
         # Make sure no test left the shutdown flags enabled.
         from celery.worker import state as worker_state