Переглянути джерело

Fixes the OrderedDict KeyError by avoiding conccurent access.

Closes #439.
rnoel 13 роки тому
батько
коміт
dcbdd11064
2 змінених файлів з 21 додано та 5 видалено
  1. 10 2
      celery/backends/base.py
  2. 11 3
      celery/datastructures.py

+ 10 - 2
celery/backends/base.py

@@ -219,7 +219,11 @@ class BaseDictBackend(BaseBackend):
 
     def get_task_meta(self, task_id, cache=True):
         if cache and task_id in self._cache:
-            return self._cache[task_id]
+            try:
+              return self._cache[task_id]
+            except KeyError:
+              #The Backend has been emptied in the meantime
+              pass
 
         meta = self._get_task_meta_for(task_id)
         if cache and meta.get("status") == states.SUCCESS:
@@ -235,7 +239,11 @@ class BaseDictBackend(BaseBackend):
 
     def get_taskset_meta(self, taskset_id, cache=True):
         if cache and taskset_id in self._cache:
-            return self._cache[taskset_id]
+            try:
+              return self._cache[taskset_id]
+            except KeyError:
+              #The Backend has been emptied in the meantime
+              pass
 
         meta = self._restore_taskset(taskset_id)
         if cache and meta is not None:

+ 11 - 3
celery/datastructures.py

@@ -9,12 +9,14 @@ Custom data structures.
 
 """
 from __future__ import absolute_import
+from __future__ import with_statement
 
 import time
 import traceback
 
 from itertools import chain
 from Queue import Empty
+from threading import RLock
 
 from celery.utils.compat import OrderedDict
 
@@ -299,11 +301,17 @@ class LocalCache(OrderedDict):
     def __init__(self, limit=None):
         super(LocalCache, self).__init__()
         self.limit = limit
+        self.lock = RLock()
 
     def __setitem__(self, key, value):
-        while len(self) >= self.limit:
-            self.popitem(last=False)
-        super(LocalCache, self).__setitem__(key, value)
+        with self.lock:
+            while len(self) >= self.limit:
+                self.popitem(last=False)
+            super(LocalCache, self).__setitem__(key, value)
+
+    def pop(self, key, *args):
+        with self.lock:
+            self.pop(key, *args)
 
 
 class TokenBucket(object):