فهرست منبع

Fixes autoreload tests when kqueue missing

Ask Solem 13 سال پیش
والد
کامیت
2299188e8a
1فایلهای تغییر یافته به همراه8 افزوده شده و 3 حذف شده
  1. 8 3
      celery/tests/worker/test_autoreload.py

+ 8 - 3
celery/tests/worker/test_autoreload.py

@@ -108,7 +108,7 @@ class test_KQueueMontior(Case):
     def test_start(self, osopen, kevent, kqueue):
         prev = {}
         flags = ["KQ_FILTER_VNODE", "KQ_EV_ADD", "KQ_EV_ENABLE",
-                 "KQ_EV_CLEAR", "KW_NOTE_WRITE", "KQ_NOTE_EXTEND"]
+                 "KQ_EV_CLEAR", "KQ_NOTE_WRITE", "KQ_NOTE_EXTEND"]
         for i, flag in enumerate(flags):
             prev[flag] = getattr(select, flag, None)
             if not prev[flag]:
@@ -133,7 +133,9 @@ class test_KQueueMontior(Case):
             x.start()
         finally:
             for flag in flags:
-                if not prev[flag]:
+                if prev[flag]:
+                    setattr(select, flag, prev[flag])
+                else:
                     delattr(select, flag)
 
 
@@ -167,7 +169,10 @@ class test_default_implementation(Case):
     @patch("celery.worker.autoreload.pyinotify")
     def test_inotify(self, pyinotify):
         kq = getattr(select, "kqueue", None)
-        delattr(select, "kqueue")
+        try:
+            delattr(select, "kqueue")
+        except AttributeError:
+            pass
         platform, sys.platform = sys.platform, "linux"
         try:
             self.assertEqual(default_implementation(), "inotify")