Przeglądaj źródła

100% coverage for celery.worker.heartbeat

Ask Solem 12 lat temu
rodzic
commit
7e079dac48
1 zmienionych plików z 14 dodań i 13 usunięć
  1. 14 13
      celery/tests/worker/test_heartbeat.py

+ 14 - 13
celery/tests/worker/test_heartbeat.py

@@ -47,7 +47,7 @@ class MockTimer(object):
 
 
 class test_Heart(Case):
 class test_Heart(Case):
 
 
-    def test_stop(self):
+    def test_start_stop(self):
         timer = MockTimer()
         timer = MockTimer()
         eventer = MockDispatcher()
         eventer = MockDispatcher()
         h = Heart(timer, eventer, interval=1)
         h = Heart(timer, eventer, interval=1)
@@ -57,16 +57,17 @@ class test_Heart(Case):
         self.assertIsNone(h.tref)
         self.assertIsNone(h.tref)
         h.stop()
         h.stop()
 
 
-    @sleepdeprived
-    def test_run_manages_cycle(self):
+    def test_start_when_disabled(self):
+        timer = MockTimer()
+        eventer = MockDispatcher()
+        eventer.enabled = False
+        h = Heart(timer, eventer)
+        h.start()
+        self.assertFalse(h.tref)
+
+    def test_stop_when_disabled(self):
+        timer = MockTimer()
         eventer = MockDispatcher()
         eventer = MockDispatcher()
-        heart = Heart(MockTimer(), eventer, interval=0.1)
-        eventer.heart = heart
-        heart.start()
-        msecs, fun, args, kwargs = tref = heart.tref
-        self.assertEqual(msecs, 0.1 * 1000)
-        self.assertEqual(tref.fun, eventer.send)
-        self.assertTrue(tref.args)
-        self.assertTrue(tref.kwargs)
-        heart.stop()
-        self.assertTrue(tref.cancelled)
+        eventer.enabled = False
+        h = Heart(timer, eventer)
+        h.stop()