heartbeat.py 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. # -*- coding: utf-8 -*-
  2. """Heartbeat service.
  3. This is the internal thread responsible for sending heartbeat events
  4. at regular intervals (may not be an actual thread).
  5. """
  6. from __future__ import absolute_import, unicode_literals
  7. from celery.signals import heartbeat_sent
  8. from celery.utils.sysinfo import load_average
  9. from .state import SOFTWARE_INFO, active_requests, all_total_count
  10. __all__ = ['Heart']
  11. class Heart(object):
  12. """Timer sending heartbeats at regular intervals.
  13. Arguments:
  14. timer (kombu.async.timer.Timer): Timer to use.
  15. eventer (celery.events.EventDispatcher): Event dispatcher
  16. to use.
  17. interval (float): Time in seconds between sending
  18. heartbeats. Default is 2 seconds.
  19. """
  20. def __init__(self, timer, eventer, interval=None):
  21. self.timer = timer
  22. self.eventer = eventer
  23. self.interval = float(interval or 2.0)
  24. self.tref = None
  25. # Make event dispatcher start/stop us when enabled/disabled.
  26. self.eventer.on_enabled.add(self.start)
  27. self.eventer.on_disabled.add(self.stop)
  28. # Only send heartbeat_sent signal if it has receivers.
  29. self._send_sent_signal = (
  30. heartbeat_sent.send if heartbeat_sent.receivers else None)
  31. def _send(self, event):
  32. if self._send_sent_signal is not None:
  33. self._send_sent_signal(sender=self)
  34. return self.eventer.send(event, freq=self.interval,
  35. active=len(active_requests),
  36. processed=all_total_count[0],
  37. loadavg=load_average(),
  38. **SOFTWARE_INFO)
  39. def start(self):
  40. if self.eventer.enabled:
  41. self._send('worker-online')
  42. self.tref = self.timer.call_repeatedly(
  43. self.interval, self._send, ('worker-heartbeat',),
  44. )
  45. def stop(self):
  46. if self.tref is not None:
  47. self.timer.cancel(self.tref)
  48. self.tref = None
  49. if self.eventer.enabled:
  50. self._send('worker-offline')