Can now change autoscale settings at runtime

Ask Solem, 13 years ago
parent
commit
1cd29c3e53
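
The new `autoscale` remote control command (registered in celery/worker/control/builtins.py below) lets a running worker's max/min concurrency be changed without a restart. A minimal client-side sketch, assuming the `broadcast` helper from `celery.task.control` in this era of Celery; the numbers are arbitrary:

    from celery.task.control import broadcast

    # Ask every worker to move to max=10 / min=3 pool processes.
    # reply=True collects each worker's {"ok": "autoscale now ..."} response.
    replies = broadcast("autoscale", arguments={"max": 10, "min": 3},
                        reply=True, timeout=1)
    print(replies)

    # pool_grow / pool_shrink now route through the autoscaler when one is
    # configured, adjusting its limits instead of bypassing them:
    broadcast("pool_grow", arguments={"n": 2})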

+ 3 - 1
celery/worker/__init__.py

@@ -190,6 +190,7 @@ class WorkController(object):
             # needs a custom implementation.
             self.eta_scheduler_cls = self.pool.Timer
 
+        self.autoscaler = None
         if autoscale:
             self.autoscaler = instantiate(self.autoscaler_cls, self.pool,
                                           max_concurrency=max_concurrency,
@@ -226,7 +227,8 @@ class WorkController(object):
                                     initial_prefetch_count=prefetch_count,
                                     pool=self.pool,
                                     priority_timer=self.priority_timer,
-                                    app=self.app)
+                                    app=self.app,
+                                    controller=self)
 
         # The order is important here;
         #   the first in the list is the first to start,

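Setting `self.autoscaler = None` unconditionally means the attribute always exists, so the control commands further down can simply truth-test `controller.autoscaler` instead of guarding with hasattr(). A standalone illustration of that pattern (the class and function names here are hypothetical, not Celery's):

    class Controller(object):
        def __init__(self, autoscale=False):
            self.autoscaler = None          # always defined, even when disabled
            if autoscale:
                self.autoscaler = object()  # stands in for the real Autoscaler

    def pool_grow(controller, n=1):
        # Remote-control handlers can branch on the attribute directly.
        if controller.autoscaler:
            print("delegating grow(%d) to the autoscaler" % n)
        else:
            print("growing the pool directly by %d" % n)

    pool_grow(Controller(autoscale=False))  # growing the pool directly by 1
    pool_grow(Controller(autoscale=True))   # delegating grow(1) to the autoscaler
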
+ 59 - 18
celery/worker/autoscale.py

@@ -14,6 +14,7 @@ class Autoscaler(threading.Thread):
             keepalive=30, logger=None):
         threading.Thread.__init__(self)
         self.pool = pool
+        self.mutex = threading.Lock()
         self.max_concurrency = max_concurrency
         self.min_concurrency = min_concurrency
         self.keepalive = keepalive
@@ -27,38 +28,72 @@ class Autoscaler(threading.Thread):
         assert self.keepalive, "can't scale down too fast."
 
     def scale(self):
-        current = min(self.qty, self.max_concurrency)
-        if current > self.processes:
-            self.scale_up(current - self.processes)
-        elif current < self.processes:
-            self.scale_down((self.processes - current) - self.min_concurrency)
-        sleep(1.0)
+        with self.mutex:
+            current = min(self.qty, self.max_concurrency)
+            if current > self.processes:
+                self.scale_up(current - self.processes)
+            elif current < self.processes:
+                self.scale_down((self.processes - current) - self.min_concurrency)
+
+    def update(self, max=None, min=None):
+        with self.mutex:
+            if max is not None:
+                if max < self.max_concurrency:
+                    self._shrink(self.processes - max)
+                self.max_concurrency = max
+            if min is not None:
+                if min > self.min_concurrency:
+                    self._grow(min - self.min_concurrency)
+                self.min_concurrency = min
+            return self.max_concurrency, self.min_concurrency
+
+    def force_scale_up(self, n):
+        with self.mutex:
+            new = self.processes + n
+            if new > self.max_concurrency:
+                self.max_concurrency = new
+            self.min_concurrency += 1
+            self._grow(n)
+
+    def force_scale_down(self, n):
+        with self.mutex:
+            new = self.processes - n
+            if new < self.min_concurrency:
+                self.min_concurrency = new
+            self._shrink(n)
 
     def scale_up(self, n):
-        self.logger.info("Scaling up %s processes." % (n, ))
         self._last_action = time()
-        return self.pool.grow(n)
+        return self._grow(n)
+
+    def _grow(self, n):
+        self.logger.info("Scaling up %s processes." % (n, ))
+        self.pool.grow(n)
+
+    def _shrink(self, n):
+        self.logger.info("Scaling down %s processes." % (n, ))
+        try:
+            self.pool.shrink(n)
+        except ValueError:
+            self.logger.debug(
+                "Autoscaler won't scale down: all processes busy.")
+        except Exception, exc:
+            self.logger.error("Autoscaler: scale_down: %r\n%r" % (
+                                exc, traceback.format_stack()),
+                                exc_info=sys.exc_info())
 
     def scale_down(self, n):
         if not self._last_action or not n:
             return
         if time() - self._last_action > self.keepalive:
-            self.logger.info("Scaling down %s processes." % (n, ))
             self._last_action = time()
-            try:
-                self.pool.shrink(n)
-            except ValueError:
-                self.logger.debug(
-                    "Autoscaler won't scale down: all processes busy.")
-            except Exception, exc:
-                self.logger.error("Autoscaler: scale_down: %r\n%r" % (
-                                    exc, traceback.format_stack()),
-                                  exc_info=sys.exc_info())
+            self._shrink(n)
 
     def run(self):
         while not self._shutdown.isSet():
             try:
                 self.scale()
+                sleep(1.0)
             except Exception, exc:
                 self.logger.error("Thread Autoscaler crashed: %r" % (exc, ),
                                   exc_info=sys.exc_info())
@@ -71,6 +106,12 @@ class Autoscaler(threading.Thread):
         if self.isAlive():
             self.join(1e10)
 
+    def info(self):
+        return {"max": self.max_concurrency,
+                "min": self.min_concurrency,
+                "current": self.processes,
+                "qty": self.qty}
+
     @property
     def qty(self):
         return len(state.reserved_requests)

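To make the behaviour of the new `update()` method concrete, here is a condensed, standalone re-statement of its logic with a hypothetical FakePool (the real code goes through `_grow`/`_shrink`, which also log and swallow a busy-pool ValueError): lowering the ceiling shrinks the pool to the new maximum, while raising the floor grows it by the increase in the minimum.

    import threading

    class FakePool(object):
        """Hypothetical stand-in for the worker pool."""
        def __init__(self, n):
            self.num_processes = n

        def grow(self, n):
            self.num_processes += n

        def shrink(self, n):
            self.num_processes -= n

    class MiniAutoscaler(object):
        """Condensed re-statement of Autoscaler.update() above (illustration only)."""

        def __init__(self, pool, max_concurrency, min_concurrency=0):
            self.pool = pool
            self.mutex = threading.Lock()   # serializes update() with the scale loop
            self.max_concurrency = max_concurrency
            self.min_concurrency = min_concurrency

        @property
        def processes(self):
            return self.pool.num_processes

        def update(self, max=None, min=None):
            with self.mutex:
                if max is not None:
                    if max < self.max_concurrency:
                        self.pool.shrink(self.processes - max)
                    self.max_concurrency = max
                if min is not None:
                    if min > self.min_concurrency:
                        self.pool.grow(min - self.min_concurrency)
                    self.min_concurrency = min
                return self.max_concurrency, self.min_concurrency

    pool = FakePool(8)
    scaler = MiniAutoscaler(pool, max_concurrency=10, min_concurrency=2)
    print(scaler.update(max=4))   # shrinks 8 -> 4 processes, returns (4, 2)
    print(scaler.update(min=4))   # grows by 4 - 2 = 2 processes, returns (4, 4)
    print(pool.num_processes)     # 6
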
+ 2 - 1
celery/worker/consumer.py

@@ -252,10 +252,11 @@ class Consumer(object):
     def __init__(self, ready_queue, eta_schedule, logger,
             init_callback=noop, send_events=False, hostname=None,
             initial_prefetch_count=2, pool=None, app=None,
-            priority_timer=None):
+            priority_timer=None, controller=None):
         self.app = app_or_default(app)
         self.connection = None
         self.task_consumer = None
+        self.controller = controller
         self.broadcast_consumer = None
         self.ready_queue = ready_queue
         self.eta_schedule = eta_schedule

+ 22 - 3
celery/worker/control/builtins.py

@@ -160,9 +160,13 @@ def dump_active(panel, safe=False, **kwargs):
 
 @Panel.register
 def stats(panel, **kwargs):
+    asinfo = {}
+    if panel.consumer.controller.autoscaler:
+        asinfo = panel.consumer.controller.autoscaler.info()
     return {"total": state.total_count,
             "consumer": panel.consumer.info,
-            "pool": panel.consumer.pool.info}
+            "pool": panel.consumer.pool.info,
+            "autoscaler": asinfo}
 
 
 @Panel.register
@@ -197,16 +201,31 @@ def ping(panel, **kwargs):
 
 @Panel.register
 def pool_grow(panel, n=1, **kwargs):
-    panel.consumer.pool.grow(n)
+    if panel.consumer.controller.autoscaler:
+        panel.consumer.controller.autoscaler.force_scale_up(n)
+    else:
+        panel.consumer.pool.grow(n)
     return {"ok": "spawned worker processes"}
 
 
 @Panel.register
 def pool_shrink(panel, n=1, **kwargs):
-    panel.consumer.pool.shrink(n)
+    if panel.consumer.controller.autoscaler:
+        panel.consumer.controller.autoscaler.force_scale_down(n)
+    else:
+        panel.consumer.pool.shrink(n)
     return {"ok": "terminated worker processes"}
 
 
+@Panel.register
+def autoscale(panel, max=None, min=None):
+    autoscaler = panel.consumer.controller.autoscaler
+    if autoscaler:
+        max_, min_ = autoscaler.update(max, min)
+        return {"ok": "autoscale now min=%r max=%r" % (max_, min_)}
+    raise ValueError("Autoscale not enabled")
+
+
 @Panel.register
 def shutdown(panel, **kwargs):
     panel.logger.warning("Got shutdown from remote.")
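
Since `stats` now includes the autoscaler's info() dict (an empty dict when autoscaling is off), the current limits can also be read back remotely. A sketch, again assuming the era's `broadcast` helper, whose replies come back as a list of {hostname: reply} mappings:

    from celery.task.control import broadcast

    # Each worker reports {"max": ..., "min": ..., "current": ..., "qty": ...}
    # under the "autoscaler" key, or {} if autoscaling is not enabled.
    for reply in broadcast("stats", reply=True, timeout=1):
        for hostname, stats in reply.items():
            print(hostname, stats.get("autoscaler"))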