heartbeat.py 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. # -*- coding: utf-8 -*-
  2. """This is the internal thread responsible for sending heartbeat events
  3. at regular intervals (may not be an actual thread)."""
  4. from __future__ import absolute_import, unicode_literals
  5. from celery.signals import heartbeat_sent
  6. from celery.utils.sysinfo import load_average
  7. from .state import SOFTWARE_INFO, active_requests, all_total_count
  8. __all__ = ['Heart']
  9. class Heart:
  10. """Timer sending heartbeats at regular intervals.
  11. Arguments:
  12. timer (kombu.async.timer.Timer): Timer to use.
  13. eventer (celery.events.EventDispatcher): Event dispatcher
  14. to use.
  15. interval (float): Time in seconds between sending
  16. heartbeats. Default is 2 seconds.
  17. """
  18. def __init__(self, timer, eventer, interval=None):
  19. self.timer = timer
  20. self.eventer = eventer
  21. self.interval = float(interval or 2.0)
  22. self.tref = None
  23. # Make event dispatcher start/stop us when enabled/disabled.
  24. self.eventer.on_enabled.add(self.start)
  25. self.eventer.on_disabled.add(self.stop)
  26. # Only send heartbeat_sent signal if it has receivers.
  27. self._send_sent_signal = (
  28. heartbeat_sent.send if heartbeat_sent.receivers else None)
  29. def _send(self, event):
  30. if self._send_sent_signal is not None:
  31. self._send_sent_signal(sender=self)
  32. return self.eventer.send(event, freq=self.interval,
  33. active=len(active_requests),
  34. processed=all_total_count[0],
  35. loadavg=load_average(),
  36. **SOFTWARE_INFO)
  37. def start(self):
  38. if self.eventer.enabled:
  39. self._send('worker-online')
  40. self.tref = self.timer.call_repeatedly(
  41. self.interval, self._send, ('worker-heartbeat',),
  42. )
  43. def stop(self):
  44. if self.tref is not None:
  45. self.timer.cancel(self.tref)
  46. self.tref = None
  47. if self.eventer.enabled:
  48. self._send('worker-offline')