Browse Source

Revokes: Make sure heap does not contain duplicates. Closes #2336

Ask Solem 10 years ago
parent
commit
289ec8e003
1 changed files with 18 additions and 14 deletions
  1. 18 14
      celery/datastructures.py

+ 18 - 14
celery/datastructures.py

@@ -12,7 +12,7 @@ import sys
 import time
 
 from collections import defaultdict, Mapping, MutableMapping, MutableSet
-from heapq import heappush, heappop
+from heapq import heapify, heappush, heappop
 from functools import partial
 from itertools import chain
 
@@ -555,7 +555,7 @@ class LimitedSet(object):
     """Kind-of Set with limitations.
 
     Good for when you need to test for membership (`a in set`),
-    but the list might become too big.
+    but the set should not grow unbounded.
 
     :keyword maxlen: Maximum number of members before we start
                      evicting expired members.
@@ -564,24 +564,31 @@ class LimitedSet(object):
     """
 
     def __init__(self, maxlen=None, expires=None, data=None, heap=None):
+        # heap is ignored
         self.maxlen = maxlen
         self.expires = expires
         self._data = {} if data is None else data
-        self._heap = [] if heap is None else heap
+        self._heap = []
+
         # make shortcuts
         self.__len__ = self._heap.__len__
-        self.__iter__ = self._heap.__iter__
         self.__contains__ = self._data.__contains__
 
-    def add(self, value, now=time.time):
+        self._refresh_heap()
+
+    def _refresh_heap(self):
+        self._heap[:] = [(t, key) for key, t in items(self._data)]
+        heapify(self._heap)
+
+    def add(self, key, now=time.time, heappush=heappush):
         """Add a new member."""
         # offset is there to modify the length of the list,
         # this way we can expire an item before inserting the value,
-        # and it will end up in correct order.
+        # and it will end up in the correct order.
         self.purge(1, offset=1)
         inserted = now()
-        self._data[value] = inserted
-        heappush(self._heap, (inserted, value))
+        self._data[key] = inserted
+        heappush(self._heap, (inserted, key))
 
     def clear(self):
         """Remove all members"""
@@ -630,11 +637,10 @@ class LimitedSet(object):
                 pass
             i += 1
 
-    def update(self, other, heappush=heappush):
+    def update(self, other):
         if isinstance(other, LimitedSet):
             self._data.update(other._data)
-            self._heap.extend(other._heap)
-            self._heap.sort()
+            self._refresh_heap()
         else:
             for obj in other:
                 self.add(obj)
@@ -661,7 +667,5 @@ class LimitedSet(object):
         return key in self._data
 
     def __reduce__(self):
-        return self.__class__, (
-            self.maxlen, self.expires, self._data, self._heap,
-        )
+        return self.__class__, (self.maxlen, self.expires, self._data)
 MutableSet.register(LimitedSet)