Commit 64cae834e7 by Ask Solem, 11 years ago
2 changed files with 33 additions and 7 deletions:
  1. celery/worker/consumer.py (+29 −1)
  2. celery/worker/control.py (+4 −6)

celery/worker/consumer.py (+29 −1)

@@ -165,7 +165,7 @@ class Consumer(object):
                  init_callback=noop, hostname=None,
                  pool=None, app=None,
                  timer=None, controller=None, hub=None, amqheartbeat=None,
-                 worker_options=None, disable_rate_limits=False, 
+                 worker_options=None, disable_rate_limits=False,
                  initial_prefetch_count=2, **kwargs):
         self.app = app
         self.controller = controller
@@ -225,6 +225,34 @@ class Consumer(object):
             (n, self.bucket_for_task(t)) for n, t in items(self.app.tasks)
         )
 
+    def increment_prefetch_count(self, n=1):
+        """Increase the prefetch count by ``n``.
+
+        This will also increase the initial value so it'll persist between
+        consumer restarts.  If you want the change to be temporary,
+        you can use ``self.qos.increment_eventually(n)`` instead.
+
+        """
+        # initial value must be changed for consumer restart.
+        if self.initial_prefetch_count:
+            # only increase if prefetch enabled (>0)
+            self.initial_prefetch_count += n
+        self.qos.increment_eventually(n)
+
+    def decrement_prefetch_count(self, n=1):
+        """Decrease prefetch count by ``n``.
+
+        This will also decrease the initial value so it'll persist between
+        consumer restarts.  If you want the change to be temporary,
+        you can use ``self.qos.decrement_eventually(n)`` instead.
+
+        """
+        initial = self.initial_prefetch_count
+        if initial:  # was not disabled (>0)
+            # must not get lower than 1, since that will disable the limit.
+            self.initial_prefetch_count = max(initial - n, 1)
+        self.qos.decrement_eventually(n)
+
     def _limit_task(self, request, bucket, tokens):
         if not bucket.can_consume(tokens):
             hold = bucket.expected_time(tokens)
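
The new methods keep self.initial_prefetch_count and the live QoS value in sync: the initial value is what a restarted consumer will apply, while qos.increment_eventually/decrement_eventually queue the change for the running channel. The clamp to 1 matters because an AMQP prefetch count of 0 means "unlimited", so decrementing past 1 would remove the limit instead of tightening it. A minimal sketch of that bookkeeping, using a hypothetical SketchQoS stand-in rather than kombu's actual QoS class:

    # Hypothetical stand-in illustrating the "eventually" bookkeeping;
    # not kombu's real QoS class.
    class SketchQoS:
        def __init__(self, set_prefetch, initial=2):
            self._set_prefetch = set_prefetch  # e.g. channel.basic_qos
            self.value = initial               # 0 would mean "no limit"

        def increment_eventually(self, n=1):
            if self.value:                     # only adjust an active limit
                self.value += n

        def decrement_eventually(self, n=1):
            if self.value:
                self.value = max(self.value - n, 1)  # never drop to 0

        def update(self):
            # applied later from the consumer loop, hence "eventually"
            self._set_prefetch(self.value)

    qos = SketchQoS(set_prefetch=print, initial=2)
    qos.decrement_eventually(5)
    qos.update()  # prints 1: the limit tightens but is never disabled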

celery/worker/control.py (+4 −6)

@@ -283,9 +283,8 @@ def pool_grow(state, n=1, **kwargs):
         state.consumer.controller.autoscaler.force_scale_up(n)
     else:
         state.consumer.pool.grow(n)
-    state.consumer.qos.increment_eventually(n)
-    state.consumer.initial_prefetch_count += n
-    return {'ok': 'spawned worker processes'}
+    state.consumer.increment_prefetch_count(n)
+    return {'ok': 'pool will grow'}
 
 
 @Panel.register
@@ -294,9 +293,8 @@ def pool_shrink(state, n=1, **kwargs):
         state.consumer.controller.autoscaler.force_scale_down(n)
     else:
         state.consumer.pool.shrink(n)
-    state.consumer.qos.decrement_eventually(n)
-    state.consumer.initial_prefetch_count -= n
-    return {'ok': 'terminated worker processes'}
+    state.consumer.decrement_prefetch_count(n)
+    return {'ok': 'pool will shrink'}
 
 
 @Panel.register
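
After the refactor, both control commands delegate the prefetch adjustment to the consumer instead of poking qos and initial_prefetch_count directly. A usage sketch, assuming a configured app and a running worker; the project name and broker URL below are placeholders:

    from celery import Celery

    app = Celery('proj', broker='amqp://')  # hypothetical project/broker

    # Ask all workers to add two pool processes; each worker's consumer
    # also raises its prefetch count via increment_prefetch_count(2).
    print(app.control.pool_grow(2, reply=True))
    # e.g. [{'worker1@host': {'ok': 'pool will grow'}}]

    # Shrink again; the prefetch count is lowered, but never below 1.
    app.control.pool_shrink(2)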