heartbeat.py 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. import threading
  2. from time import time, sleep
  3. class Heart(threading.Thread):
  4. """Thread sending heartbeats at an interval.
  5. :param eventer: Event dispatcher used to send the event.
  6. :keyword interval: Time in seconds between heartbeats.
  7. Default is 2 minutes.
  8. .. attribute:: bpm
  9. Beats per minute.
  10. """
  11. bpm = 0.5
  12. def __init__(self, eventer, interval=None):
  13. super(Heart, self).__init__()
  14. self.eventer = eventer
  15. self.bpm = interval and interval / 60.0 or self.bpm
  16. self._shutdown = threading.Event()
  17. self._stopped = threading.Event()
  18. self.setDaemon(True)
  19. self._state = None
  20. def run(self):
  21. self._state = "RUN"
  22. bpm = self.bpm
  23. dispatch = self.eventer.send
  24. dispatch("worker-online")
  25. # We can't sleep all of the interval, because then
  26. # it takes 60 seconds (or value of interval) to shutdown
  27. # the thread.
  28. last_beat = None
  29. while 1:
  30. now = time()
  31. if not last_beat or now > last_beat + (60.0 / bpm):
  32. last_beat = now
  33. dispatch("worker-heartbeat")
  34. if self._shutdown.isSet():
  35. break
  36. sleep(1)
  37. try:
  38. dispatch("worker-offline")
  39. finally:
  40. self._stopped.set()
  41. def stop(self):
  42. """Gracefully shutdown the thread."""
  43. if not self._state == "RUN":
  44. return
  45. self._state = "CLOSE"
  46. self._shutdown.set()
  47. self._stopped.wait() # block until this thread is done
  48. self.join(1e100)