12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061 |
- # -*- coding: utf-8 -*-
- """Heartbeat service.
- This is the internal thread responsible for sending heartbeat events
- at regular intervals (may not be an actual thread).
- """
- from __future__ import absolute_import, unicode_literals
- from celery.signals import heartbeat_sent
- from celery.utils.sysinfo import load_average
- from .state import SOFTWARE_INFO, active_requests, all_total_count
- __all__ = ['Heart']
class Heart(object):
    """Timer sending heartbeats at regular intervals.

    Arguments:
        timer (kombu.async.timer.Timer): Timer to use.
        eventer (celery.events.EventDispatcher): Event dispatcher
            to use.
        interval (float): Time in seconds between sending
            heartbeats.  Default is 2 seconds.
    """

    def __init__(self, timer, eventer, interval=None):
        self.timer = timer
        self.eventer = eventer
        # Any falsy interval (None, 0) falls back to the 2 second default.
        self.interval = float(interval or 2.0)
        # Handle returned by ``timer.call_repeatedly``; None while no
        # heartbeat timer is scheduled.
        self.tref = None

        # Make event dispatcher start/stop us when enabled/disabled.
        self.eventer.on_enabled.add(self.start)
        self.eventer.on_disabled.add(self.stop)

        # Only send heartbeat_sent signal if it has receivers.
        # NOTE: this is evaluated once, here; receivers connected after
        # construction are not picked up by this instance.
        self._send_sent_signal = (
            heartbeat_sent.send if heartbeat_sent.receivers else None)

    def _cancel_timer(self):
        """Cancel the scheduled heartbeat timer, if there is one."""
        if self.tref is not None:
            self.timer.cancel(self.tref)
            self.tref = None

    def _send(self, event):
        """Fire the ``heartbeat_sent`` signal (if wanted) and send ``event``.

        The event is sent with ``freq`` (the heartbeat interval), ``active``
        (``len(active_requests)``), ``processed`` (``all_total_count[0]``),
        ``loadavg`` (``load_average()``) and every entry of
        ``SOFTWARE_INFO`` as fields.

        Returns:
            Whatever ``eventer.send`` returns.
        """
        if self._send_sent_signal is not None:
            self._send_sent_signal(sender=self)
        return self.eventer.send(event, freq=self.interval,
                                 active=len(active_requests),
                                 processed=all_total_count[0],
                                 loadavg=load_average(),
                                 **SOFTWARE_INFO)

    def start(self):
        """Send ``worker-online`` and schedule the repeating heartbeat.

        Does nothing while the event dispatcher is disabled.
        """
        if self.eventer.enabled:
            self._send('worker-online')
            # start() may run again without a stop() in between (it is
            # hooked to ``on_enabled``).  Drop the previous timer first,
            # otherwise its handle is lost, it can never be cancelled and
            # heartbeats are sent twice per interval.
            self._cancel_timer()
            self.tref = self.timer.call_repeatedly(
                self.interval, self._send, ('worker-heartbeat',),
            )

    def stop(self):
        """Cancel the heartbeat timer and send ``worker-offline``.

        The timer is always cancelled; the offline event is only sent
        while the event dispatcher is still enabled.
        """
        self._cancel_timer()
        if self.eventer.enabled:
            self._send('worker-offline')
|