Ask Solem il y a 9 ans
Parent
commit
7c4c6eb22e
2 fichiers modifiés avec 105 ajouts et 107 suppressions
  1. 91 87
      celery/datastructures.py
  2. 14 20
      celery/tests/utils/test_datastructures.py

+ 91 - 87
celery/datastructures.py

@@ -11,7 +11,9 @@ from __future__ import absolute_import, print_function, unicode_literals
 import sys
 import time
 
-from collections import defaultdict, Mapping, MutableMapping, MutableSet
+from collections import (
+    Callable, Mapping, MutableMapping, MutableSet, defaultdict,
+)
 from heapq import heapify, heappush, heappop
 from itertools import chain
 
@@ -19,7 +21,7 @@ from billiard.einfo import ExceptionInfo  # noqa
 from kombu.utils.encoding import safe_str, bytes_to_str
 from kombu.utils.limits import TokenBucket  # noqa
 
-from celery.five import items
+from celery.five import items, values
 from celery.utils.functional import LRUCache, first, uniq  # noqa
 from celery.utils.text import match_case
 
@@ -30,6 +32,10 @@ except ImportError:
         pass
     LazySettings = LazyObject  # noqa
 
+__all__ = ['GraphFormatter', 'CycleError', 'DependencyGraph',
+           'AttributeDictMixin', 'AttributeDict', 'DictAttribute',
+           'ConfigurationView', 'LimitedSet']
+
 DOT_HEAD = """
 {IN}{type} {id} {{
 {INp}graph [{attrs}]
@@ -41,9 +47,11 @@ DOT_ATTRSEP = ', '
 DOT_DIRS = {'graph': '--', 'digraph': '->'}
 DOT_TAIL = '{IN}}}'
 
-__all__ = ['GraphFormatter', 'CycleError', 'DependencyGraph',
-           'AttributeDictMixin', 'AttributeDict', 'DictAttribute',
-           'ConfigurationView', 'LimitedSet']
+REPR_LIMITED_SET = """\
+<{name}({size}): maxlen={0.maxlen}, expires={0.expires}, minlen={0.minlen}>\
+"""
+
+sentinel = object()
 
 
 def force_mapping(m):
@@ -578,7 +586,6 @@ class ConfigurationView(AttributeDictMixin):
 
         def values(self):
             return list(self._iterate_values())
-
 MutableMapping.register(ConfigurationView)
 
 
@@ -588,10 +595,34 @@ class LimitedSet(object):
     Good for when you need to test for membership (`a in set`),
     but the set should not grow unbounded.
 
-    This version is now changed to be more enforcing those limits.
-    Maxlen is enforced all the time. But you can also configure
-    minlen now, which is minimal residual size of set.
+    Maxlen is enforced at all times, so if the limit is reached
+    we will also remove non-expired items.
+
+    You can also configure minlen, which is the minimal residual size
+    of the set.
+
+    All  arguments are optional, with exception of minlen, which must
+    be smaller than maxlen.  Unconfigured limits will not be enforced.
+
+    :keyword maxlen: Optional max number of items.
+
+        Adding more items than maxlen will result in immediate
+        removal of items sorted by oldest insertion time.
+
+    :keyword expires: TTL for all items.
+
+        Items aging over expiration are purged as keys are inserted.
 
+    :keyword minlen: Minimal residual size of this set.
+        .. versionadded:: 4.0
+
+        Older expired items will be deleted, only after the set
+        exceeds minlen number of items.
+
+    :keyword data: Initial data to initialize set with.
+        Can be an iterable of ``(key, value)`` pairs,
+        a dict (``{key: insertion_time}``), or another instance
+        of :class:`LimitedSet`.
 
     Example::
 
@@ -611,39 +642,18 @@ class LimitedSet(object):
         4000
         >>>> 57000 in s  # even this item is gone now
         False
-    """
-
-    REMOVED = object()  # just a placeholder for removed items
-    _MAX_HEAP_PERCENTS_OVERLOAD = 15  #
 
-    def __init__(self, maxlen=0, expires=0, minlen=0, data=None):
-        """Initialize LimitedSet.
-
-        All  arguments are optional, with exception of minlen, which must
-        be smaller than maxlen. Unconfigured limits will not be enforced.
+    """
 
-        :keyword maxlen: max size of this set. Adding more items than maxlen
-                         results in immediate removing of older items.
-        :keyword expires: TTL for an item.
-                          Items aging over expiration are purged.
-        :keyword minlen: minimal residual size of this set.
-                         Oldest expired items will be delete
-                         only until minlen size is reached.
-        :keyword data: data to initialize set with. Can be iterable of keys,
-                       dict {key:inserted_time} or another LimitedSet.
+    max_heap_percent_overload = 15
 
-        """
-        if maxlen is None:
-            maxlen = 0
-        if minlen is None:
-            minlen = 0
-        if expires is None:
-            expires = 0
-        self.maxlen = maxlen
-        self.minlen = minlen
-        self.expires = expires
+    def __init__(self, maxlen=0, expires=0, data=None, minlen=0):
+        self.maxlen = 0 if maxlen is None else maxlen
+        self.minlen = 0 if minlen is None else minlen
+        self.expires = 0 if expires is None else expires
         self._data = {}
         self._heap = []
+
         # make shortcuts
         self.__len__ = self._data.__len__
         self.__contains__ = self._data.__contains__
@@ -653,14 +663,17 @@ class LimitedSet(object):
             self.update(data)
 
         if not self.maxlen >= self.minlen >= 0:
-            raise ValueError('Minlen should be positive number, '
-                             'smaller or equal to maxlen.')
+            raise ValueError(
+                'minlen must be a positive number, less or equal to maxlen.')
         if self.expires < 0:
-            raise ValueError('Expires should not be negative!')
+            raise ValueError('expires cannot be negative!')
 
     def _refresh_heap(self):
         """Time consuming recreating of heap. Do not run this too often."""
-        self._heap[:] = [entry for entry in self._data.values()]
+        self._heap[:] = [
+            entry for entry in values(self._data)
+            if entry is not sentinel
+        ]
         heapify(self._heap)
 
     def clear(self):
@@ -669,12 +682,11 @@ class LimitedSet(object):
         self._heap[:] = []
 
     def add(self, item, now=None):
-        'Add a new item or update the time of an existing item'
-        if not now:
-            now = time.time()
+        """Add a new item, or reset the expiry time of an existing item."""
+        now = now or time.time()
         if item in self._data:
             self.discard(item)
-        entry = [now, item]
+        entry = (now, item)
         self._data[item] = entry
         heappush(self._heap, entry)
         if self.maxlen and len(self._data) >= self.maxlen:
@@ -687,43 +699,41 @@ class LimitedSet(object):
             self._refresh_heap()
             self.purge()
         elif isinstance(other, dict):
-            # revokes are sent like dict!
-            for key, inserted in other.items():
+            # revokes are sent as a dict
+            for key, inserted in items(other):
                 if isinstance(inserted, list):
                     # in case someone uses ._data directly for sending update
                     inserted = inserted[0]
                 if not isinstance(inserted, float):
-                    raise ValueError('Expecting float timestamp, got type '
-                                     '"{0}" with value: {1}'.format(
-                                         type(inserted), inserted))
+                    raise ValueError(
+                        'Expecting float timestamp, got type '
+                        '{0!r} with value: {1}'.format(
+                            type(inserted), inserted))
                 self.add(key, inserted)
         else:
-            # AVOID THIS, it could keep old data if more parties
+            # XXX AVOID THIS, it could keep old data if more parties
             # exchange them all over and over again
             for obj in other:
                 self.add(obj)
 
     def discard(self, item):
-        'Mark an existing item as REMOVED. If KeyError is not found, pass.'
-        entry = self._data.pop(item, self.REMOVED)
-        if entry is self.REMOVED:
-            return
-        entry[-1] = self.REMOVED
-        if self._heap_overload > self._MAX_HEAP_PERCENTS_OVERLOAD:
-            self._refresh_heap()
-
+        # mark an existing item as removed. If KeyError is not found, pass.
+        entry = self._data.pop(item, sentinel)
+        if entry is not sentinel:
+            entry[-1] = sentinel
+            if self._heap_overload > self.max_heap_percent_overload:
+                self._refresh_heap()
     pop_value = discard
 
     def purge(self, now=None):
         """Check oldest items and remove them if needed.
 
         :keyword now: Time of purging -- by default right now.
-                      This can be usefull for unittesting.
+                      This can be useful for unit testing.
+
         """
-        if not now:
-            now = time.time()
-        if hasattr(now, '__call__'):
-            now = now()  # if we got this now as function, evaluate it
+        now = now or time.time()
+        now = now() if isinstance(now, Callable) else now
         if self.maxlen:
             while len(self._data) > self.maxlen:
                 self.pop()
@@ -732,32 +742,33 @@ class LimitedSet(object):
             while len(self._data) > self.minlen >= 0:
                 inserted_time, _ = self._heap[0]
                 if inserted_time + self.expires > now:
-                    break  # end this right now, oldest item is not expired yet
+                    break  # oldest item has not expired yet
                 self.pop()
 
-    def pop(self):
-        'Remove and return the lowest time item. Return None if empty.'
+    def pop(self, default=None):
+        """Remove and return the oldest item, or :const:`None` when empty."""
         while self._heap:
             _, item = heappop(self._heap)
-            if item is not self.REMOVED:
-                del self._data[item]
+            if self._data.pop(item, None) is not sentinel:
                 return item
-        return None
+        return default
 
     def as_dict(self):
         """Whole set as serializable dictionary.
+
         Example::
 
-            >>> s=LimitedSet(maxlen=200)
-            >>> r=LimitedSet(maxlen=200)
+            >>> s = LimitedSet(maxlen=200)
+            >>> r = LimitedSet(maxlen=200)
             >>> for i in range(500):
             ...     s.add(i)
             ...
             >>> r.update(s.as_dict())
             >>> r == s
             True
+
         """
-        return {key: inserted for inserted, key in self._data.values()}
+        return {key: inserted for inserted, key in values(self._data)}
 
     def __eq__(self, other):
         return self._data == other._data
@@ -766,15 +777,12 @@ class LimitedSet(object):
         return not self.__eq__(other)
 
     def __repr__(self):
-        return 'LimitedSet(maxlen={0}, expires={1}, minlen={2})' \
-            ' Current size:{3}'.format(
-                self.maxlen, self.expires, self.minlen, len(self._data))
+        return REPR_LIMITED_SET.format(
+            self, name=type(self).__name__, size=len(self),
+        )
 
     def __iter__(self):
-        # return (item[1] for item in
-        #         self._heap if item[-1] is not self.REMOVED)
-        # ^ not ordered, slow
-        return (i for _, i in sorted(self._data.values()))
+        return (i for _, i in sorted(values(self._data)))
 
     def __len__(self):
         return len(self._data)
@@ -783,17 +791,13 @@ class LimitedSet(object):
         return key in self._data
 
     def __reduce__(self):
-        """Pickle helper class.
-
-        This object can be pickled and upickled."""
         return self.__class__, (
-            self.maxlen, self.expires, self.minlen, self.as_dict())
+            self.maxlen, self.expires, self.as_dict(), self.minlen)
 
     @property
     def _heap_overload(self):
         """Compute how much is heap bigger than data [percents]."""
-        if len(self._data) == 0:
+        if not self._data:
             return len(self._heap)
-        return len(self._heap)*100/len(self._data) - 100
-
+        return len(self._heap) * 100 / len(self._data) - 100
 MutableSet.register(LimitedSet)

+ 14 - 20
celery/tests/utils/test_datastructures.py

@@ -3,6 +3,8 @@ from __future__ import absolute_import
 import pickle
 import sys
 
+from collections import Mapping
+
 from billiard.einfo import ExceptionInfo
 from time import time
 
@@ -188,14 +190,17 @@ class test_LimitedSet(Case):
         for n in 'bar', 'baz':
             self.assertIn(n, s)
         self.assertNotIn('foo', s)
+
         s = LimitedSet(maxlen=10)
         for i in range(150):
             s.add(i)
         self.assertLessEqual(len(s), 10)
+
         # make sure heap is not leaking:
-        self.assertLessEqual(len(s._heap),
-                             len(s) * (100. +
-                             s._MAX_HEAP_PERCENTS_OVERLOAD) / 100)
+        self.assertLessEqual(
+            len(s._heap),
+            len(s) * (100. + s.max_heap_percent_overload) / 100,
+        )
 
     def test_purge(self):
         # purge now enforces rules
@@ -226,20 +231,10 @@ class test_LimitedSet(Case):
         s.minlen = 3
         s.purge(now=time() + 3)
         self.assertEqual(s.minlen, len(s))
-        self.assertLessEqual(len(s._heap),
-                             s.maxlen *
-                             (100. + s._MAX_HEAP_PERCENTS_OVERLOAD)/100)
-        # s = LimitedSet(maxlen=None)
-        # [s.add(i) for i in range(10)]
-        # s.maxlen = 2
-        # with patch('celery.datastructures.heappop') as hp:
-        #    hp.side_effect = IndexError()
-        #    s.purge()
-        #    hp.assert_called_with(s._heap)
-        # with patch('celery.datastructures.heappop') as hp:
-        #    s._data = {i * 2: i * 2 for i in range(10)}
-        #    s.purge()
-        #    self.assertEqual(hp.call_count, 10)
+        self.assertLessEqual(
+            len(s._heap),
+            s.maxlen * (100. + s._MAX_HEAP_PERCENTS_OVERLOAD) / 100,
+        )
 
     def test_pickleable(self):
         s = LimitedSet(maxlen=2)
@@ -273,7 +268,6 @@ class test_LimitedSet(Case):
         s.discard('foo')
         self.assertNotIn('foo', s)
         self.assertEqual(len(s._data), 0)
-        # self.assertLessEqual(len(s._heap), 0 + s.heap_overload)
         s.discard('foo')
 
     def test_clear(self):
@@ -311,7 +305,7 @@ class test_LimitedSet(Case):
         s4.update(s1.as_dict())
         s4.update(s2.as_dict())
         s5.update(s1._data)  # revoke is using this
-        s5.update(s2._data)  #
+        s5.update(s2._data)
         self.assertEqual(s3, s4)
         self.assertEqual(s3, s5)
         s2.update(s4)
@@ -342,7 +336,7 @@ class test_LimitedSet(Case):
     def test_as_dict(self):
         s = LimitedSet(maxlen=2)
         s.add('foo')
-        self.assertIsInstance(s.as_dict(), dict)
+        self.assertIsInstance(s.as_dict(), Mapping)
 
 
 class test_AttributeDict(Case):