|
@@ -3,7 +3,9 @@
|
|
|
Process Pools.
|
|
|
|
|
|
"""
|
|
|
+import os
|
|
|
import time
|
|
|
+import errno
|
|
|
import multiprocessing
|
|
|
|
|
|
from multiprocessing.pool import Pool, worker
|
|
@@ -12,6 +14,31 @@ from celery.utils import gen_unique_id
|
|
|
from functools import partial as curry
|
|
|
|
|
|
|
|
|
+def pid_is_dead(pid):
|
|
|
+ try:
|
|
|
+ return kill(pid, 0)
|
|
|
+ except OSError, err:
|
|
|
+ if err.errno == errno.ESRCH:
|
|
|
+ return True # No such process.
|
|
|
+ elif err.errno == errno.EPERM:
|
|
|
+ return False # Operation not permitted.
|
|
|
+ else:
|
|
|
+ raise
|
|
|
+
|
|
|
+
|
|
|
+def reap_process(pid):
|
|
|
+ if pid_is_dead(pid):
|
|
|
+ return True
|
|
|
+
|
|
|
+ try:
|
|
|
+ is_dead, _ = os.waitpid(pid, os.WNOHANG)
|
|
|
+ except OSError, err:
|
|
|
+ if err.errno == errno.ECHILD:
|
|
|
+ return False # No child processes.
|
|
|
+ raise
|
|
|
+ return is_dead
|
|
|
+
|
|
|
+
|
|
|
class DynamicPool(Pool):
|
|
|
"""Version of :class:`multiprocessing.Pool` that can dynamically grow
|
|
|
in size."""
|
|
@@ -37,24 +64,25 @@ class DynamicPool(Pool):
|
|
|
"""Add ``size`` new workers to the pool."""
|
|
|
map(self._add_worker, range(size))
|
|
|
|
|
|
- def is_alive(self, process):
|
|
|
- try:
|
|
|
- proc_is_alive = process.is_alive()
|
|
|
- except OSError:
|
|
|
- return False
|
|
|
- else:
|
|
|
- return proc_is_alive
|
|
|
+ def is_dead(self, process):
|
|
|
+ # First try to see if the process is actually running,
|
|
|
+ # and reap zombie proceses while we're at it.
|
|
|
+ if reap_process(process.pid):
|
|
|
+ return True
|
|
|
+
|
|
|
+ # Then try to ping the process using its pipe.
|
|
|
+ return not process.is_alive()
|
|
|
|
|
|
def replace_dead_workers(self):
|
|
|
- dead = [process for process in self._pool
|
|
|
- if not self.is_alive(process)]
|
|
|
- if dead:
|
|
|
- dead_pids = [process.pid for process in dead]
|
|
|
+ dead_processes = filter(self.is_dead, self._pool)
|
|
|
+
|
|
|
+ if dead_processes:
|
|
|
+ dead_pids = [process.pid for process in dead_processes]
|
|
|
self._pool = [process for process in self._pool
|
|
|
- if process.pid not in dead_pids]
|
|
|
- self.grow(len(dead))
|
|
|
+ if process not in dead_pids]
|
|
|
+ self.grow(len(dead_processes))
|
|
|
|
|
|
- return dead
|
|
|
+ return dead_processes
|
|
|
|
|
|
|
|
|
class TaskPool(object):
|