Explorar o código

Make sure every pool process has a PID

Ask Solem %!s(int64=16) %!d(string=hai) anos
pai
achega
c23f837e8a
Modificáronse 1 ficheiros con 28 adicións e 13 borrados
  1. 28 13
      celery/pool.py

+ 28 - 13
celery/pool.py

@@ -65,8 +65,15 @@ class DynamicPool(Pool):
         [self.add_worker() for i in range(size)]
 
     def is_dead(self, process):
-        # First try to see if the process is actually running,
+        # Make sure PID is an integer (no idea why this happens).
+        try:
+            int(process.pid)
+        except (TypeError, ValueError):
+            return True
+
+        # Try to see if the process is actually running,
         # and reap zombie proceses while we're at it.
+
         if reap_process(process.pid):
             return True
     
@@ -79,15 +86,23 @@ class DynamicPool(Pool):
             return not proc_is_alive
 
     def replace_dead_workers(self):
-        dead_processes = filter(self.is_dead, self._pool)
+        logger = multiprocessing.get_logger()
+
+        new_pool = []
+        dead_count = 0
+        for process in self._pool:
+            if self.is_dead(process):
+                logger.info("DynamicPool: Found dead process %s (PID: %s)" % (
+                    (process, process.pid))
+                dead_count += 1
+            else:
+                new_pool.append(process)
+
+        if dead_count:
+            self.grow(dead_count)
+            self._pool = new_pool
 
-        if dead_processes:
-            dead_pids = [process.pid for process in dead_processes]
-            self._pool = [process for process in self._pool
-                            if process not in dead_pids]
-            self.grow(len(dead_processes))
-        
-        return dead_processes
+        return dead_count
 
 
 class TaskPool(object):
@@ -127,11 +142,11 @@ class TaskPool(object):
 
     def replace_dead_workers(self):
         self.logger.debug("TaskPool: Finding dead pool processes...")
-        dead = self._pool.replace_dead_workers()
-        if dead:
+        dead_count = self._pool.replace_dead_workers()
+        if dead_count:
             self.logger.info(
-                "TaskPool: Replaced %s dead pool workers..." % (
-                    len(dead)))
+                "TaskPool: Replaced %d dead pool workers..." % (
+                    dead_count))
 
     def apply_async(self, target, args=None, kwargs=None, callbacks=None,
             errbacks=None, on_ack=None, meta=None):