supervisor.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. import multiprocessing
  2. import threading
  3. import time
  4. from multiprocessing import TimeoutError
  # Supervisor defaults. Every value is a duration in seconds except
  # MAX_RESTART_FREQ, which is a count of restarts.
  PING_TIMEOUT = 30  # default for OFASupervisor.ping_timeout
  JOIN_TIMEOUT = 2  # how long to wait for a dead child to join before terminate()
  CHECK_INTERVAL = 2  # pause between liveness checks of the child
  MAX_RESTART_FREQ = 3  # restart-count threshold per counting window
  MAX_RESTART_FREQ_TIME = 10  # length of the restart-counting window
  class MaxRestartsExceededError(Exception):
      """Signals that the supervised process had to be restarted more often
      than the supervisor's restart-frequency limit permits."""
  def raise_ping_timeout(msg=None):
      """Raise :exc:`multiprocessing.TimeoutError`.

      Meant for use as a :class:`threading.Timer` callback. ``Timer`` calls
      its function with no arguments unless ``args`` are supplied, so *msg*
      must be optional.

      Note that an exception raised from a ``Timer`` callback is raised in
      the timer's own thread, not in the thread that started the timer.

      :param msg: optional error message. When omitted (or empty), a generic
          ping-timeout message is used.
      :raises multiprocessing.TimeoutError: always.
      """
      raise TimeoutError(msg or "Supervised: Timed out while pinging process.")
  class OFASupervisor(object):
      """Process supervisor using the `one_for_all`_ strategy.

      .. _`one_for_all`:
          http://erlang.org/doc/design_principles/sup_princ.html#5.3.2

      However, instead of registering a list of processes, you have one
      process which runs a pool. Makes for an easy implementation.

      :param target: see :attr:`target`.
      :param args: see :attr:`args`.
      :param kwargs: see :attr:`kwargs`.
      :param ping_timeout: see :attr:`ping_timeout`.
      :param join_timeout: see :attr:`join_timeout`.
      :param max_restart_freq: see :attr:`max_restart_freq`.
      :param max_restart_freq_time: see :attr:`max_restart_freq_time`.
      :param check_interval: see :attr:`check_interval`.

      .. attribute:: target

          The target callable to be launched in a new process.

      .. attribute:: args

          The positional arguments to apply to :attr:`target`.

      .. attribute:: kwargs

          The keyword arguments to apply to :attr:`target`.

      .. attribute:: ping_timeout

          Timeout in seconds for a liveness check. Currently unused:
          :meth:`multiprocessing.Process.is_alive` does not block, so
          :meth:`_is_alive` needs no timer. Kept for backward compatibility.

      .. attribute:: join_timeout

          If the process is dead, try to give it this many seconds to join
          before it is terminated and replaced.

      .. attribute:: max_restart_freq

          Limit the number of restarts which can occur in a given time
          interval.

          The max restart frequency is the number of restarts that can occur
          within the interval :attr:`max_restart_freq_time`. Exceeding it
          raises :exc:`MaxRestartsExceededError`.

          The restart mechanism prevents situations where the process
          repeatedly dies for the same reason. If this happens both the
          process and the supervisor are terminated.

      .. attribute:: max_restart_freq_time

          See :attr:`max_restart_freq`.

      .. attribute:: check_interval

          The time in seconds between process liveness checks.

      """
      Process = multiprocessing.Process

      def __init__(self, target, args=None, kwargs=None,
                   ping_timeout=PING_TIMEOUT, join_timeout=JOIN_TIMEOUT,
                   max_restart_freq=MAX_RESTART_FREQ,
                   max_restart_freq_time=MAX_RESTART_FREQ_TIME,
                   check_interval=CHECK_INTERVAL):
          self.target = target
          self.args = args or []
          self.kwargs = kwargs or {}
          self.ping_timeout = ping_timeout
          self.join_timeout = join_timeout
          self.check_interval = check_interval
          self.max_restart_freq = max_restart_freq
          self.max_restart_freq_time = max_restart_freq_time
          # Number of restarts counted in the current window; start() resets
          # it each time a window elapses.
          self.restarts_in_frame = 0

      def start(self):
          """Launch :attr:`target` in a separate process and supervise it.

          This call blocks. It checks the child every :attr:`check_interval`
          seconds and replaces it whenever it is found dead.

          Restarts are counted per window. A window is over once the summed
          sleep intervals exceed :attr:`max_restart_freq_time`.

          :raises MaxRestartsExceededError: if more than
              :attr:`max_restart_freq` restarts were counted in one window.
              The current child is terminated before the exception
              propagates.
          """
          target = self.target

          def _start_supervised_process():
              """Start :attr:`target` in a new process and return it."""
              child = self.Process(target=target,
                                   args=self.args, kwargs=self.kwargs)
              child.start()
              return child

          def _restart(dead):
              """Terminate *dead*, count the restart, return a fresh child."""
              dead.join(timeout=self.join_timeout)
              dead.terminate()
              self.restarts_in_frame += 1
              return _start_supervised_process()

          process = _start_supervised_process()
          try:
              restart_frame = 0
              while True:
                  if restart_frame > self.max_restart_freq_time:
                      # max_restart_freq restarts are *allowed* per window
                      # (Erlang MaxR semantics); only exceeding it is fatal.
                      if self.restarts_in_frame > self.max_restart_freq:
                          raise MaxRestartsExceededError(
                              "Supervised: Max restart frequency reached")
                      restart_frame = 0
                      self.restarts_in_frame = 0
                  try:
                      proc_is_alive = self._is_alive(process)
                  except TimeoutError:
                      # A liveness check that times out counts as a dead child.
                      proc_is_alive = False
                  if not proc_is_alive:
                      # Rebind so the loop watches the replacement rather
                      # than the dead process it replaced.
                      process = _restart(process)
                  time.sleep(self.check_interval)
                  restart_frame += self.check_interval
          finally:
              # The loop only exits via an exception. With one_for_all the
              # child goes down with the supervisor. A bare join() would
              # block forever on a still-running child and swallow the error.
              process.terminate()
              process.join()

      def _is_alive(self, process):
          """Return whether *process* is still running.

          :meth:`multiprocessing.Process.is_alive` polls the child without
          blocking, so no timeout guard is needed here. An override that does
          a blocking check may raise :exc:`multiprocessing.TimeoutError`;
          :meth:`start` treats that as a dead process.

          :rtype: bool
          """
          return process.is_alive()