# supervisor.py
  1. from multiprocessing import Process, TimeoutError
  2. import threading
  3. import time
  4. PING_TIMEOUT = 30 # seconds
  5. JOIN_TIMEOUT = 2
  6. CHECK_INTERVAL = 2
  7. MAX_RESTART_FREQ = 3
  8. MAX_RESTART_FREQ_TIME = 10
  9. def raise_ping_timeout(msg):
  10. """Raises :exc:`multiprocessing.TimeoutError`, for use in
  11. :class:`threading.Timer` callbacks."""
  12. raise TimeoutError("Supervised: Timed out while pinging process.")
  13. class OFASupervisor(object):
  14. """Process supervisor using the `one_for_all`_ strategy.
  15. .. _`one_for_all`:
  16. http://erlang.org/doc/design_principles/sup_princ.html#5.3.2
  17. However, instead of registering a list of processes, you have one
  18. process which runs a pool. Makes for an easy implementation.
  19. :param target: see :attr:`target`.
  20. :param args: see :attr:`args`.
  21. :param kwargs: see :attr:`kwargs`.
  22. :param join_timeout: see :attr:`join_timeout`.
  23. :param max_restart_freq: see :attr:`max_restart_freq`.
  24. :param max_restart_freq_time: see :attr:`max_restart_freq_time`.
  25. :param check_interval: see :attr:`max_restart_freq_time`.
  26. .. attribute:: target
  27. The target callable to be launched in a new process.
  28. .. attribute:: args
  29. The positional arguments to apply to :attr:`target`.
  30. .. attribute:: kwargs
  31. The keyword arguments to apply to :attr:`target`.
  32. .. attribute:: join_timeout
  33. If the process is dead, try to give it a few seconds to join.
  34. .. attribute:: max_restart_freq
  35. Limit the number of restarts which can occur in a given time interval.
  36. The max restart frequency is the number of restarts that can occur
  37. within the interval :attr:`max_restart_freq_time`.
  38. The restart mechanism prevents situations where the process repeatedly
  39. dies for the same reason. If this happens both the process and the
  40. supervisor is terminated.
  41. .. attribute:: max_restart_freq_time
  42. See :attr:`max_restart_freq`.
  43. .. attribute:: check_interval
  44. The time in seconds, between process pings.
  45. """
  46. def __init__(self, target, args=None, kwargs=None,
  47. ping_timeout=PING_TIMEOUT, join_timeout=JOIN_TIMEOUT,
  48. max_restart_freq = MAX_RESTART_FREQ,
  49. max_restart_freq_time=MAX_RESTART_FREQ_TIME,
  50. check_interval=CHECK_INTERVAL):
  51. self.target = target
  52. self.args = args or []
  53. self.kwargs = kwargs or {}
  54. self.ping_timeout = ping_timeout
  55. self.join_timeout = join_timeout
  56. self.check_interval = check_interval
  57. self.max_restart_freq = max_restart_freq
  58. self.max_restart_freq_time = max_restart_freq_time
  59. self.restarts_in_frame = 0
  60. def start(self):
  61. """Launches the :attr:`target` in a seperate process and starts
  62. supervising it."""
  63. target = self.target
  64. def _start_supervised_process():
  65. """Start the :attr:`target` in a new process."""
  66. process = Process(target=target,
  67. args=self.args, kwargs=self.kwargs)
  68. process.start()
  69. return process
  70. def _restart(self, process):
  71. """Terminate the process and restart."""
  72. process.join(timeout=self.join_timeout)
  73. process.terminate()
  74. self.restarts_in_frame += 1
  75. process = _start_supervised_process()
  76. try:
  77. process = _start_supervised_process()
  78. restart_frame = 0
  79. while True:
  80. if restart_frame > self.max_restart_freq_time:
  81. if self.restarts_in_frame >= self.max_restart_freq:
  82. raise Exception(
  83. "Supervised: Max restart frequency reached")
  84. restart_frame = 0
  85. self.restarts_in_frame = 0
  86. try:
  87. proc_is_alive = self.is_alive(process)
  88. except TimeoutError:
  89. proc_is_alive = False
  90. if not proc_is_alive:
  91. self._restart()
  92. time.sleep(self.check_interval)
  93. restart_frame += self.check_interval
  94. finally:
  95. process.join()
  96. def _is_alive(self, process):
  97. """Sends a ping to the target process to see if it's alive.
  98. :rtype bool:
  99. """
  100. timeout_timer = threading.Timer(self.ping_timeout, raise_ping_timeout)
  101. try:
  102. alive = process.is_alive()
  103. finally:
  104. timeout_timer.cancel()
  105. return alive