Prechádzať zdrojové kódy

100% coverage for celery.worker.hub

Ask Solem 12 rokov pred
rodič
commit
432a900e60
2 zmenil súbory, kde vykonal 80 pridanie a 11 odobranie
  1. 78 0
      celery/tests/worker/test_hub.py
  2. 2 11
      celery/worker/hub.py

+ 78 - 0
celery/tests/worker/test_hub.py

@@ -4,6 +4,9 @@ from celery.worker.hub import (
     DummyLock,
     BoundedSemaphore,
     Hub,
+    repr_flag,
+    _rcb,
+    READ, WRITE, ERR
 )
 
 from mock import Mock, call, patch
@@ -121,6 +124,23 @@ class test_BoundedSemaphore(Case):
 
 class test_Hub(Case):
 
+    def test_repr_flag(self):
+        self.assertEqual(repr_flag(READ), 'R')
+        self.assertEqual(repr_flag(WRITE), 'W')
+        self.assertEqual(repr_flag(ERR), '!')
+        self.assertEqual(repr_flag(READ | WRITE), 'RW')
+        self.assertEqual(repr_flag(READ | ERR), 'R!')
+        self.assertEqual(repr_flag(WRITE | ERR), 'W!')
+        self.assertEqual(repr_flag(READ | WRITE | ERR), 'RW!')
+
+    def test_repr_callback_rcb(self):
+
+        def f():
+            pass
+
+        self.assertEqual(_rcb(f), f.__name__)
+        self.assertEqual(_rcb('foo'), 'foo')
+
     @patch('kombu.utils.eventio.poll')
     def test_start_stop(self, poll):
         hub = Hub()
@@ -177,6 +197,64 @@ class test_Hub(Case):
         self.assertEqual(hub.fire_timers(max_timers=10), 3.982)
         keep[0].assert_called_with()
 
+    def test_fire_timers_raises(self):
+        hub = Hub()
+        eback = Mock()
+        eback.side_effect = KeyError('foo')
+        hub.timer = Mock()
+        hub.scheduler = iter([(0, eback)])
+        with self.assertRaises(KeyError):
+            hub.fire_timers(propagate=(KeyError, ))
+
+        eback.side_effect = ValueError('foo')
+        hub.scheduler = iter([(0, eback)])
+        with patch('celery.worker.hub.logger') as logger:
+            with self.assertRaises(StopIteration):
+                hub.fire_timers()
+            self.assertTrue(logger.error.called)
+
+    def test_add_raises_ValueError(self):
+        hub = Hub()
+        hub._add = Mock()
+        hub._add.side_effect = ValueError()
+        hub._discard = Mock()
+        hub.add([2], Mock(), READ)
+        hub._discard.assert_called_with(2)
+
+    def test_repr_active(self):
+        hub = Hub()
+        hub.readers = {1: Mock(), 2: Mock()}
+        hub.writers = {3: Mock(), 4: Mock()}
+        for value in hub.readers.values() + hub.writers.values():
+            value.__name__ = 'mock'
+        self.assertTrue(hub.repr_active())
+
+    def test_repr_events(self):
+        hub = Hub()
+        hub.readers = {6: Mock(), 7: Mock(), 8: Mock()}
+        hub.writers = {9: Mock()}
+        for value in hub.readers.values() + hub.writers.values():
+            value.__name__ = 'mock'
+        self.assertTrue(hub.repr_events([
+            (6, READ),
+            (7, ERR),
+            (8, READ|ERR),
+            (9, WRITE),
+            (10, 13213),
+        ]))
+
+    def test_callback_for(self):
+        hub = Hub()
+        reader, writer = Mock(), Mock()
+        hub.readers = {6: reader}
+        hub.writers = {7: writer}
+
+        self.assertEqual(hub._callback_for(6, READ), reader)
+        self.assertEqual(hub._callback_for(7, WRITE), writer)
+        with self.assertRaises(KeyError):
+            hub._callback_for(6, WRITE)
+        self.assertEqual(hub._callback_for(6, WRITE, 'foo'), 'foo')
+
     def test_update_readers(self):
         hub = Hub()
         P = hub.poller = Mock()

+ 2 - 11
celery/worker/hub.py

@@ -30,22 +30,13 @@ def repr_flag(flag):
 
 
 def _rcb(obj):
+    if obj is None:
+        return '<missing>'
     if isinstance(obj, str):
         return obj
     return obj.__name__
 
 
-def coroutine(gen):
-
-    @wraps(gen)
-    def advances(*args, **kwargs):
-        it = gen(*args, **kwargs)
-        next(it)
-        return it
-
-    return advances
-
-
 class BoundedSemaphore(object):
     """Asynchronous Bounded Semaphore.