Browse Source

Autoscale/grow: Fixes prefetch count updates

Ask Solem 11 years ago
parent
commit
79b3b1877c

+ 2 - 2
celery/tests/worker/test_autoscale.py

@@ -105,14 +105,14 @@ class test_Autoscaler(AppCase):
         x.body()
         x.body()
         self.assertEqual(x.pool.num_processes, 10)
-        self.assertTrue(worker.consumer.increment_prefetch_count.called)
+        self.assertTrue(worker.consumer._update_prefetch_count.called)
         state.reserved_requests.clear()
         x.body()
         self.assertEqual(x.pool.num_processes, 10)
         x._last_action = monotonic() - 10000
         x.body()
         self.assertEqual(x.pool.num_processes, 3)
-        self.assertTrue(worker.consumer.decrement_prefetch_count.called)
+        self.assertTrue(worker.consumer._update_prefetch_count.called)
 
     def test_run(self):
 

+ 2 - 2
celery/worker/autoscale.py

@@ -135,7 +135,7 @@ class Autoscaler(bgThread):
     def _grow(self, n):
         info('Scaling up %s processes.', n)
         self.pool.grow(n)
-        self.worker.consumer.increment_prefetch_count(n, True)
+        self.worker.consumer._update_prefetch_count(n)
 
     def _shrink(self, n):
         info('Scaling down %s processes.', n)
@@ -145,7 +145,7 @@ class Autoscaler(bgThread):
             debug("Autoscaler won't scale down: all processes busy.")
         except Exception as exc:
             error('Autoscaler: scale_down: %r', exc, exc_info=True)
-        self.worker.consumer.decrement_prefetch_count(n, True)
+        self.worker.consumer._update_prefetch_count(-n)
 
     def info(self):
         return {'max': self.max_concurrency,

+ 21 - 30
celery/worker/consumer.py

@@ -226,40 +226,31 @@ class Consumer(object):
             (n, self.bucket_for_task(t)) for n, t in items(self.app.tasks)
         )
 
-    def increment_prefetch_count(self, n=1, use_multiplier=False):
-        """Increase the prefetch count by ``n``.
+    def _update_prefetch_count(self, index=0):
+        """Update prefetch count after pool/shrink grow operations.
 
-        This will also increase the initial value so it'll persist between
-        consumer restarts.  If you want the change to be temporary,
-        you can use ``self.qos.increment_eventually(n)`` instead.
+        Index must be the change in number of processes as a postive
+        (increasing) or negative (decreasing) number.
 
-        :keyword use_multiplier: If True the value will be multiplied
-            using the current prefetch multiplier setting.
+        .. note::
+
+            Currently pool grow operations will end up with an offset
+            of +1 if the initial size of the pool was 0 (e.g.
+            ``--autoscale=1,0``).
 
         """
-        n = n * self.prefetch_multiplier if use_multiplier else n
-        # initial value must be changed for consumer restart.
-        if self.initial_prefetch_count:
-            # only increase if prefetch enabled (>0)
-            self.initial_prefetch_count += n
-        self.qos.increment_eventually(n)
-
-    def decrement_prefetch_count(self, n=1, use_multiplier=False):
-        """Decrease prefetch count by ``n``.
-
-        This will also decrease the initial value so it'll persist between
-        consumer restarts.  If you want the change to be temporary,
-        you can use ``self.qos.decrement_eventually(n)`` instead.
-
-        :keyword use_multiplier: If True the value will be multiplied
-            using the current prefetch multiplier setting.
-        """
-        n = n * self.prefetch_multiplier if use_multiplier else n
-        initial = self.initial_prefetch_count
-        if initial:  # was not disabled (>0)
-            # must not get lower than 1, since that will disable the limit.
-            self.initial_prefetch_count = max(initial - n, 1)
-        self.qos.decrement_eventually(n)
+        num_processes = self.pool.num_processes
+        if not self.initial_prefetch_count or not num_processes:
+            return  # prefetch disabled
+        self.initial_prefetch_count = (
+            self.pool.num_processes * self.prefetch_multiplier
+        )
+        return self._update_qos_eventually(index)
+
+    def _update_qos_eventually(self, index):
+        return (self.qos.decrement_eventually if index < 0
+                else self.qos.increment_eventually(
+                    abs(index) * self.prefetch_multiplier))
 
     def _limit_task(self, request, bucket, tokens):
         if not bucket.can_consume(tokens):

+ 2 - 2
celery/worker/control.py

@@ -300,7 +300,7 @@ def pool_grow(state, n=1, **kwargs):
         state.consumer.controller.autoscaler.force_scale_up(n)
     else:
         state.consumer.pool.grow(n)
-        state.consumer.increment_prefetch_count(n, True)
+        state.consumer._update_prefetch_count(n)
     return {'ok': 'pool will grow'}
 
 
@@ -310,7 +310,7 @@ def pool_shrink(state, n=1, **kwargs):
         state.consumer.controller.autoscaler.force_scale_down(n)
     else:
         state.consumer.pool.shrink(n)
-        state.consumer.decrement_prefetch_count(n, True)
+        state.consumer._update_prefetch_count(-n)
     return {'ok': 'pool will shrink'}