Sfoglia il codice sorgente

Heartbeats are now sent every 5s, and the frequency is sent with the event

The heartbeat frequency is now available in the worker event messages,
so that clients can decide when to consider workers offline based on
the value.
Ask Solem 13 anni fa
parent
commit
387cf1c137

+ 12 - 7
celery/events/state.py

@@ -22,20 +22,20 @@
 from __future__ import absolute_import
 from __future__ import absolute_import
 from __future__ import with_statement
 from __future__ import with_statement
 
 
-import time
 import heapq
 import heapq
 
 
 from threading import Lock
 from threading import Lock
+from time import time
 
 
 from kombu.utils import kwdict
 from kombu.utils import kwdict
 
 
 from .. import states
 from .. import states
 from ..datastructures import AttributeDict, LRUCache
 from ..datastructures import AttributeDict, LRUCache
 
 
-#: Hartbeat expiry time in seconds.  The worker will be considered offline
-#: if no heartbeat is received within this time.
-#: Default is 2:30 minutes.
-HEARTBEAT_EXPIRE = 150
+# The window (as a percentage) scales the worker's heartbeat
+# frequency.  If the time since the last heartbeat exceeds
+# freq * (window / 100) seconds, then the worker is considered
+# to be offline.
+HEARTBEAT_EXPIRE_WINDOW = 200
 
 
 
 
 class Element(AttributeDict):
 class Element(AttributeDict):
@@ -45,6 +45,8 @@ class Element(AttributeDict):
 class Worker(Element):
 class Worker(Element):
     """Worker State."""
     """Worker State."""
     heartbeat_max = 4
     heartbeat_max = 4
+    expire_window = HEARTBEAT_EXPIRE_WINDOW
+    freq = 60  # default frequency for workers < 2.6
 
 
     def __init__(self, **fields):
     def __init__(self, **fields):
         super(Worker, self).__init__(**fields)
         super(Worker, self).__init__(**fields)
@@ -72,10 +74,13 @@ class Worker(Element):
         return "<Worker: %s (%s)" % (self.hostname,
         return "<Worker: %s (%s)" % (self.hostname,
                                      self.alive and "ONLINE" or "OFFLINE")
                                      self.alive and "ONLINE" or "OFFLINE")
 
 
+    @property
+    def heartbeat_expires(self):
+        return self.heartbeats[-1] + self.freq * (self.expire_window / 1e2)
+
     @property
     @property
     def alive(self):
     def alive(self):
-        return (self.heartbeats and
-                time.time() < self.heartbeats[-1] + HEARTBEAT_EXPIRE)
+        return (self.heartbeats and time() < self.heartbeat_expires)
 
 
 
 
 class Task(Element):
 class Task(Element):

+ 2 - 2
celery/tests/test_events/test_events_state.py

@@ -6,7 +6,7 @@ from itertools import count
 
 
 from celery import states
 from celery import states
 from celery.events import Event
 from celery.events import Event
-from celery.events.state import State, Worker, Task, HEARTBEAT_EXPIRE
+from celery.events.state import State, Worker, Task, HEARTBEAT_EXPIRE_WINDOW
 from celery.utils import uuid
 from celery.utils import uuid
 from celery.tests.utils import Case
 from celery.tests.utils import Case
 
 
@@ -54,7 +54,7 @@ class ev_worker_heartbeats(replay):
     def setup(self):
     def setup(self):
         self.events = [
         self.events = [
             Event("worker-heartbeat", hostname="utest1",
             Event("worker-heartbeat", hostname="utest1",
-                timestamp=time() - HEARTBEAT_EXPIRE * 2),
+                timestamp=time() - HEARTBEAT_EXPIRE_WINDOW * 2),
             Event("worker-heartbeat", hostname="utest1"),
             Event("worker-heartbeat", hostname="utest1"),
         ]
         ]
 
 

+ 2 - 2
celery/worker/heartbeat.py

@@ -28,7 +28,7 @@ class Heart(object):
     def __init__(self, timer, eventer, interval=None):
     def __init__(self, timer, eventer, interval=None):
         self.timer = timer
         self.timer = timer
         self.eventer = eventer
         self.eventer = eventer
-        self.interval = interval or 30
+        self.interval = float(interval or 5.0)
         self.tref = None
         self.tref = None
 
 
         # Make event dispatcher start/stop us when it's
         # Make event dispatcher start/stop us when it's
@@ -37,7 +37,7 @@ class Heart(object):
         self.eventer.on_disabled.add(self.stop)
         self.eventer.on_disabled.add(self.stop)
 
 
     def _send(self, event):
     def _send(self, event):
-        return self.eventer.send(event, **SOFTWARE_INFO)
+        return self.eventer.send(event, freq=self.interval, **SOFTWARE_INFO)
 
 
     def start(self):
     def start(self):
         if self.eventer.enabled:
         if self.eventer.enabled:

+ 6 - 3
docs/userguide/monitoring.rst

@@ -580,19 +580,22 @@ Task Events
 Worker Events
 Worker Events
 ~~~~~~~~~~~~~
 ~~~~~~~~~~~~~
 
 
-* ``worker-online(hostname, timestamp, sw_ident, sw_ver, sw_sys)``
+* ``worker-online(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys)``
 
 
     The worker has connected to the broker and is online.
     The worker has connected to the broker and is online.
 
 
+    * `hostname`: Hostname of the worker.
+    * `timestamp`: Event timestamp.
+    * `freq`: Heartbeat frequency in seconds (float).
     * `sw_ident`: Name of worker software (e.g. celeryd).
     * `sw_ident`: Name of worker software (e.g. celeryd).
     * `sw_ver`: Software version (e.g. 2.2.0).
     * `sw_ver`: Software version (e.g. 2.2.0).
     * `sw_sys`: Operating System (e.g. Linux, Windows, Darwin).
     * `sw_sys`: Operating System (e.g. Linux, Windows, Darwin).
 
 
-* ``worker-heartbeat(hostname, timestamp, sw_ident, sw_ver, sw_sys)``
+* ``worker-heartbeat(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys)``
 
 
     Sent every `freq` seconds (5 by default); if the worker has not sent a
     Sent every `freq` seconds (5 by default); if the worker has not sent a
     heartbeat within twice that interval, it is considered to be offline.
     heartbeat within twice that interval, it is considered to be offline.
 
 
-* ``worker-offline(hostname, timestamp, sw_ident, sw_ver, sw_sys)``
+* ``worker-offline(hostname, timestamp, freq, sw_ident, sw_ver, sw_sys)``
 
 
     The worker has disconnected from the broker.
     The worker has disconnected from the broker.