Browse Source

Refactor pool supervision. If anyone ever makes a list of Monthy Python jokes
used in the Python programming language, celery should now be listed under
"bring_out_the_dead" :)

Ask Solem 15 năm trước cách đây
mục cha
commit
26c10d00a3
1 tập tin đã thay đổi với 47 bổ sung34 xóa
  1. 47 34
      celery/pool.py

+ 47 - 34
celery/pool.py

@@ -38,17 +38,48 @@ def reap_process(pid):
         raise
         raise
     return is_dead
     return is_dead
 
 
+    
+def process_is_dead(process):
+    # Make sure PID is an integer (no idea why this happens).
+    try:
+        int(process.pid)
+    except (TypeError, ValueError):
+        return True
+
+    # Try to see if the process is actually running,
+    # and reap zombie proceses while we're at it.
+
+    if reap_process(process.pid):
+        return True
+    
+    # Then try to ping the process using its pipe.
+    try:
+        proc_is_alive = process.is_alive()
+    except OSError:
+        return True
+    else:
+        return not proc_is_alive
+
 
 
 class DynamicPool(Pool):
 class DynamicPool(Pool):
     """Version of :class:`multiprocessing.Pool` that can dynamically grow
     """Version of :class:`multiprocessing.Pool` that can dynamically grow
     in size."""
     in size."""
 
 
     def __init__(self, processes=None, initializer=None, initargs=()):
     def __init__(self, processes=None, initializer=None, initargs=()):
+
+        if processes is None:
+            try:
+                processes = cpu_count()
+            except NotImplementedError:
+                processes = 1
+
         super(DynamicPool, self).__init__(processes=processes,
         super(DynamicPool, self).__init__(processes=processes,
                                           initializer=initializer,
                                           initializer=initializer,
                                           initargs=initargs)
                                           initargs=initargs)
         self._initializer = initializer
         self._initializer = initializer
         self._initargs = initargs
         self._initargs = initargs
+        self._size = processes
+        self.logger = multiprocessing.get_logger()
 
 
     def add_worker(self):
     def add_worker(self):
         """Add another worker to the pool."""
         """Add another worker to the pool."""
@@ -65,44 +96,26 @@ class DynamicPool(Pool):
         [self.add_worker() for i in range(size)]
         [self.add_worker() for i in range(size)]
 
 
     def is_dead(self, process):
     def is_dead(self, process):
-        # Make sure PID is an integer (no idea why this happens).
-        try:
-            int(process.pid)
-        except (TypeError, ValueError):
+        if process_is_dead(process):
+            self.logger.info("DynamicPool: Found dead process (PID: %s)" % (
+                process.pid))
             return True
             return True
+        return False
 
 
-        # Try to see if the process is actually running,
-        # and reap zombie proceses while we're at it.
-
-        if reap_process(process.pid):
-            return True
-    
-        # Then try to ping the process using its pipe.
-        try:
-            proc_is_alive = process.is_alive()
-        except OSError:
-            return True
-        else:
-            return not proc_is_alive
-
-    def replace_dead_workers(self):
-        logger = multiprocessing.get_logger()
-
-        new_pool = []
-        dead_count = 0
+    def bring_out_the_dead(self):
+        dead = []
+        alive = []
         for process in self._pool:
         for process in self._pool:
-            if self.is_dead(process):
-                logger.info("DynamicPool: Found dead process (PID: %s)" % (
-                    process.pid))
-                dead_count += 1
-            else:
-                new_pool.append(process)
+            if process and process.pid:
+                if self.is_dead(process):
+                    dead += [process]
+                else:
+                    alive += [process]
+        return dead, alive 
 
 
-        if dead_count:
-            self.grow(dead_count)
-            self._pool = new_pool
-
-        return dead_count
+    def replace_dead_workers(self):
+        dead, self._pool = self.bring_out_the_dead()
+        self.grow(self._size if len(dead) > self._size else len(dead))
 
 
 
 
 class TaskPool(object):
 class TaskPool(object):