Quellcode durchsuchen

Optimize loading/storing revoked tasks. Closes #1289

Ask Solem vor 12 Jahren
Ursprung
Commit
bb90a2872a
2 geänderte Dateien mit 59 neuen und 22 gelöschten Zeilen
  1. 50 18
      celery/datastructures.py
  2. 9 4
      celery/worker/state.py

+ 50 - 18
celery/datastructures.py

@@ -13,6 +13,7 @@ import sys
 import time
 
 from collections import defaultdict
+from heapq import heapify, heappush, heappop
 from itertools import chain
 
 try:
@@ -407,45 +408,76 @@ class LimitedSet(object):
     :keyword expires: Time in seconds, before a membership expires.
 
     """
-    __slots__ = ('maxlen', 'expires', '_data', '__len__')
+    __slots__ = ('maxlen', 'expires', '_data', '__len__', '_heap')
 
-    def __init__(self, maxlen=None, expires=None):
+    def __init__(self, maxlen=None, expires=None, data=None, heap=None):
         self.maxlen = maxlen
         self.expires = expires
-        self._data = {}
+        self._data = data or {}
+        self._heap = heap or []
         self.__len__ = self._data.__len__
 
     def add(self, value):
         """Add a new member."""
-        self._expire_item()
-        self._data[value] = time.time()
+        self.purge(1)
+        now = time.time()
+        self._data[value] = now
+        heappush(self._heap, (now, value))
+
+    def __reduce__(self):
+        return self.__class__, (
+            self.maxlen, self.expires, self._data, self._heap,
+        )
 
     def clear(self):
         """Remove all members"""
         self._data.clear()
+        self._heap[:] = []
 
     def pop_value(self, value):
         """Remove membership by finding value."""
+        try:
+            itime = self._data[value]
+        except KeyError:
+            return
+        try:
+            self._heap.remove((value, itime))
+        except ValueError:
+            pass
         self._data.pop(value, None)
 
     def _expire_item(self):
         """Hunt down and remove an expired item."""
-        while 1:
-            if self.maxlen and len(self) >= self.maxlen:
-                value, when = self.first
-                if not self.expires or time.time() > when + self.expires:
-                    try:
-                        self.pop_value(value)
-                    except TypeError:  # pragma: no cover
-                        continue
-            break
+        self.purge(1)
 
     def __contains__(self, value):
         return value in self._data
 
-    def update(self, other):
+    def purge(self, limit=None):
+        H, maxlen = self._heap, self.maxlen
+        if not maxlen:
+            return
+        i = 0
+        while len(self) >= maxlen:
+            if limit and i > limit:
+                break
+            try:
+                item = heappop(H)
+            except IndexError:
+                break
+            if self.expires:
+                if time.time() < item[0] + self.expires:
+                    heappush(H, item)
+                    break
+            self._data.pop(item[1])
+            i += 1
+
+
+    def update(self, other, heappush=heappush):
         if isinstance(other, self.__class__):
             self._data.update(other._data)
+            self._heap.extend(other._heap)
+            heapify(self._heap)
         else:
             for obj in other:
                 self.add(obj)
@@ -457,13 +489,13 @@ class LimitedSet(object):
         return iter(self._data)
 
     def __repr__(self):
-        return 'LimitedSet(%r)' % (list(self._data), )
+        return 'LimitedSet(%s)' % (repr(list(self._data))[:100], )
 
     @property
     def chronologically(self):
-        return sorted(self._data.items(), key=lambda (value, when): when)
+        return [value for _, value in self._heap]
 
     @property
     def first(self):
         """Get the oldest member."""
-        return self.chronologically[0]
+        return self._heap[0][1]

+ 9 - 4
celery/worker/state.py

@@ -144,13 +144,18 @@ class Persistent(object):
         self.close()
 
     def merge(self, d):
-        revoked.update(d.get('revoked') or {})
+        saved = d.get('revoked') or LimitedSet()
+        if isinstance(saved, LimitedSet):
+            revoked.update(saved)
+        else:
+            # (pre 3.0.18) used to be stored as dict
+            for item in saved:
+                revoked.add(item)
         return d
 
     def sync(self, d):
-        prev = d.get('revoked') or {}
-        prev.update(revoked.as_dict())
-        d['revoked'] = prev
+        revoked.purge()
+        d['revoked'] = revoked
         return d
 
     def open(self):