|
@@ -18,22 +18,34 @@
|
|
|
"""
|
|
|
from __future__ import absolute_import
|
|
|
|
|
|
-import heapq
|
|
|
import threading
|
|
|
|
|
|
+from heapq import heappush
|
|
|
from time import time
|
|
|
|
|
|
from kombu.utils import kwdict
|
|
|
|
|
|
from celery import states
|
|
|
from celery.datastructures import AttributeDict, LRUCache
|
|
|
+from celery.utils.log import get_logger
|
|
|
|
|
|
# The window (in percentage) is added to the workers heartbeat
|
|
|
# frequency. If the time between updates exceeds this window,
|
|
|
# then the worker is considered to be offline.
|
|
|
HEARTBEAT_EXPIRE_WINDOW = 200
|
|
|
|
|
|
-from datetime import datetime
|
|
|
+# Max drift between event timestamp and time of event received
|
|
|
+# before we alert that clocks may be unsynchronized.
|
|
|
+HEARTBEAT_DRIFT_MAX = 16
|
|
|
+
|
|
|
+DRIFT_WARNING = """\
|
|
|
+Substantial drift from %s may mean clocks are out of sync. Current drift is
|
|
|
+%s seconds (including message overhead).\
|
|
|
+"""
|
|
|
+
|
|
|
+logger = get_logger(__name__)
|
|
|
+warn = logger.warn
|
|
|
+
|
|
|
|
|
|
def heartbeat_expires(timestamp, freq=60,
|
|
|
expire_window=HEARTBEAT_EXPIRE_WINDOW):
|
|
@@ -54,26 +66,32 @@ class Worker(Element):
|
|
|
super(Worker, self).__init__(**fields)
|
|
|
self.heartbeats = []
|
|
|
|
|
|
- def on_online(self, timestamp=None, **kwargs):
|
|
|
+ def on_online(self, timestamp=None, local_received=None, **kwargs):
|
|
|
"""Callback for the :event:`worker-online` event."""
|
|
|
self.update(**kwargs)
|
|
|
- self._heartpush(timestamp)
|
|
|
+ self.update_heartbeat(local_received, timestamp)
|
|
|
|
|
|
def on_offline(self, **kwargs):
|
|
|
"""Callback for the :event:`worker-offline` event."""
|
|
|
self.update(**kwargs)
|
|
|
self.heartbeats = []
|
|
|
|
|
|
- def on_heartbeat(self, timestamp=None, **kwargs):
|
|
|
+ def on_heartbeat(self, timestamp=None, local_received=None, **kwargs):
|
|
|
"""Callback for the :event:`worker-heartbeat` event."""
|
|
|
self.update(**kwargs)
|
|
|
- self._heartpush(timestamp)
|
|
|
-
|
|
|
- def _heartpush(self, timestamp):
|
|
|
- if timestamp:
|
|
|
- heapq.heappush(self.heartbeats, timestamp)
|
|
|
- if len(self.heartbeats) > self.heartbeat_max:
|
|
|
- self.heartbeats = self.heartbeats[self.heartbeat_max:]
|
|
|
+ self.update_heartbeat(local_received, timestamp)
|
|
|
+
|
|
|
+ def update_heartbeat(self, received, timestamp):
|
|
|
+ if not received or not timestamp:
|
|
|
+ return
|
|
|
+ drift = received - timestamp
|
|
|
+ if drift > HEARTBEAT_DRIFT_MAX:
|
|
|
+ warn(DRIFT_WARNING, self.hostname, drift)
|
|
|
+ heartbeats, hbmax = self.heartbeats, self.heartbeat_max
|
|
|
+ if received and received > heartbeats[-1]:
|
|
|
+ heappush(heartbeats, received)
|
|
|
+ if len(heartbeats) > hbmax:
|
|
|
+ heartbeats[:] = heartbeats[hbmax:]
|
|
|
|
|
|
def __repr__(self):
|
|
|
return '<Worker: {0.hostname} ({0.status_string})'.format(self)
|
|
@@ -295,6 +313,7 @@ class State(object):
|
|
|
uuid = fields.pop('uuid')
|
|
|
hostname = fields.pop('hostname')
|
|
|
worker, _ = self.get_or_create_worker(hostname)
|
|
|
+ worker.update_heartbeat(fields['local_received'], fields['timestamp'])
|
|
|
task, created = self.get_or_create_task(uuid)
|
|
|
handler = getattr(task, 'on_' + type, None)
|
|
|
if type == 'received':
|