Browse Source

Fixes tests

Ask Solem 11 years ago
parent
commit
17118ebc40

+ 9 - 32
celery/tests/concurrency/test_prefork.py

@@ -94,6 +94,9 @@ class MockPool(object):
     def handle_result_event(self, *args, **kwargs):
         pass
 
+    def flush(self):
+        pass
+
     def grow(self, n=1):
         self._processes += n
 
@@ -234,44 +237,18 @@ class test_ResultHandler(PoolCase):
         )
         self.assertTrue(x)
         hub = Mock(name='hub')
-        x.register_with_event_loop(hub)
+        recv = x._recv_message = Mock(name='recv_message')
+        recv.return_value = iter([])
         x.on_state_change = Mock()
+        x.register_with_event_loop(hub)
         proc = x.fileno_to_outq[3] = Mock()
         reader = proc.outq._reader
         reader.poll.return_value = False
         x.handle_event(6)  # KeyError
         x.handle_event(3)
-        reader.poll.assert_called_with(0)
-        self.assertFalse(x.on_state_change.called)
-
-        reader.poll.reset()
-        reader.poll.return_value = True
-        task = reader.recv.return_value = (1, (2, 3))
-        x.handle_event(3)
-        reader.poll.assert_called_with(0)
-        reader.recv.assert_called_with()
-        x.on_state_change.assert_called_with(task)
-        self.assertTrue(x._it)
-
-        reader.recv.return_value = None
-        x.handle_event(3)
-        self.assertIsNone(x._it)
-
-        x._state = asynpool.TERMINATE
-        it = x._process_result()
-        next(it)
-        with self.assertRaises(asynpool.CoroStop):
-            it.send(3)
-        x.handle_event(3)
-        self.assertIsNone(x._it)
-        x._state == asynpool.RUN
-
-        reader.recv.side_effect = EOFError()
-        it = x._process_result()
-        next(it)
-        with self.assertRaises(asynpool.CoroStop):
-            it.send(3)
-        reader.recv.side_effect = None
+        x._recv_message.assert_called_with(
+            hub.add_reader, 3, x.on_state_change,
+        )
 
 
 class test_TaskPool(PoolCase):

+ 7 - 3
celery/tests/worker/test_control.py

@@ -276,20 +276,24 @@ class test_ControlPanel(AppCase):
             def shrink(self, n=1):
                 self.size -= n
 
+            @property
+            def num_processes(self):
+                return self.size
+
         consumer = Consumer(self.app)
         consumer.prefetch_multiplier = 8
         consumer.qos = Mock(name='qos')
-        consumer.pool = MockPool()
+        consumer.pool = MockPool(1)
         panel = self.create_panel(consumer=consumer)
 
         panel.handle('pool_grow')
         self.assertEqual(consumer.pool.size, 2)
         consumer.qos.increment_eventually.assert_called_with(8)
-        self.assertEqual(consumer.initial_prefetch_count, 9)
+        self.assertEqual(consumer.initial_prefetch_count, 16)
         panel.handle('pool_shrink')
         self.assertEqual(consumer.pool.size, 1)
         consumer.qos.decrement_eventually.assert_called_with(8)
-        self.assertEqual(consumer.initial_prefetch_count, 1)
+        self.assertEqual(consumer.initial_prefetch_count, 8)
 
         panel.state.consumer = Mock()
         panel.state.consumer.controller = Mock()

+ 2 - 2
celery/worker/consumer.py

@@ -249,8 +249,8 @@ class Consumer(object):
 
     def _update_qos_eventually(self, index):
         return (self.qos.decrement_eventually if index < 0
-                else self.qos.increment_eventually(
-                    abs(index) * self.prefetch_multiplier))
+                else self.qos.increment_eventually)(
+                    abs(index) * self.prefetch_multiplier)
 
     def _limit_task(self, request, bucket, tokens):
         if not bucket.can_consume(tokens):