Browse Source

Added the ability to grow and shrink the pool

Ask Solem 14 years ago
parent
commit
1bb6beec5a

+ 6 - 0
celery/concurrency/processes/__init__.py

@@ -128,6 +128,12 @@ class TaskPool(object):
                                       error_callback=on_worker_error,
                                       waitforslot=self.putlocks)
 
def grow(self, n=1):
    """Add *n* worker processes by delegating to the underlying pool.

    Returns whatever the wrapped pool's ``grow`` returns.
    """
    pool = self._pool
    return pool.grow(n)
+
def shrink(self, n=1):
    """Remove *n* worker processes by delegating to the underlying pool.

    Returns whatever the wrapped pool's ``shrink`` returns.
    """
    pool = self._pool
    return pool.shrink(n)
+
     def on_worker_error(self, errbacks, exc):
         einfo = ExceptionInfo((exc.__class__, exc, None))
         [errback(einfo) for errback in errbacks]

+ 33 - 1
celery/concurrency/processes/pool.py

@@ -436,7 +436,7 @@ class Pool(object):
         self._worker_handler = self.Supervisor(self)
         self._worker_handler.start()
 
-        self._putlock = threading.BoundedSemaphore(self._processes)
+        self._putlock = threading.BoundedSemaphore(self._processes, True)
 
         self._task_handler = self.TaskHandler(self._taskqueue,
                                               self._quick_put,
@@ -512,6 +512,38 @@ class Pool(object):
             return True
         return False
 
def shrink(self, n=1):
    """Terminate *n* currently-inactive worker processes.

    Only workers with no job assigned (per ``_iterinactive``) are
    terminated.

    :raises ValueError: when there are no (or not enough) inactive
        workers to terminate.  NOTE(review): ``_iterinactive`` ends
        with a bare ``raise``, which fires before this ``ValueError``
        can be reached once the generator is exhausted — confirm
        intended behavior.
    """
    for i, worker in enumerate(self._iterinactive()):
        # One fewer process in the pool's bookkeeping.
        self._processes -= 1
        if self._putlock:
            # Shrink the put-lock to match: lower both the bounded
            # semaphore's ceiling and its current counter.  This pokes
            # private attributes of threading.(Bounded)Semaphore and is
            # fragile across Python versions.
            self._putlock._initial_value -= 1
            self._putlock._Semaphore__value -= 1
        worker.terminate()
        if i == n - 1:
            # Requested number of workers terminated; done.
            return
    raise ValueError("Can't shrink pool. All processes busy!")
+
def grow(self, n=1):
    """Add *n* new worker processes to the pool.

    For each new worker the pool's process count and the put-lock
    semaphore (both its ceiling and its current counter) are widened
    before the process is created, so the extra worker can be fed
    tasks immediately.
    """
    # `range` instead of Py2-only `xrange`: behaviourally identical
    # here and portable; `_` because the loop index is unused.
    for _ in range(n):
        self._processes += 1
        if self._putlock:
            # Mirror of shrink(): widen the semaphore's private
            # ceiling and counter to open one more task slot.
            self._putlock._initial_value += 1
            self._putlock._Semaphore__value += 1
        self._create_worker_process()
+
+    def _iterinactive(self):
+        for worker in self._pool:
+            if not self._worker_active(worker):
+                yield worker
+        raise
+
+    def _worker_active(self, worker):
+        jobs = []
+        for job in self._cache.values():
+            if worker.pid in job.worker_pids():
+                return True
+        return False
+
     def _repopulate_pool(self):
         """Bring the number of pool processes up to the specified number,
         for use after reaping workers which have exited.

+ 11 - 0
celery/worker/control/builtins.py

@@ -187,6 +187,17 @@ def ping(panel, **kwargs):
     return "pong"
 
 
@Panel.register
def pool_grow(panel, n=1, **kwargs):
    """Remote-control command: add *n* worker processes to the pool."""
    pool = panel.listener.pool
    pool.grow(n)
    return {"ok": "spawned worker processes"}
+
@Panel.register
def pool_shrink(panel, n=1, **kwargs):
    """Remote-control command: terminate *n* inactive worker processes."""
    pool = panel.listener.pool
    pool.shrink(n)
    return {"ok": "terminated worker processes"}
+
+
 @Panel.register
 def shutdown(panel, **kwargs):
     panel.logger.critical("Got shutdown from remote.")