heartbeat.py 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. import threading
  2. from time import time, sleep
  3. class Heart(threading.Thread):
  4. """Thread sending heartbeats at an interval.
  5. :param eventer: Event dispatcher used to send the event.
  6. :keyword interval: Time in seconds between heartbeats.
  7. Default is 2 minutes.
  8. .. attribute:: bpm
  9. Beats per minute.
  10. """
  11. bpm = 0.5
  12. def __init__(self, eventer, interval=None):
  13. super(Heart, self).__init__()
  14. self.eventer = eventer
  15. self.bpm = interval and interval / 60.0 or self.bpm
  16. self._shutdown = threading.Event()
  17. self._stopped = threading.Event()
  18. self.setDaemon(True)
  19. self.setName(self.__class__.__name__)
  20. self._state = None
  21. def run(self):
  22. self._state = "RUN"
  23. bpm = self.bpm
  24. dispatch = self.eventer.send
  25. dispatch("worker-online")
  26. # We can't sleep all of the interval, because then
  27. # it takes 60 seconds (or value of interval) to shutdown
  28. # the thread.
  29. last_beat = None
  30. while 1:
  31. try:
  32. now = time()
  33. except TypeError:
  34. # we lost the race at interpreter shutdown,
  35. # so time has been collected by gc.
  36. return
  37. if not last_beat or now > last_beat + (60.0 / bpm):
  38. last_beat = now
  39. dispatch("worker-heartbeat")
  40. if self._shutdown.isSet():
  41. break
  42. sleep(1)
  43. try:
  44. dispatch("worker-offline")
  45. finally:
  46. self._stopped.set()
  47. def stop(self):
  48. """Gracefully shutdown the thread."""
  49. if not self._state == "RUN":
  50. return
  51. self._state = "CLOSE"
  52. self._shutdown.set()
  53. self._stopped.wait() # block until this thread is done
  54. if self.isAlive():
  55. self.join(1e100)