heartbeat.py 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.heartbeat
  4. ~~~~~~~~~~~~~~~~~~~~~~~
  5. This is the internal thread that sends heartbeat events
  6. at regular intervals.
  7. """
  8. from __future__ import absolute_import
  9. from .state import SOFTWARE_INFO
  10. class Heart(object):
  11. """Timer sending heartbeats at regular intervals.
  12. :param timer: Timer instance.
  13. :param eventer: Event dispatcher used to send the event.
  14. :keyword interval: Time in seconds between heartbeats.
  15. Default is 30 seconds.
  16. """
  17. def __init__(self, timer, eventer, interval=None):
  18. self.timer = timer
  19. self.eventer = eventer
  20. self.interval = float(interval or 5.0)
  21. self.tref = None
  22. # Make event dispatcher start/stop us when it's
  23. # enabled/disabled.
  24. self.eventer.on_enabled.add(self.start)
  25. self.eventer.on_disabled.add(self.stop)
  26. def _send(self, event):
  27. return self.eventer.send(event, freq=self.interval, **SOFTWARE_INFO)
  28. def start(self):
  29. if self.eventer.enabled:
  30. self._send('worker-online')
  31. self.tref = self.timer.apply_interval(self.interval * 1000.0,
  32. self._send, ('worker-heartbeat', ))
  33. def stop(self):
  34. if self.tref is not None:
  35. self.timer.cancel(self.tref)
  36. self.tref = None
  37. if self.eventer.enabled:
  38. self._send('worker-offline')