# supervisor.py (4.3 KB)

  1. from multiprocessing import Process, TimeoutError
  2. import threading
  3. import time
  4. PING_TIMEOUT = 30 # seconds
  5. JOIN_TIMEOUT = 2
  6. CHECK_INTERVAL = 2
  7. MAX_RESTART_FREQ = 3
  8. MAX_RESTART_FREQ_TIME = 10
  9. def raise_ping_timeout():
  10. raise TimeoutError("Supervised: Timed out while pinging process.")
  11. class OFASupervisor(object):
  12. """Process supervisor using the `one_for_all`_ strategy.
  13. .. _`one_for_all`:
  14. http://erlang.org/doc/design_principles/sup_princ.html#5.3.2
  15. However, instead of registering a list of processes, you have one
  16. process which runs a pool. Makes for an easy implementation.
  17. :param target: see :attr:`target`.
  18. :param args: see :attr:`args`.
  19. :param kwargs see :attr:`kwargs`.
  20. :param join_timeout see :attr:`join_timeout`.
  21. :param max_restart_freq see :attr:`max_restart_freq`.
  22. :param max_restart_freq_time see :attr:`max_restart_freq_time`.
  23. :param check_interval see :attr:`max_restart_freq_time`.
  24. .. attribute:: target
  25. The target callable to be launched in a new process.
  26. .. attribute:: args
  27. The positional arguments to apply to :attr:`target`.
  28. .. attribute:: kwargs
  29. The keyword arguments to apply to :attr:`target`.
  30. .. attribute:: join_timeout
  31. If the process is dead, try to give it a few seconds to join.
  32. .. attribute:: max_restart_freq
  33. Limit the number of restarts which can occur in a given time interval.
  34. The max restart frequency is the number of restarts that can occur
  35. within the interval :attr:`max_restart_freq_time`.
  36. The restart mechanism prevents situations where the process repeatedly
  37. dies for the same reason. If this happens both the process and the
  38. supervisor is terminated.
  39. .. attribute:: max_restart_freq_time
  40. See :attr:`max_restart_freq`.
  41. .. attribute:: check_interval
  42. The time in seconds, between process pings.
  43. """
  44. def __init__(self, target, args=None, kwargs=None,
  45. ping_timeout=PING_TIMEOUT, join_timeout=JOIN_TIMEOUT,
  46. max_restart_freq = MAX_RESTART_FREQ,
  47. max_restart_freq_time=MAX_RESTART_FREQ_TIME,
  48. check_interval=CHECK_INTERVAL):
  49. self.target = target
  50. self.args = args or []
  51. self.kwargs = kwargs or {}
  52. self.ping_timeout = ping_timeout
  53. self.join_timeout = join_timeout
  54. self.check_interval = check_interval
  55. self.max_restart_freq = max_restart_freq
  56. self.max_restart_freq_time = max_restart_freq_time
  57. self.restarts_in_frame = 0
  58. def start(self):
  59. """Launches the :attr:`target` in a seperate process and starts
  60. supervising it."""
  61. target = self.target
  62. def _start_supervised_process():
  63. """Start the :attr:`target` in a new process."""
  64. process = Process(target=target,
  65. args=self.args, kwargs=self.kwargs)
  66. process.start()
  67. return process
  68. def _restart(self, process):
  69. """Terminate the process and restart."""
  70. process.join(timeout=self.join_timeout)
  71. process.terminate()
  72. self.restarts_in_frame += 1
  73. process = _start_supervised_process()
  74. try:
  75. process = _start_supervised_process()
  76. restart_frame = 0
  77. while True:
  78. if restart_frame > self.max_restart_freq_time:
  79. if self.restarts_in_frame >= self.max_restart_freq:
  80. raise Exception(
  81. "Supervised: Max restart frequency reached")
  82. restart_frame = 0
  83. self.restarts_in_frame = 0
  84. try:
  85. proc_is_alive = self.is_alive(process)
  86. except TimeoutError:
  87. proc_is_alive = False
  88. if not proc_is_alive:
  89. self._restart()
  90. time.sleep(self.check_interval)
  91. restart_frame += self.check_interval
  92. finally:
  93. process.join()
  94. def _is_alive(self, process):
  95. """Sends a ping to the target process to see if it's alive.
  96. :rtype bool:
  97. """
  98. timeout_timer = threading.Timer(self.ping_timeout, raise_ping_timeout)
  99. try:
  100. alive = process.is_alive()
  101. finally:
  102. timeout_timer.cancel()
  103. return alive