123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
- import multiprocessing
- import threading
- import time
- from multiprocessing import TimeoutError
# Module-level defaults for the keyword arguments of ``OFASupervisor``.

# Default ``ping_timeout``, in seconds.
PING_TIMEOUT = 30 # seconds
# Default ``join_timeout``: passed as ``timeout`` to ``process.join()``
# before a dead process is terminated and replaced.
JOIN_TIMEOUT = 2
# Default ``check_interval``: passed to ``time.sleep()`` between liveness
# checks of the supervised process.
CHECK_INTERVAL = 2
# Default ``max_restart_freq``: restart count at which the supervisor
# gives up once a restart frame has elapsed.
MAX_RESTART_FREQ = 3
# Default ``max_restart_freq_time``: length of a restart frame, measured in
# accumulated ``check_interval`` sleeps.
MAX_RESTART_FREQ_TIME = 10
class MaxRestartsExceededError(Exception):
    """Signal that the supervised process was restarted too frequently.

    Raised by the supervisor when the number of restarts counted in the
    current time frame has reached the configured maximum restart frequency.
    """
def raise_ping_timeout(msg=None):
    """Raise :exc:`multiprocessing.TimeoutError` to report a failed ping.

    Meant for use as a :class:`threading.Timer` callback. A timer calls its
    function without arguments unless ``args``/``kwargs`` are supplied, so
    ``msg`` is optional; previously it was required (and then ignored),
    which made a plain timer callback fail with :exc:`TypeError` instead.

    :param msg: optional message for the exception. When omitted or
        ``None`` the standard supervisor message is used.

    :raises multiprocessing.TimeoutError: always.

    """
    if msg is None:
        msg = "Supervised: Timed out while pinging process."
    raise TimeoutError(msg)
class OFASupervisor(object):
    """Process supervisor using the `one_for_all`_ strategy.

    .. _`one_for_all`:
        http://erlang.org/doc/design_principles/sup_princ.html#5.3.2

    However, instead of registering a list of processes, you have one
    process which runs a pool. Makes for an easy implementation.

    :param target: see :attr:`target`.
    :param args: see :attr:`args`.
    :param kwargs: see :attr:`kwargs`.
    :param ping_timeout: see :attr:`ping_timeout`.
    :param join_timeout: see :attr:`join_timeout`.
    :param max_restart_freq: see :attr:`max_restart_freq`.
    :param max_restart_freq_time: see :attr:`max_restart_freq_time`.
    :param check_interval: see :attr:`check_interval`.

    .. attribute:: target

        The target callable to be launched in a new process.

    .. attribute:: args

        The positional arguments to apply to :attr:`target`.

    .. attribute:: kwargs

        The keyword arguments to apply to :attr:`target`.

    .. attribute:: ping_timeout

        Timeout in seconds for a single ping. Stored for API
        compatibility; the current liveness check does not block, so
        the value is not enforced.

    .. attribute:: join_timeout

        If the process is dead, try to give it a few seconds to join.

    .. attribute:: max_restart_freq

        Limit the number of restarts which can occur in a given time
        interval.

        The max restart frequency is the number of restarts that can occur
        within the interval :attr:`max_restart_freq_time`.

        The restart mechanism prevents situations where the process
        repeatedly dies for the same reason. If this happens both the
        process and the supervisor are terminated.

    .. attribute:: max_restart_freq_time

        See :attr:`max_restart_freq`.

    .. attribute:: check_interval

        The time in seconds, between process pings.

    """
    # Factory used to create the supervised process; kept as a class
    # attribute so it can be replaced with a compatible implementation.
    Process = multiprocessing.Process

    def __init__(self, target, args=None, kwargs=None,
            ping_timeout=PING_TIMEOUT, join_timeout=JOIN_TIMEOUT,
            max_restart_freq=MAX_RESTART_FREQ,
            max_restart_freq_time=MAX_RESTART_FREQ_TIME,
            check_interval=CHECK_INTERVAL):
        self.target = target
        self.args = args or []
        self.kwargs = kwargs or {}
        self.ping_timeout = ping_timeout
        self.join_timeout = join_timeout
        self.check_interval = check_interval
        self.max_restart_freq = max_restart_freq
        self.max_restart_freq_time = max_restart_freq_time
        # Number of restarts performed in the current restart frame.
        self.restarts_in_frame = 0

    def start(self):
        """Launches the :attr:`target` in a separate process and starts
        supervising it.

        Blocks the calling thread: the process is checked every
        :attr:`check_interval` seconds and replaced when it is found dead.

        :raises MaxRestartsExceededError: if, when a restart frame of
            :attr:`max_restart_freq_time` has elapsed, at least
            :attr:`max_restart_freq` restarts happened within it. The
            current process is terminated before the error is raised.

        """
        target = self.target

        def _start_supervised_process():
            """Start the :attr:`target` in a new process."""
            process = self.Process(target=target,
                                   args=self.args, kwargs=self.kwargs)
            process.start()
            return process

        def _restart(process):
            """Terminate the process and return its freshly started
            replacement."""
            process.join(timeout=self.join_timeout)
            process.terminate()
            self.restarts_in_frame += 1
            return _start_supervised_process()

        process = _start_supervised_process()
        try:
            # Time accumulated in the current restart frame, counted in
            # check_interval steps.
            restart_frame = 0
            while True:
                if restart_frame > self.max_restart_freq_time:
                    if self.restarts_in_frame >= self.max_restart_freq:
                        # Giving up: stop the current process so that the
                        # join() in the finally clause cannot block on a
                        # child that is still running.
                        process.terminate()
                        raise MaxRestartsExceededError(
                                "Supervised: Max restart frequency reached")
                    # The frame ended within limits; start a new one.
                    restart_frame = 0
                    self.restarts_in_frame = 0

                try:
                    proc_is_alive = self._is_alive(process)
                except TimeoutError:
                    proc_is_alive = False

                if not proc_is_alive:
                    # Rebind to the replacement so that later checks (and
                    # the final join) act on the process actually running,
                    # not on the one that just died.
                    process = _restart(process)

                time.sleep(self.check_interval)
                restart_frame += self.check_interval
        finally:
            process.join()

    def _is_alive(self, process):
        """Sends a ping to the target process to see if it's alive.

        The ping is a plain ``process.is_alive()`` call. No timer guards
        the call: an exception raised in a :class:`threading.Timer` thread
        cannot interrupt the calling thread, so such a guard would have no
        effect.

        :param process: the process object to check.

        :rtype bool:

        """
        return process.is_alive()
|