浏览代码

Substantial drift is now logged only once per node name (Issue #1802)

Ask Solem 11 年之前
父节点
当前提交
d85963dc2f
共有 2 个文件被更改,包括 16 次插入6 次删除
  1. 11 4
      celery/events/state.py
  2. 5 2
      celery/utils/functional.py

+ 11 - 4
celery/events/state.py

@@ -35,7 +35,7 @@ from kombu.utils import cached_property, kwdict
 from celery import states
 from celery.five import class_property, items, values
 from celery.utils import deprecated
-from celery.utils.functional import LRUCache
+from celery.utils.functional import LRUCache, memoize
 from celery.utils.log import get_logger
 
 PYPY = hasattr(sys, 'pypy_version_info')
@@ -66,6 +66,14 @@ R_TASK = '<Task: {0.name}({0.uuid}) {0.state} clock:{0.clock}>'
 __all__ = ['Worker', 'Task', 'State', 'heartbeat_expires']
 
 
+@memoize(maxsize=1000, keyfun=lambda a, _: a[0])
+def _warn_drift(hostname, drift, local_received, timestamp):
+    # we use memoize here so the warning is only logged once per hostname
+    warn(DRIFT_WARNING, hostname, drift,
+         datetime.fromtimestamp(local_received),
+         datetime.fromtimestamp(timestamp))
+
+
 def heartbeat_expires(timestamp, freq=60,
                       expire_window=HEARTBEAT_EXPIRE_WINDOW,
                       Decimal=Decimal, float=float, isinstance=isinstance):
@@ -158,9 +166,8 @@ class Worker(object):
                     return
                 drift = abs(int(local_received) - int(timestamp))
                 if drift > HEARTBEAT_DRIFT_MAX:
-                    warn(DRIFT_WARNING, self.hostname, drift,
-                         datetime.fromtimestamp(local_received),
-                         datetime.fromtimestamp(timestamp))
+                    _warn_drift(self.hostname, drift,
+                                local_received, timestamp)
                 if local_received:
                     hearts = len(heartbeats)
                     if hearts > hbmax - 1:

+ 5 - 2
celery/utils/functional.py

@@ -121,7 +121,7 @@ class LRUCache(UserDict):
             return list(self._iterate_items())
 
 
-def memoize(maxsize=None, Cache=LRUCache):
+def memoize(maxsize=None, keyfun=None, Cache=LRUCache):
 
     def _memoize(fun):
         mutex = threading.Lock()
@@ -129,7 +129,10 @@ def memoize(maxsize=None, Cache=LRUCache):
 
         @wraps(fun)
         def _M(*args, **kwargs):
-            key = args + (KEYWORD_MARK, ) + tuple(sorted(kwargs.items()))
+            if keyfun:
+                key = keyfun(args, kwargs)
+            else:
+                key = args + (KEYWORD_MARK, ) + tuple(sorted(kwargs.items()))
             try:
                 with mutex:
                     value = cache[key]