# supervisor.py (3.8 KB)
  1. import multiprocessing
  2. import time
  3. from multiprocessing import TimeoutError
  4. JOIN_TIMEOUT = 2
  5. CHECK_INTERVAL = 2
  6. MAX_RESTART_FREQ = 3
  7. MAX_RESTART_FREQ_TIME = 10
  8. class MaxRestartsExceededError(Exception):
  9. """Restarts exceeded the maximum restart frequency."""
  10. class OFASupervisor(object):
  11. """Process supervisor using the `one_for_all`_ strategy.
  12. .. _`one_for_all`:
  13. http://erlang.org/doc/design_principles/sup_princ.html#5.3.2
  14. However, instead of registering a list of processes, you have one
  15. process which runs a pool. Makes for an easy implementation.
  16. :param target: see :attr:`target`.
  17. :param args: see :attr:`args`.
  18. :param kwargs: see :attr:`kwargs`.
  19. :param max_restart_freq: see :attr:`max_restart_freq`.
  20. :param max_restart_freq_time: see :attr:`max_restart_freq_time`.
  21. :param check_interval: see :attr:`max_restart_freq_time`.
  22. .. attribute:: target
  23. The target callable to be launched in a new process.
  24. .. attribute:: args
  25. The positional arguments to apply to :attr:`target`.
  26. .. attribute:: kwargs
  27. The keyword arguments to apply to :attr:`target`.
  28. .. attribute:: max_restart_freq
  29. Limit the number of restarts which can occur in a given time interval.
  30. The max restart frequency is the number of restarts that can occur
  31. within the interval :attr:`max_restart_freq_time`.
  32. The restart mechanism prevents situations where the process repeatedly
  33. dies for the same reason. If this happens both the process and the
  34. supervisor is terminated.
  35. .. attribute:: max_restart_freq_time
  36. See :attr:`max_restart_freq`.
  37. .. attribute:: check_interval
  38. The time in seconds, between process pings.
  39. """
  40. Process = multiprocessing.Process
  41. def __init__(self, target, args=None, kwargs=None,
  42. max_restart_freq=MAX_RESTART_FREQ,
  43. join_timeout=JOIN_TIMEOUT,
  44. max_restart_freq_time=MAX_RESTART_FREQ_TIME,
  45. check_interval=CHECK_INTERVAL):
  46. self.target = target
  47. self.join_timeout = join_timeout
  48. self.args = args or []
  49. self.kwargs = kwargs or {}
  50. self.check_interval = check_interval
  51. self.max_restart_freq = max_restart_freq
  52. self.max_restart_freq_time = max_restart_freq_time
  53. self.restarts_in_frame = 0
  54. def start(self):
  55. """Launches the :attr:`target` in a seperate process and starts
  56. supervising it."""
  57. target = self.target
  58. def _start_supervised_process():
  59. """Start the :attr:`target` in a new process."""
  60. process = self.Process(target=target,
  61. args=self.args, kwargs=self.kwargs)
  62. process.start()
  63. return process
  64. def _restart(process):
  65. """Terminate the process and restart."""
  66. process.join(timeout=self.join_timeout)
  67. process.terminate()
  68. self.restarts_in_frame += 1
  69. process = _start_supervised_process()
  70. process = _start_supervised_process()
  71. try:
  72. restart_frame = 0
  73. while True:
  74. if restart_frame > self.max_restart_freq_time:
  75. if self.restarts_in_frame >= self.max_restart_freq:
  76. raise MaxRestartsExceededError(
  77. "Supervised: Max restart frequency reached")
  78. restart_frame = 0
  79. self.restarts_in_frame = 0
  80. try:
  81. proc_is_alive = process.is_alive()
  82. except TimeoutError:
  83. proc_is_alive = False
  84. if not proc_is_alive:
  85. _restart(process)
  86. time.sleep(self.check_interval)
  87. restart_frame += self.check_interval
  88. finally:
  89. process.join()