Parcourir la source

Improvements and fixes for LimitedSet

Getting rid of leaking memory + adding minlen size of the set
minlen is minimal residual size of set after operating for long.
Minlen items are kept, even if they should be expired by time, until
we get newer items.

Problems with older and even more old code:

1)
   Heap would tend to grow in some scenarios
   (like adding an item multiple times).

2) Adding many items fast would not clean them soon enough (if ever).

3) When talking to other workers, revoked._data was sent, but
   it was processed on the other side as iterable.
   That means giving those keys new (current)
   timestamp. By doing this workers could recycle
   items forever. Combined with 1) and 2), this means that in
   large set of workers, you are getting out of memory soon.

All those problems should be fixed now,
also some new unittests are added.

This should fix issues #3095, #3086.
David Pravec il y a 9 ans
Parent
commit
e0221e9f69
2 fichiers modifiés avec 245 ajouts et 95 suppressions
  1. 167 70
      celery/datastructures.py
  2. 78 25
      celery/tests/utils/test_datastructures.py

+ 167 - 70
celery/datastructures.py

@@ -583,120 +583,217 @@ MutableMapping.register(ConfigurationView)
 
 
 class LimitedSet(object):
-    """Kind-of Set with limitations.
+    """Kind-of Set (or priority queue) with limitations.
 
     Good for when you need to test for membership (`a in set`),
     but the set should not grow unbounded.
 
-    :keyword maxlen: Maximum number of members before we start
-                     evicting expired members.
-    :keyword expires: Time in seconds, before a membership expires.
-
+    This version is now changed to be more enforcing those limits.
+    Maxlen is enforced all the time. But you can also configure
+    minlen now, which is minimal residual size of set.
+
+
+    Example::
+
+        >>> s = LimitedSet(maxlen=50000, expires=3600, minlen=4000)
+        >>> for i in range(60000):
+        ...     s.add(i)
+        ...     s.add(str(i))
+        ...
+        >>> 57000 in s  # last 50k inserted values are kept
+        True
+        >>> '10' in s  # '10' did expire and was purged from set.
+        False
+        >>> len(s)  # maxlen is reached
+        50000
+        >>> s.purge(now=time.time() + 7200)  # clock + 2 hours
+        >>> len(s)  # now only minlen items are cached
+        4000
+        >>>> 57000 in s  # even this item is gone now
+        False
     """
 
-    def __init__(self, maxlen=None, expires=None, data=None, heap=None):
-        # heap is ignored
+    REMOVED = object()  # just a placeholder for removed items
+    _MAX_HEAP_PERCENTS_OVERLOAD = 15  #
+
+    def __init__(self, maxlen=0, expires=0, minlen=0, data=None):
+        """Initialize LimitedSet.
+
+        All  arguments are optional, with exception of minlen, which must
+        be smaller than maxlen. Unconfigured limits will not be enforced.
+
+        :keyword maxlen: max size of this set. Adding more items than maxlen
+                         results in immediate removing of older items.
+        :keyword expires: TTL for an item.
+                          Items aging over expiration are purged.
+        :keyword minlen: minimal residual size of this set.
+                         Oldest expired items will be delete
+                         only until minlen size is reached.
+        :keyword data: data to initialize set with. Can be iterable of keys,
+                       dict {key:inserted_time} or another LimitedSet.
+
+        """
+        if maxlen is None:
+            maxlen = 0
+        if minlen is None:
+            minlen = 0
+        if expires is None:
+            expires = 0
         self.maxlen = maxlen
+        self.minlen = minlen
         self.expires = expires
-        self._data = {} if data is None else data
+        self._data = {}
         self._heap = []
-
         # make shortcuts
-        self.__len__ = self._heap.__len__
+        self.__len__ = self._data.__len__
         self.__contains__ = self._data.__contains__
 
-        self._refresh_heap()
+        if data:
+            # import items from data
+            self.update(data)
+
+        if not self.maxlen >= self.minlen >= 0:
+            raise ValueError('Minlen should be positive number, '
+                             'smaller or equal to maxlen.')
+        if self.expires < 0:
+            raise ValueError('Expires should not be negative!')
 
     def _refresh_heap(self):
-        self._heap[:] = [(t, key) for key, t in items(self._data)]
+        """Time consuming recreating of heap. Do not run this too often."""
+        self._heap[:] = [entry for entry in self._data.values()]
         heapify(self._heap)
 
-    def add(self, key, now=time.time, heappush=heappush):
-        """Add a new member."""
-        # offset is there to modify the length of the list,
-        # this way we can expire an item before inserting the value,
-        # and it will end up in the correct order.
-        self.purge(1, offset=1)
-        inserted = now()
-        self._data[key] = inserted
-        heappush(self._heap, (inserted, key))
-
     def clear(self):
-        """Remove all members"""
+        """Clear all data, start from scratch again."""
         self._data.clear()
         self._heap[:] = []
 
-    def discard(self, value):
-        """Remove membership by finding value."""
-        try:
-            itime = self._data[value]
-        except KeyError:
-            return
-        try:
-            self._heap.remove((itime, value))
-        except ValueError:
-            pass
-        self._data.pop(value, None)
-    pop_value = discard  # XXX compat
-
-    def purge(self, limit=None, offset=0, now=time.time):
-        """Purge expired items."""
-        H, maxlen = self._heap, self.maxlen
-        if not maxlen:
-            return
-
-        # If the data/heap gets corrupted and limit is None
-        # this will go into an infinite loop, so limit must
-        # have a value to guard the loop.
-        limit = len(self) + offset if limit is None else limit
-
-        i = 0
-        while len(self) + offset > maxlen:
-            if i >= limit:
-                break
-            try:
-                item = heappop(H)
-            except IndexError:
-                break
-            if self.expires:
-                if now() < item[0] + self.expires:
-                    heappush(H, item)
-                    break
-            try:
-                self._data.pop(item[1])
-            except KeyError:  # out of sync with heap
-                pass
-            i += 1
+    def add(self, item, now=None):
+        'Add a new item or update the time of an existing item'
+        if not now:
+            now = time.time()
+        if item in self._data:
+            self.discard(item)
+        entry = [now, item]
+        self._data[item] = entry
+        heappush(self._heap, entry)
+        if self.maxlen and len(self._data) >= self.maxlen:
+            self.purge()
 
     def update(self, other):
+        """Update this LimitedSet from other LimitedSet, dict or iterable."""
         if isinstance(other, LimitedSet):
             self._data.update(other._data)
             self._refresh_heap()
+            self.purge()
+        elif isinstance(other, dict):
+            # revokes are sent like dict!
+            for key, inserted in other.items():
+                if isinstance(inserted, list):
+                    # in case someone uses ._data directly for sending update
+                    inserted = inserted[0]
+                if not isinstance(inserted, float):
+                    raise ValueError('Expecting float timestamp, got type '
+                                     '"{0}" with value: {1}'.format(
+                                         type(inserted), inserted))
+                self.add(key, inserted)
         else:
+            # AVOID THIS, it could keep old data if more parties
+            # exchange them all over and over again
             for obj in other:
                 self.add(obj)
 
+    def discard(self, item):
+        'Mark an existing item as REMOVED. If KeyError is not found, pass.'
+        entry = self._data.pop(item, self.REMOVED)
+        if entry is self.REMOVED:
+            return
+        entry[-1] = self.REMOVED
+        if self._heap_overload > self._MAX_HEAP_PERCENTS_OVERLOAD:
+            self._refresh_heap()
+
+    pop_value = discard
+
+    def purge(self, now=None):
+        """Check oldest items and remove them if needed.
+
+        :keyword now: Time of purging -- by default right now.
+                      This can be usefull for unittesting.
+        """
+        if not now:
+            now = time.time()
+        if hasattr(now, '__call__'):
+            now = now()  # if we got this now as function, evaluate it
+        if self.maxlen:
+            while len(self._data) > self.maxlen:
+                self.pop()
+        # time based expiring:
+        if self.expires:
+            while len(self._data) > self.minlen >= 0:
+                inserted_time, _ = self._heap[0]
+                if inserted_time + self.expires > now:
+                    break  # end this right now, oldest item is not expired yet
+                self.pop()
+
+    def pop(self):
+        'Remove and return the lowest time item. Return None if empty.'
+        while self._heap:
+            _, item = heappop(self._heap)
+            if item is not self.REMOVED:
+                del self._data[item]
+                return item
+        return None
+
     def as_dict(self):
-        return self._data
+        """Whole set as serializable dictionary.
+        Example::
+
+            >>> s=LimitedSet(maxlen=200)
+            >>> r=LimitedSet(maxlen=200)
+            >>> for i in range(500):
+            ...     s.add(i)
+            ...
+            >>> r.update(s.as_dict())
+            >>> r == s
+            True
+        """
+        return {key: inserted for inserted, key in self._data.values()}
 
     def __eq__(self, other):
-        return self._heap == other._heap
+        return self._data == other._data
 
     def __ne__(self, other):
         return not self.__eq__(other)
 
     def __repr__(self):
-        return 'LimitedSet({0})'.format(len(self))
+        return 'LimitedSet(maxlen={0}, expires={1}, minlen={2})' \
+            ' Current size:{3}'.format(
+                self.maxlen, self.expires, self.minlen, len(self._data))
 
     def __iter__(self):
-        return (item[1] for item in self._heap)
+        # return (item[1] for item in
+        #         self._heap if item[-1] is not self.REMOVED)
+        # ^ not ordered, slow
+        return (i for _, i in sorted(self._data.values()))
 
     def __len__(self):
-        return len(self._heap)
+        return len(self._data)
 
     def __contains__(self, key):
         return key in self._data
 
     def __reduce__(self):
-        return self.__class__, (self.maxlen, self.expires, self._data)
+        """Pickle helper class.
+
+        This object can be pickled and upickled."""
+        return self.__class__, (
+            self.maxlen, self.expires, self.minlen, self.as_dict())
+
+    @property
+    def _heap_overload(self):
+        """Compute how much is heap bigger than data [percents]."""
+        if len(self._data) == 0:
+            return len(self._heap)
+        return len(self._heap)*100/len(self._data) - 100
+
 MutableSet.register(LimitedSet)

+ 78 - 25
celery/tests/utils/test_datastructures.py

@@ -188,45 +188,58 @@ class test_LimitedSet(Case):
         for n in 'bar', 'baz':
             self.assertIn(n, s)
         self.assertNotIn('foo', s)
+        s = LimitedSet(maxlen=10)
+        for i in range(150):
+            s.add(i)
+        self.assertLessEqual(len(s), 10)
+        # make sure heap is not leaking:
+        self.assertLessEqual(len(s._heap),
+                             len(s) * (100. +
+                             s._MAX_HEAP_PERCENTS_OVERLOAD) / 100)
 
     def test_purge(self):
-        s = LimitedSet(maxlen=None)
+        # purge now enforces rules
+        # cant purge(1) now. but .purge(now=...) still works
+        s = LimitedSet(maxlen=10)
         [s.add(i) for i in range(10)]
         s.maxlen = 2
-        s.purge(1)
-        self.assertEqual(len(s), 9)
-        s.purge(None)
+        s.purge()
         self.assertEqual(len(s), 2)
 
         # expired
-        s = LimitedSet(maxlen=None, expires=1)
+        s = LimitedSet(maxlen=10, expires=1)
         [s.add(i) for i in range(10)]
         s.maxlen = 2
-        s.purge(1, now=lambda: time() + 100)
-        self.assertEqual(len(s), 9)
-        s.purge(None, now=lambda: time() + 100)
-        self.assertEqual(len(s), 2)
+        s.purge(now=time() + 100)
+        self.assertEqual(len(s), 0)
 
         # not expired
         s = LimitedSet(maxlen=None, expires=1)
         [s.add(i) for i in range(10)]
         s.maxlen = 2
-        s.purge(1, now=lambda: time() - 100)
-        self.assertEqual(len(s), 10)
-        s.purge(None, now=lambda: time() - 100)
-        self.assertEqual(len(s), 10)
+        s.purge(now=lambda: time() - 100)
+        self.assertEqual(len(s), 2)
 
-        s = LimitedSet(maxlen=None)
-        [s.add(i) for i in range(10)]
-        s.maxlen = 2
-        with patch('celery.datastructures.heappop') as hp:
-            hp.side_effect = IndexError()
-            s.purge()
-            hp.assert_called_with(s._heap)
-        with patch('celery.datastructures.heappop') as hp:
-            s._data = {i * 2: i * 2 for i in range(10)}
-            s.purge()
-            self.assertEqual(hp.call_count, 10)
+        # expired -> minsize
+        s = LimitedSet(maxlen=10, minlen=10, expires=1)
+        [s.add(i) for i in range(20)]
+        s.minlen = 3
+        s.purge(now=time() + 3)
+        self.assertEqual(s.minlen, len(s))
+        self.assertLessEqual(len(s._heap),
+                             s.maxlen *
+                             (100. + s._MAX_HEAP_PERCENTS_OVERLOAD)/100)
+        # s = LimitedSet(maxlen=None)
+        # [s.add(i) for i in range(10)]
+        # s.maxlen = 2
+        # with patch('celery.datastructures.heappop') as hp:
+        #    hp.side_effect = IndexError()
+        #    s.purge()
+        #    hp.assert_called_with(s._heap)
+        # with patch('celery.datastructures.heappop') as hp:
+        #    s._data = {i * 2: i * 2 for i in range(10)}
+        #    s.purge()
+        #    self.assertEqual(hp.call_count, 10)
 
     def test_pickleable(self):
         s = LimitedSet(maxlen=2)
@@ -260,7 +273,7 @@ class test_LimitedSet(Case):
         s.discard('foo')
         self.assertNotIn('foo', s)
         self.assertEqual(len(s._data), 0)
-        self.assertEqual(len(s._heap), 0)
+        # self.assertLessEqual(len(s._heap), 0 + s.heap_overload)
         s.discard('foo')
 
     def test_clear(self):
@@ -285,6 +298,46 @@ class test_LimitedSet(Case):
 
         s2.update(['do', 're'])
         self.assertItemsEqual(list(s2), ['do', 're'])
+        s1 = LimitedSet(maxlen=10, expires=None)
+        s2 = LimitedSet(maxlen=10, expires=None)
+        s3 = LimitedSet(maxlen=10, expires=None)
+        s4 = LimitedSet(maxlen=10, expires=None)
+        s5 = LimitedSet(maxlen=10, expires=None)
+        for i in range(12):
+            s1.add(i)
+            s2.add(i*i)
+        s3.update(s1)
+        s3.update(s2)
+        s4.update(s1.as_dict())
+        s4.update(s2.as_dict())
+        s5.update(s1._data)  # revoke is using this
+        s5.update(s2._data)  #
+        self.assertEqual(s3, s4)
+        self.assertEqual(s3, s5)
+        s2.update(s4)
+        s4.update(s2)
+        self.assertEqual(s2, s4)
+
+    def test_iterable_and_ordering(self):
+        s = LimitedSet(maxlen=35, expires=None)
+        for i in reversed(range(15)):
+            s.add(i)
+        j = 40
+        for i in s:
+            self.assertLess(i, j)  # each item is smaller and smaller
+            j = i
+        self.assertEqual(i, 0)  # last item = 0
+
+    def test_pop_and_ordering_again(self):
+        s = LimitedSet(maxlen=5)
+        for i in range(10):
+            s.add(i)
+        j = -1
+        for _ in range(5):
+            i = s.pop()
+            self.assertLess(j, i)
+        i = s.pop()
+        self.assertEqual(i, None)
 
     def test_as_dict(self):
         s = LimitedSet(maxlen=2)