Explorar o código

Events: Heappop does not keep the underlying list in order

Ask Solem %!s(int64=11) %!d(string=hai) anos
pai
achega
89531f6b4c
Modificáronse 1 ficheiros con 18 adicións e 15 borrados
  1. 18 15
      celery/events/state.py

+ 18 - 15
celery/events/state.py

@@ -23,7 +23,7 @@ import threading
 
 from datetime import datetime
 from decimal import Decimal
-from heapq import heapify, heappush, heappop
+from heapq import heappush, heapreplace
 from itertools import islice
 from operator import itemgetter
 from time import time
@@ -144,8 +144,8 @@ class Worker(object):
 
         def event(type_, timestamp=None,
                   local_received=None, fields=None,
-                  max_drift=HEARTBEAT_DRIFT_MAX, items=items, abs=abs,
-                  heappush=heappush, heappop=heappop, int=int, len=len):
+                  max_drift=HEARTBEAT_DRIFT_MAX, items=items, abs=abs, int=int,
+                  heappush=heappush, heapreplace=heapreplace, len=len):
             fields = fields or {}
             for k, v in items(fields):
                 _set(self, k, v)
@@ -159,11 +159,11 @@ class Worker(object):
                     warn(DRIFT_WARNING, self.hostname, drift,
                          datetime.fromtimestamp(local_received),
                          datetime.fromtimestamp(timestamp))
-                if not heartbeats or (
-                        local_received and local_received > heartbeats[-1]):
-                    heappush(heartbeats, local_received)
-                    if len(heartbeats) > hbmax:
-                        heappop(heartbeats)
+                if local_received:
+                    if len(heartbeats) > hbmax - 1:
+                        heapreplace(heartbeats, local_received)
+                    else:
+                        heappush(heartbeats, local_received)
         return event
 
     def update(self, f, **kw):
@@ -184,7 +184,9 @@ class Worker(object):
 
     @property
     def alive(self, nowfun=time):
-        return bool(self.heartbeats and nowfun() < self.heartbeat_expires)
+        now = nowfun()
+        expires = self.heartbeat_expires
+        return bool(self.heartbeats and now < expires)
 
     @property
     def id(self):
@@ -550,10 +552,11 @@ class State(object):
                     if worker is not None and local_received:
                         worker.event(None, local_received, timestamp)
                 origin = hostname if is_client_event else worker.id
-                heappush(taskheap,
-                         timetuple(clock, timestamp, origin, ref(task)))
-                if len(taskheap) > max_events_in_heap:
-                    heappop(taskheap)
+                timetup = timetuple(clock, timestamp, origin, ref(task))
+                if len(taskheap) > max_events_in_heap - 1:
+                    heapreplace(taskheap, timetup)
+                else:
+                    heappush(taskheap, timetup)
                 if subject == 'received':
                     self.task_count += 1
                 task.event(subject, timestamp, local_received, event)
@@ -563,12 +566,12 @@ class State(object):
                 return (task, created), subject
         return _event
 
-    def rebuild_taskheap(self, timetuple=timetuple, heapify=heapify):
+    def rebuild_taskheap(self, timetuple=timetuple):
         heap = self._taskheap[:] = [
             timetuple(t.clock, t.timestamp, t.origin, ref(t))
             for t in values(self.tasks)
         ]
-        heapify(heap)
+        heap.sort()
 
     def itertasks(self, limit=None):
         for index, row in enumerate(items(self.tasks)):