Ver código fonte

Corrects other thread-safety issues.

rnoel 13 anos atrás
pai
commit
18aa7b7fce
2 arquivos alterados com 18 adições e 2 exclusões
  1. 10 2
      celery/backends/base.py
  2. 8 0
      celery/datastructures.py

+ 10 - 2
celery/backends/base.py

@@ -219,7 +219,11 @@ class BaseDictBackend(BaseBackend):
 
     def get_task_meta(self, task_id, cache=True):
         if cache and task_id in self._cache:
-            return self._cache[task_id]
+            try:
+              return self._cache[task_id]
+            except KeyError:
+              #The Backend has been emptied in the meantime
+              pass
 
         meta = self._get_task_meta_for(task_id)
         if cache and meta.get("status") == states.SUCCESS:
@@ -235,7 +239,11 @@ class BaseDictBackend(BaseBackend):
 
     def get_taskset_meta(self, taskset_id, cache=True):
         if cache and taskset_id in self._cache:
-            return self._cache[taskset_id]
+            try:
+              return self._cache[taskset_id]
+            except KeyError:
+              #The Backend has been emptied in the meantime
+              pass
 
         meta = self._restore_taskset(taskset_id)
         if cache and meta is not None:

+ 8 - 0
celery/datastructures.py

@@ -311,6 +311,14 @@ class LocalCache(OrderedDict):
         finally:
           self.lock.release()
 
+    def pop(self, key, *args):
+        self.lock.acquire()   
+        try:
+            self.pop(key, *args)
+        finally:
+          self.lock.release()
+
+
 
 class TokenBucket(object):
     """Token Bucket Algorithm.