Explorar o código

Introduced celery.datastructures.LocalCache, a dictionary with a limit on the number of keys, perfect for caches where keys are unique (i.e. task result caches)

Ask Solem %!s(int64=15) %!d(string=hai) anos
pai
achega
616b9c6461
Modificáronse 2 ficheiros con 52 adicións e 1 borrados
  1. 32 0
      celery/datastructures.py
  2. 20 1
      celery/tests/test_datastructures.py

+ 32 - 0
celery/datastructures.py

@@ -4,6 +4,7 @@ Custom Datastructures
 
 """
 import time
+import heapq
 import traceback
 from UserList import UserList
 from Queue import Queue, Empty as QueueEmpty
@@ -223,3 +224,34 @@ class LimitedSet(object):
     def first(self):
         """Get the oldest member."""
         return self.chronologically[0]
+
+
+class LocalCache(dict):
+    """Dictionary with a finite number of keys.
+
+    Older keys are expired first, but note that the timestamp
+    is not updated if a key is inserted twice, so it's not that great
+    for anything but unique keys.
+
+    """
+
+    def __init__(self, limit=None, initial=None):
+        super(LocalCache, self).__init__()
+        self.limit = limit
+        self.timestamps = []
+        initial = initial or {}
+        for key, value in initial.items():
+            self[key] = value
+
+    def __setitem__(self, key, value):
+        baseproxy = super(LocalCache, self)
+        item = time.time(), key
+        if len(self) >= self.limit:
+            timestamp, expired_key = heapq.heapreplace(self.timestamps, item)
+            baseproxy.pop(expired_key, None)
+        else:
+            heapq.heappush(self.timestamps, item)
+        baseproxy.__setitem__(key, value)
+
+    def __delitem__(self, key):
+        raise NotImplementedError("LocalCache doesn't support deletion.")

+ 20 - 1
celery/tests/test_datastructures.py

@@ -2,7 +2,7 @@ import sys
 import unittest
 from Queue import Queue
 
-from celery.datastructures import PositionQueue, ExceptionInfo
+from celery.datastructures import PositionQueue, ExceptionInfo, LocalCache
 from celery.datastructures import LimitedSet, SharedCounter, consume_queue
 
 
@@ -127,3 +127,22 @@ class TestLimitedSet(unittest.TestCase):
         items = "foo", "bar"
         map(s.add, items)
         self.assertTrue(repr(s).startswith("LimitedSet("))
+
+
+class TestLocalCache(unittest.TestCase):
+
+    def test_expires(self):
+        limit = 100
+        x = LocalCache(limit=limit)
+        slots = list(range(limit * 2))
+        for i in slots:
+            x[i] = i
+        self.assertEquals(x.keys(), slots[limit:])
+        self.assertEquals(len(x.keys()), len(x.timestamps),
+                "timestamps deleted, does not grow.")
+
+    def test_delete_not_implemented(self):
+        self.assertRaises(NotImplementedError, LocalCache(10).__delitem__,
+                "foo")
+
+