123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250 |
- """
- Process Pools.
- """
- import os
- import time
- import errno
- import multiprocessing
- from multiprocessing.pool import Pool, worker
- from celery.datastructures import ExceptionInfo
- from celery.utils import gen_unique_id
- from functools import partial as curry
def pid_is_dead(pid):
    """Check if a process is not running by PID.

    Sends signal 0 to ``pid``: :func:`os.kill` performs its existence and
    permission checks without delivering an actual signal.

    :param pid: Process ID to check.
    :returns: ``True`` if no process with that PID exists, ``False`` if it
        exists (including when we are not permitted to signal it).
    :raises OSError: for any :func:`os.kill` failure other than
        ``ESRCH`` / ``EPERM``.
    :rtype bool:

    """
    try:
        os.kill(pid, 0)
    except OSError as err:
        if err.errno == errno.ESRCH:
            return True   # No such process.
        if err.errno == errno.EPERM:
            return False  # Process exists, we just may not signal it.
        raise
    # os.kill() returns None on success; return an explicit bool so the
    # documented return type holds.
    return False
def reap_process(pid):
    """Reap process if the process is a zombie.

    :param pid: PID of the process to check.
    :returns: ``True`` if the process was reaped or is not running,
        ``False`` otherwise (including when ``pid`` is not a child of this
        process and therefore cannot be waited on).
    :raises OSError: for :func:`os.waitpid` failures other than ``ECHILD``.
    :rtype bool:

    """
    if pid_is_dead(pid):
        return True
    try:
        # WNOHANG: do not block; a returned PID of 0 means the child has
        # not changed state (still running).
        reaped_pid, _status = os.waitpid(pid, os.WNOHANG)
    except OSError as err:
        if err.errno == errno.ECHILD:
            return False  # No child processes: not ours to reap.
        raise
    # waitpid() returns the child's PID when it was reaped and 0 otherwise;
    # normalize to the documented bool instead of leaking the raw PID.
    return reaped_pid != 0
-
def process_is_dead(process):
    """Return ``True`` if ``process`` is no longer running.

    The checks run from cheapest to most expensive:

    1. A PID that cannot be converted to an integer (e.g. ``None``) is
       taken to mean the process is dead.
    2. :func:`reap_process` probes the PID with signal 0 and, for child
       processes, reaps a zombie if there is one.
    3. Otherwise the answer of ``process.is_alive()`` is used; an
       :exc:`OSError` raised by that call is also taken to mean dead.

    :param process: A :class:`multiprocessing.Process` instance.
    :returns: ``True`` if the process is not running, ``False`` otherwise.

    """
    # Guard against a PID that is not an integer (the original author
    # observed this happening without knowing why).
    try:
        int(process.pid)
    except (TypeError, ValueError):
        return True

    # Signal-0 probe, reaping a zombie child while we're at it.
    if reap_process(process.pid):
        return True

    try:
        still_running = process.is_alive()
    except OSError:
        return True
    return not still_running
class DynamicPool(Pool):
    """Version of :class:`multiprocessing.Pool` that can dynamically grow
    in size and replace workers that have died."""

    def __init__(self, processes=None, initializer=None, initargs=()):
        """Create the pool.

        :keyword processes: Initial number of worker processes. Defaults to
            the number of CPUs, or 1 if that cannot be determined.
        :keyword initializer: Callable run in each worker when it starts.
        :keyword initargs: Positional arguments passed to ``initializer``.

        """
        if processes is None:
            try:
                # Must be qualified: a bare ``cpu_count`` is never imported
                # in this module and raised NameError.
                processes = multiprocessing.cpu_count()
            except NotImplementedError:
                processes = 1
        super(DynamicPool, self).__init__(processes=processes,
                                          initializer=initializer,
                                          initargs=initargs)
        # Kept so workers spawned later by add_worker() get the same setup.
        self._initializer = initializer
        self._initargs = initargs
        # Upper bound on how many workers replace_dead_workers() spawns
        # in a single call.
        self._size = processes
        self.logger = multiprocessing.get_logger()

    def add_worker(self):
        """Spawn one more daemon worker process and add it to the pool."""
        w = self.Process(target=worker,
                         args=(self._inqueue, self._outqueue,
                               self._initializer, self._initargs))
        self._pool.append(w)
        # Presumably mirrors the names Pool gives its own workers.
        w.name = w.name.replace("Process", "PoolWorker")
        w.daemon = True
        w.start()

    def grow(self, size=1):
        """Add workers to the pool.

        :keyword size: Number of workers to add (default: 1)

        """
        for _ in range(size):
            self.add_worker()

    def _is_dead(self, process):
        """Return ``True`` (and log it) if ``process`` is no longer running.

        :rtype bool:

        """
        if process_is_dead(process):
            self.logger.info("DynamicPool: Found dead process (PID: %s)" % (
                process.pid))
            return True
        return False

    def _bring_out_the_dead(self):
        """Sort out dead processes from the pool.

        Every pool entry ends up in exactly one of the two lists. Entries
        that are ``None`` or have no PID (never started) count as dead, so
        they get replaced instead of silently disappearing from the pool.

        :returns: Tuple of two lists, the first list with dead processes,
            the second with active processes.

        """
        dead, alive = [], []
        for process in self._pool:
            if process is None or not process.pid or self._is_dead(process):
                dead.append(process)
            else:
                alive.append(process)
        return dead, alive

    def replace_dead_workers(self):
        """Replace dead workers in the pool by spawning new ones.

        At most ``self._size`` new workers are spawned per call.

        :returns: The number of new workers spawned.

        """
        dead, self._pool = self._bring_out_the_dead()
        replacements = min(len(dead), self._size)
        self.grow(replacements)
        return replacements
class TaskPool(object):
    """Process Pool for processing tasks in parallel.

    :param limit: see :attr:`limit` attribute.
    :param logger: see :attr:`logger` attribute.

    .. attribute:: limit

        The number of processes that can run simultaneously.

    .. attribute:: logger

        The logger used for debugging.

    """

    def __init__(self, limit, logger=None):
        self.limit = limit
        self.logger = logger or multiprocessing.get_logger()
        # Created by start(), cleared by stop().
        self._pool = None

    def start(self):
        """Run the task pool.

        Will pre-fork all workers so they're ready to accept tasks.

        """
        self._pool = DynamicPool(processes=self.limit)

    def stop(self):
        """Terminate the pool.

        Does nothing if the pool was never started or is already stopped.

        """
        if self._pool is None:
            return
        self._pool.terminate()
        self._pool = None

    def replace_dead_workers(self):
        """Replace dead pool processes, logging how many were replaced."""
        self.logger.debug("TaskPool: Finding dead pool processes...")
        dead_count = self._pool.replace_dead_workers()
        # Only log when the pool reports a non-zero number of replacements.
        if dead_count:
            self.logger.info(
                "TaskPool: Replaced %d dead pool workers..." % (
                    dead_count))

    def apply_async(self, target, args=None, kwargs=None, callbacks=None,
                    errbacks=None, on_ack=None, meta=None):
        """Equivalent of the :func:`apply` built-in function.

        Dead workers are replaced before the task is submitted.

        All ``callbacks`` and ``errbacks`` should complete immediately since
        otherwise the thread which handles the result will get blocked.

        :param target: Callable to execute in a worker process.
        :keyword args: Positional arguments for ``target``.
        :keyword kwargs: Keyword arguments for ``target``.
        :keyword callbacks: Called as ``callback(ret_value, meta)`` on
            success.
        :keyword errbacks: Called as ``errback(ret_value, meta)`` when the
            return value is an ``ExceptionInfo``.
        :keyword on_ack: Called with no arguments before the callbacks run.
        :keyword meta: Extra data passed to every callback and errback.
        :returns: Whatever the underlying pool's ``apply_async`` returns.

        """
        args = args or []
        kwargs = kwargs or {}
        callbacks = callbacks or []
        errbacks = errbacks or []
        meta = meta or {}
        # Bind the per-task handlers now; the pool calls on_return with
        # only the return value.
        on_return = curry(self.on_return, callbacks, errbacks,
                          on_ack, meta)
        self.logger.debug("TaskPool: Apply %s (args:%s kwargs:%s)" % (
            target, args, kwargs))
        self.replace_dead_workers()
        return self._pool.apply_async(target, args, kwargs,
                                      callback=on_return)

    def on_return(self, callbacks, errbacks, on_ack, meta, ret_value):
        """What to do when the process returns.

        Calls ``on_ack`` (if given) first, then dispatches to
        :meth:`on_ready`.

        """
        # Acknowledge the task as being processed.
        if on_ack:
            on_ack()
        self.on_ready(callbacks, errbacks, meta, ret_value)

    def on_ready(self, callbacks, errbacks, meta, ret_value):
        """What to do when a worker task is ready and its return value has
        been collected.

        An ``ExceptionInfo`` return value goes to the errbacks, except that
        a wrapped :exc:`SystemExit` / :exc:`KeyboardInterrupt` is re-raised
        here. Any other value goes to the callbacks.

        """
        if isinstance(ret_value, ExceptionInfo):
            if isinstance(ret_value.exception, (
                    SystemExit, KeyboardInterrupt)):
                raise ret_value.exception
            for errback in errbacks:
                errback(ret_value, meta)
        else:
            for callback in callbacks:
                callback(ret_value, meta)
|