Explorar el Código

Events: Use bisect to ensure task index and worker heartbeats are sorted

Ask Solem hace 11 años
padre
commit
59ab2aa589
Se han modificado 2 ficheros con 31 adiciones y 11 borrados
  1. 28 10
      celery/events/state.py
  2. 3 1
      celery/tests/events/test_state.py

+ 28 - 10
celery/events/state.py

@@ -18,12 +18,12 @@
 """
 from __future__ import absolute_import
 
+import bisect
 import sys
 import threading
 
 from datetime import datetime
 from decimal import Decimal
-from heapq import heappush, heapreplace
 from itertools import islice
 from operator import itemgetter
 from time import time
@@ -139,13 +139,15 @@ class Worker(object):
 
     def _create_event_handler(self):
         _set = object.__setattr__
-        heartbeats = self.heartbeats
         hbmax = self.heartbeat_max
+        heartbeats = self.heartbeats
+        hb_pop = self.heartbeats.pop
+        hb_append = self.heartbeats.append
 
         def event(type_, timestamp=None,
                   local_received=None, fields=None,
                   max_drift=HEARTBEAT_DRIFT_MAX, items=items, abs=abs, int=int,
-                  heappush=heappush, heapreplace=heapreplace, len=len):
+                  insort=bisect.insort, len=len):
             fields = fields or {}
             for k, v in items(fields):
                 _set(self, k, v)
@@ -160,10 +162,13 @@ class Worker(object):
                          datetime.fromtimestamp(local_received),
                          datetime.fromtimestamp(timestamp))
                 if local_received:
-                    if len(heartbeats) > hbmax - 1:
-                        heapreplace(heartbeats, local_received)
+                    hearts = len(heartbeats)
+                    if hearts > hbmax - 1:
+                        hb_pop(0)
+                    if hearts and local_received > heartbeats[-1]:
+                        hb_append(local_received)
                     else:
-                        heappush(heartbeats, local_received)
+                        insort(heartbeats, local_received)
         return event
 
     def update(self, f, **kw):
@@ -485,6 +490,8 @@ class State(object):
         tfields = itemgetter('uuid', 'hostname', 'timestamp',
                              'local_received', 'clock')
         taskheap = self._taskheap
+        th_append = taskheap.append
+        th_pop = taskheap.pop
         # Removing events from task heap is an O(n) operation,
         # so easier to just account for the common number of events
         # for each task (PENDING->RECEIVED->STARTED->final)
@@ -498,7 +505,8 @@ class State(object):
         get_worker, get_task = workers.data.__getitem__, tasks.data.__getitem__
 
         def _event(event,
-                   timetuple=timetuple, KeyError=KeyError, created=True):
+                   timetuple=timetuple, KeyError=KeyError,
+                   insort=bisect.insort, created=True):
             self.event_count += 1
             if event_callback:
                 event_callback(self, event)
@@ -529,6 +537,7 @@ class State(object):
                         on_node_join(worker)
                     if on_node_leave and is_offline:
                         on_node_leave(worker)
+                        workers.pop(hostname, None)
                     return (worker, created), subject
             elif group == 'task':
                 (uuid, hostname, timestamp,
@@ -549,12 +558,21 @@ class State(object):
                     task.worker = worker
                     if worker is not None and local_received:
                         worker.event(None, local_received, timestamp)
+
                 origin = hostname if is_client_event else worker.id
+
+                # remove oldest event if exceeding the limit.
+                heaps = len(taskheap)
+                if heaps + 1 > max_events_in_heap:
+                    th_pop(0)
+
+                # most events will be dated later than the previous.
                 timetup = timetuple(clock, timestamp, origin, ref(task))
-                if len(taskheap) > max_events_in_heap - 1:
-                    heapreplace(taskheap, timetup)
+                if heaps and timetup > taskheap[-1]:
+                    th_append(timetup)
                 else:
-                    heappush(taskheap, timetup)
+                    insort(taskheap, timetup)
+
                 if subject == 'received':
                     self.task_count += 1
                 task.event(subject, timestamp, local_received, event)

+ 3 - 1
celery/tests/events/test_state.py

@@ -210,8 +210,10 @@ class test_Worker(AppCase):
         worker = Worker(hostname='foo')
         worker.event(None, time(), time())
         self.assertEqual(len(worker.heartbeats), 1)
+        h1 = worker.heartbeats[0]
         worker.event(None, time(), time() - 10)
-        self.assertEqual(len(worker.heartbeats), 1)
+        self.assertEqual(len(worker.heartbeats), 2)
+        self.assertEqual(worker.heartbeats[-1], h1)
 
 
 class test_Task(AppCase):