|
@@ -12,6 +12,7 @@ from multiprocessing.pool import Pool, worker
|
|
from celery.datastructures import ExceptionInfo
|
|
from celery.datastructures import ExceptionInfo
|
|
from celery.utils import gen_unique_id
|
|
from celery.utils import gen_unique_id
|
|
from functools import partial as curry
|
|
from functools import partial as curry
|
|
|
|
+from operator import isNumberType
|
|
|
|
|
|
|
|
|
|
def pid_is_dead(pid):
|
|
def pid_is_dead(pid):
|
|
@@ -65,12 +66,6 @@ def process_is_dead(process):
|
|
|
|
|
|
"""
|
|
"""
|
|
|
|
|
|
- # Make sure PID is an integer (no idea why this happens).
|
|
|
|
- try:
|
|
|
|
- int(process.pid)
|
|
|
|
- except (TypeError, ValueError):
|
|
|
|
- return True
|
|
|
|
-
|
|
|
|
# Try to see if the process is actually running,
|
|
# Try to see if the process is actually running,
|
|
# and reap zombie proceses while we're at it.
|
|
# and reap zombie proceses while we're at it.
|
|
|
|
|
|
@@ -111,10 +106,13 @@ class DynamicPool(Pool):
|
|
w = self.Process(target=worker,
|
|
w = self.Process(target=worker,
|
|
args=(self._inqueue, self._outqueue,
|
|
args=(self._inqueue, self._outqueue,
|
|
self._initializer, self._initargs))
|
|
self._initializer, self._initargs))
|
|
- self._pool.append(w)
|
|
|
|
w.name = w.name.replace("Process", "PoolWorker")
|
|
w.name = w.name.replace("Process", "PoolWorker")
|
|
w.daemon = True
|
|
w.daemon = True
|
|
w.start()
|
|
w.start()
|
|
|
|
+ self._pool.append(w)
|
|
|
|
+ self.logger.debug(
|
|
|
|
+ "DynamicPool: Started pool worker %s (PID: %s, Poolsize: %d)" %(
|
|
|
|
+ w.name, w.pid, len(self._pool))
|
|
|
|
|
|
def grow(self, size=1):
|
|
def grow(self, size=1):
|
|
"""Add workers to the pool.
|
|
"""Add workers to the pool.
|
|
@@ -145,15 +143,24 @@ class DynamicPool(Pool):
|
|
"""
|
|
"""
|
|
dead, alive = [], []
|
|
dead, alive = [], []
|
|
for process in self._pool:
|
|
for process in self._pool:
|
|
- if process and process.pid:
|
|
|
|
|
|
+ if process and process.pid and isNumberType(process.pid):
|
|
dest = dead if self._is_dead(process) else alive
|
|
dest = dead if self._is_dead(process) else alive
|
|
dest.append(process)
|
|
dest.append(process)
|
|
return dead, alive
|
|
return dead, alive
|
|
|
|
|
|
def replace_dead_workers(self):
|
|
def replace_dead_workers(self):
|
|
- """Replace dead workers in the pool by spawning new ones."""
|
|
|
|
- dead, self._pool = self._bring_out_the_dead()
|
|
|
|
- self.grow(self._size if len(dead) > self._size else len(dead))
|
|
|
|
|
|
+ """Replace dead workers in the pool by spawning new ones.
|
|
|
|
+
|
|
|
|
+ :returns: number of dead processes replaced, or ``None`` if all
|
|
|
|
+ processes are alive and running.
|
|
|
|
+
|
|
|
|
+ """
|
|
|
|
+ dead, alive = self._bring_out_the_dead()
|
|
|
|
+ if dead:
|
|
|
|
+ dead_count = len(dead)
|
|
|
|
+ self._pool = alive
|
|
|
|
+ self.grow(self._size if dead_count > self._size else dead_count)
|
|
|
|
+ return dead_count
|
|
|
|
|
|
|
|
|
|
class TaskPool(object):
|
|
class TaskPool(object):
|