Przeglądaj źródła

Utils: Buffer: Changes API from append/pop -> put/take

This avoids colliding with the regular dict interface.
Ask Solem 8 lat temu
rodzic
commit
5f4963435e

+ 2 - 2
celery/backends/async.py

@@ -138,7 +138,7 @@ class AsyncBackendMixin(object):
         return result
 
     def _maybe_resolve_from_buffer(self, result):
-        result._maybe_set_cache(self._pending_messages.pop(result.id))
+        result._maybe_set_cache(self._pending_messages.take(result.id))
 
     def _add_pending_result(self, task_id, result, weak=False):
         weak, concrete = self._pending_results
@@ -263,7 +263,7 @@ class BaseResultConsumer(object):
             except KeyError:
                 # send to buffer in case we received this result
                 # before it was added to _pending_results.
-                self._pending_messages.append(task_id, meta)
+                self._pending_messages.put(task_id, meta)
             else:
                 result._maybe_set_cache(meta)
                 buckets = self.buckets

+ 25 - 25
celery/datastructures.py

@@ -975,7 +975,7 @@ class Messagebuffer(Evictable):
         self._len = self.data.__len__
         self._extend = self.data.extend
 
-    def append(self, item):
+    def put(self, item):
         self._append(item)
         self.maxsize and self._evict()
 
@@ -983,7 +983,7 @@ class Messagebuffer(Evictable):
         self._extend(it)
         self.maxsize and self._evict()
 
-    def pop(self, *default):
+    def take(self, *default):
         try:
             return self._pop()
         except IndexError:
@@ -992,7 +992,7 @@ class Messagebuffer(Evictable):
             raise self.Empty()
 
     def _pop_to_evict(self):
-        return self.pop()
+        return self.take()
 
     def __repr__(self):
         return '<{0}: {1}/{2}>'.format(
@@ -1042,7 +1042,18 @@ class BufferMap(OrderedDict, Evictable):
             self.update(iterable)
         self.total = sum(len(buf) for buf in items(self))
 
-    def pop(self, key, *default):
+    def put(self, key, item):
+        self._get_or_create_buffer(key).put(item)
+        self.total += 1
+        self.move_to_end(key)   # least recently used.
+        self.maxsize and self._evict()
+
+    def extend(self, key, it):
+        self._get_or_create_buffer(key).extend(it)
+        self.total += len(it)
+        self.maxsize and self._evict()
+
+    def take(self, key, *default):
         item, throw = None, False
         try:
             buf = self[key]
@@ -1050,12 +1061,12 @@ class BufferMap(OrderedDict, Evictable):
             throw = True
         else:
             try:
-                item = buf.pop()
+                item = buf.take()
                 self.total -= 1
             except self.Empty:
                 throw = True
             else:
-                self.move_to_end(key)  # least recently used.
+                self.move_to_end(key)  # mark as LRU
 
         if throw:
             if default:
@@ -1063,50 +1074,39 @@ class BufferMap(OrderedDict, Evictable):
             raise self.Empty()
         return item
 
-    def get_or_create(self, key):
+    def _get_or_create_buffer(self, key):
         try:
             return self[key]
         except KeyError:
-            buf = self[key] = self.Buffer(maxsize=self.bufmaxsize)
+            buf = self[key] = self._new_buffer()
             return buf
 
-    def discard(self, key, *default):
-        super(BufferMap, self).pop(key, *default)
+    def _new_buffer(self):
+        return self.Buffer(maxsize=self.bufmaxsize)
 
     def _LRUpop(self, *default):
-        return self[self._LRUkey()].pop(*default)
+        return self[self._LRUkey()].take(*default)
 
     def _pop_to_evict(self):
         for i in range(100):
             key = self._LRUkey()
             buf = self[key]
             try:
-                buf.pop()
+                buf.take()
             except (IndexError, self.Empty):
                 # buffer empty, remove it from mapping.
-                self.discard(key)
+                self.pop(key)
             else:
                 # we removed one item
                 self.total -= 1
                 # if buffer is empty now, remove it from mapping.
                 if not len(buf):
-                    self.discard(key)
+                    self.pop(key)
                 else:
                     # move to least recently used.
                     self.move_to_end(key)
                 break
 
-    def append(self, key, item):
-        self.get_or_create(key).append(item)
-        self.total += 1
-        self.move_to_end(key)   # least recently used.
-        self.maxsize and self._evict()
-
-    def extend(self, key, it):
-        self.get_or_create(key).extend(it)
-        self.total += len(it)
-        self.maxsize and self._evict()
-
     def __repr__(self):
         return '<{0}: {1}/{2}>'.format(
             type(self).__name__, self.total, self.maxsize,

+ 9 - 9
celery/tests/utils/test_datastructures.py

@@ -427,18 +427,18 @@ class test_Messagebuffer(Case):
 
     def assert_size_and_first(self, buf, size, expected_first_item):
         self.assertEqual(len(buf), size)
-        self.assertEqual(buf.pop(), expected_first_item)
+        self.assertEqual(buf.take(), expected_first_item)
 
     def test_append_limited(self):
         b = Messagebuffer(10)
         for i in range(20):
-            b.append(i)
+            b.put(i)
         self.assert_size_and_first(b, 10, 10)
 
     def test_append_unlimited(self):
         b = Messagebuffer(None)
         for i in range(20):
-            b.append(i)
+            b.put(i)
         self.assert_size_and_first(b, 20, 0)
 
     def test_extend_limited(self):
@@ -461,12 +461,12 @@ class test_Messagebuffer(Case):
     def test_pop_empty_with_default(self):
         b = Messagebuffer(10)
         sentinel = object()
-        self.assertIs(b.pop(sentinel), sentinel)
+        self.assertIs(b.take(sentinel), sentinel)
 
     def test_pop_empty_no_default(self):
         b = Messagebuffer(10)
         with self.assertRaises(b.Empty):
-            b.pop()
+            b.take()
 
     def test_repr(self):
         self.assertTrue(repr(Messagebuffer(10, [1, 2, 3])))
@@ -499,7 +499,7 @@ class test_BufferMap(Case):
     def test_append_limited(self):
         b = BufferMap(10)
         for i in range(20):
-            b.append(i, i)
+            b.put(i, i)
         self.assert_size_and_first(b, 10, 10)
 
     def assert_size_and_first(self, buf, size, expected_first_item):
@@ -509,7 +509,7 @@ class test_BufferMap(Case):
     def test_append_unlimited(self):
         b = BufferMap(None)
         for i in range(20):
-            b.append(i, i)
+            b.put(i, i)
         self.assert_size_and_first(b, 20, 0)
 
     def test_extend_limited(self):
@@ -525,12 +525,12 @@ class test_BufferMap(Case):
     def test_pop_empty_with_default(self):
         b = BufferMap(10)
         sentinel = object()
-        self.assertIs(b.pop(1, sentinel), sentinel)
+        self.assertIs(b.take(1, sentinel), sentinel)
 
     def test_pop_empty_no_default(self):
         b = BufferMap(10)
         with self.assertRaises(b.Empty):
-            b.pop(1)
+            b.take(1)
 
     def test_repr(self):
         self.assertTrue(repr(Messagebuffer(10, [1, 2, 3])))