
Merge pull request #2698 from PhilipGarnero/improve_autoscale_down

Improve the autoscale behavior
Omer Katz 9 years ago
parent
commit
893aa2ca9e
2 changed files with 13 additions and 14 deletions
  1. celery/tests/worker/test_autoscale.py (3 additions, 3 deletions)
  2. celery/worker/autoscale.py (10 additions, 11 deletions)

+ 3 - 3
celery/tests/worker/test_autoscale.py

@@ -107,7 +107,7 @@ class test_Autoscaler(AppCase):
         state.reserved_requests.clear()
         x.body()
         self.assertEqual(x.pool.num_processes, 10)
-        x._last_action = monotonic() - 10000
+        x._last_scale_up = monotonic() - 10000
         x.body()
         self.assertEqual(x.pool.num_processes, 3)
         self.assertTrue(worker.consumer._update_prefetch_count.called)
@@ -141,7 +141,7 @@ class test_Autoscaler(AppCase):
         worker = Mock(name='worker')
         x = autoscale.Autoscaler(self.pool, 10, 3, worker=worker)
         x.scale_up(3)
-        x._last_action = monotonic() - 10000
+        x._last_scale_up = monotonic() - 10000
         x.pool.shrink_raises_ValueError = True
         x.scale_down(1)
         self.assertTrue(debug.call_count)
@@ -156,7 +156,7 @@ class test_Autoscaler(AppCase):
         self.assertEqual(x.processes, 5)
         x.force_scale_down(3)
         self.assertEqual(x.processes, 2)
-        x.update(3, None)
+        x.update(None, 3)
         self.assertEqual(x.processes, 3)
         x.force_scale_down(1000)
         self.assertEqual(x.min_concurrency, 0)
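
A hedged aside (not part of the commit): Autoscaler.update is declared as update(max=None, min=None), so the old test call update(3, None) lowered the maximum instead of raising the minimum. The toy stand-in below mirrors only the signature to show the difference; keyword arguments avoid the mix-up entirely.

# Toy stand-in for the update(max=None, min=None) signature; not celery code.
def update(max=None, min=None):
    return {'max': max, 'min': min}

assert update(3, None) == {'max': 3, 'min': None}   # old test call: capped the maximum
assert update(None, 3) == {'max': None, 'min': 3}   # fixed call: raises the minimum
assert update(min=3)   == {'max': None, 'min': 3}   # keyword form makes the intent explicit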

+ 10 - 11
celery/worker/autoscale.py

@@ -71,7 +71,7 @@ class Autoscaler(bgThread):
         self.max_concurrency = max_concurrency
         self.min_concurrency = min_concurrency
         self.keepalive = keepalive
-        self._last_action = None
+        self._last_scale_up = None
         self.worker = worker
 
         assert self.keepalive, 'cannot scale down too fast.'
@@ -87,8 +87,9 @@ class Autoscaler(bgThread):
         if cur > procs:
             self.scale_up(cur - procs)
             return True
-        elif cur < procs:
-            self.scale_down((procs - cur) - self.min_concurrency)
+        cur = max(self.qty, self.min_concurrency)
+        if cur < procs:
+            self.scale_down(procs - cur)
             return True
 
     def maybe_scale(self, req=None):
@@ -98,12 +99,12 @@ class Autoscaler(bgThread):
     def update(self, max=None, min=None):
         with self.mutex:
             if max is not None:
-                if max < self.max_concurrency:
+                if max < self.processes:
                     self._shrink(self.processes - max)
                 self.max_concurrency = max
             if min is not None:
-                if min > self.min_concurrency:
-                    self._grow(min - self.min_concurrency)
+                if min > self.processes:
+                    self._grow(min - self.processes)
                 self.min_concurrency = min
             return self.max_concurrency, self.min_concurrency
 
@@ -112,7 +113,6 @@ class Autoscaler(bgThread):
             new = self.processes + n
             if new > self.max_concurrency:
                 self.max_concurrency = new
-            self.min_concurrency += 1
             self._grow(n)
 
     def force_scale_down(self, n):
@@ -123,13 +123,12 @@ class Autoscaler(bgThread):
             self._shrink(min(n, self.processes))
 
     def scale_up(self, n):
-        self._last_action = monotonic()
+        self._last_scale_up = monotonic()
         return self._grow(n)
 
     def scale_down(self, n):
-        if n and self._last_action and (
-                monotonic() - self._last_action > self.keepalive):
-            self._last_action = monotonic()
+        if self._last_scale_up and (
+                monotonic() - self._last_scale_up > self.keepalive):
             return self._shrink(n)
 
     def _grow(self, n):
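
For readers skimming the diff, here is a standalone sketch of the policy this commit introduces; MiniAutoscaler, FakePool, and the qty counter are illustrative stand-ins under stated assumptions, not celery APIs. Scale-ups stamp _last_scale_up, scale-downs are refused until keepalive seconds have passed since that stamp, and _maybe_scale never shrinks the pool below min_concurrency.

# Minimal sketch of the revised autoscale policy; not the celery implementation.
from time import monotonic   # Python 3.3+; celery 3.x uses celery.five.monotonic

class FakePool:
    """Stand-in for a worker pool that can grow and shrink."""
    def __init__(self, n):
        self.num_processes = n
    def grow(self, n):
        self.num_processes += n
    def shrink(self, n):
        self.num_processes -= n

class MiniAutoscaler:
    def __init__(self, pool, max_concurrency, min_concurrency=0, keepalive=30.0):
        self.pool = pool
        self.max_concurrency = max_concurrency
        self.min_concurrency = min_concurrency
        self.keepalive = keepalive
        self._last_scale_up = None
        self.qty = 0   # stands in for the number of reserved requests

    @property
    def processes(self):
        return self.pool.num_processes

    def _maybe_scale(self):
        procs = self.processes
        cur = min(self.qty, self.max_concurrency)
        if cur > procs:
            self.scale_up(cur - procs)
            return True
        cur = max(self.qty, self.min_concurrency)   # never target fewer than min_concurrency
        if cur < procs:
            self.scale_down(procs - cur)
            return True

    def scale_up(self, n):
        self._last_scale_up = monotonic()   # remember when we last grew
        self.pool.grow(n)

    def scale_down(self, n):
        # only shrink once the last scale-up is older than the keepalive window
        if self._last_scale_up and monotonic() - self._last_scale_up > self.keepalive:
            self.pool.shrink(n)

pool = FakePool(3)
scaler = MiniAutoscaler(pool, max_concurrency=10, min_concurrency=3, keepalive=30.0)
scaler.qty = 8
scaler._maybe_scale()                      # grows the pool to 8 processes
scaler.qty = 0
scaler._maybe_scale()                      # shrink refused: still inside the keepalive window
scaler._last_scale_up = monotonic() - 60   # pretend the last scale-up was a minute ago
scaler._maybe_scale()                      # now shrinks back to min_concurrency
print(pool.num_processes)                  # -> 3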