heartbeat.py 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. import threading
  2. from time import time, sleep
  3. class Heart(threading.Thread):
  4. """Thread sending heartbeats at regular intervals.
  5. :param eventer: Event dispatcher used to send the event.
  6. :keyword interval: Time in seconds between heartbeats.
  7. Default is 2 minutes.
  8. """
  9. #: Beats per minute.
  10. bpm = 0.5
  11. def __init__(self, eventer, interval=None):
  12. super(Heart, self).__init__()
  13. self.eventer = eventer
  14. self.bpm = interval and interval / 60.0 or self.bpm
  15. self._shutdown = threading.Event()
  16. self._stopped = threading.Event()
  17. self.setDaemon(True)
  18. self.setName(self.__class__.__name__)
  19. self._state = None
  20. def run(self):
  21. self._state = "RUN"
  22. bpm = self.bpm
  23. dispatch = self.eventer.send
  24. dispatch("worker-online")
  25. # We can't sleep all of the interval, because then
  26. # it takes 60 seconds (or value of interval) to shutdown
  27. # the thread.
  28. last_beat = None
  29. while 1:
  30. try:
  31. now = time()
  32. except TypeError:
  33. # we lost the race at interpreter shutdown,
  34. # so time has been collected by gc.
  35. return
  36. if not last_beat or now > last_beat + (60.0 / bpm):
  37. last_beat = now
  38. dispatch("worker-heartbeat")
  39. if self._shutdown.isSet():
  40. break
  41. sleep(1)
  42. try:
  43. dispatch("worker-offline")
  44. finally:
  45. self._stopped.set()
  46. def stop(self):
  47. """Gracefully shutdown the thread."""
  48. if not self._state == "RUN":
  49. return
  50. self._state = "CLOSE"
  51. self._shutdown.set()
  52. self._stopped.wait() # blocks until this thread is done
  53. if self.isAlive():
  54. self.join(1e100)