فهرست منبع

Cache/Tyrant backend now shares the same inheritage: KeyValueStoreBackend

Ask Solem 16 سال پیش
والد
کامیت
8c2c3e842b
3فایلهای تغییر یافته به همراه70 افزوده شده و 106 حذف شده
  1. 56 0
      celery/backends/base.py
  2. 6 50
      celery/backends/cache.py
  3. 8 56
      celery/backends/tyrant.py

+ 56 - 0
celery/backends/base.py

@@ -197,3 +197,59 @@ class BaseBackend(object):
 
         """
         pass
+
+
+class KeyValueStoreBackend(BaseBackend):
+
+    capabilities = ["ResultStore"]
+
+    def __init__(self, *args, **kwargs):
+        super(KeyValueStoreBackend, self).__init__()
+        self._cache = {}
+
+    def get_cache_key_for_task(self, task_id):
+        """Get the cache key for a task by id."""
+        return "celery-task-meta-%s" % task_id
+
+    def get(self, key):
+        raise NotImplementedError("Must implement the get method.")
+    
+    def set(self, key, value):
+        raise NotImplementedError("Must implement the set method.")
+    
+    def store_result(self, task_id, result, status):
+        """Store task result and status."""
+        if status == "DONE":
+            result = self.prepare_result(result)
+        elif status == "FAILURE":
+            result = self.prepare_exception(result)
+        meta = {"status": status, "result": result}
+        self.set(self.get_cache_key_for_task(task_id), pickle.dumps(meta))
+
+    def get_status(self, task_id):
+        """Get the status of a task."""
+        return self._get_task_meta_for(task_id)["status"]
+    
+    def get_result(self, task_id):
+        """Get the result of a task."""
+        meta = self._get_task_meta_for(task_id)
+        if meta["status"] == "FAILURE":
+            return self.exception_to_python(meta["result"])
+        else:
+            return meta["result"]
+    
+    def is_done(self, task_id):
+        """Returns ``True`` if the task executed successfully."""
+        return self.get_status(task_id) == "DONE"
+
+    def _get_task_meta_for(self, task_id):
+        """Get task metadata for a task by id."""
+        if task_id in self._cache:
+            return self._cache[task_id]
+        meta = self.get(self.get_cache_key_for_task(task_id))
+        if not meta:
+            return {"status": "PENDING", "result": None}
+        meta = pickle.loads(meta)
+        if meta.get("status") == "DONE":
+            self._cache[task_id] = meta
+        return meta

+ 6 - 50
celery/backends/cache.py

@@ -1,58 +1,14 @@
 """celery.backends.cache"""
 from django.core.cache import cache
-from celery.backends.base import BaseBackend
-try:
-    import cPickle as pickle
-except ImportError:
-    import pickle
+from celery.backends.base import KeyValueStoreBackend
 
 
-class Backend(BaseBackend):
+class Backend(KeyValueStoreBackend):
     """Backend using the Django cache framework to store task metadata."""
 
-    capabilities = ["ResultStore"]
+    def get(self, key):
+        return cache.get(key)
 
-    def __init__(self, *args, **kwargs):
-        super(Backend, self).__init__(*args, **kwargs)
-        self._cache = {}
+    def set(self, key, value):
+        cache.set(key, value)
 
-    def _cache_key(self, task_id):
-        """Get the cache key for a task by id."""
-        return "celery-task-meta-%s" % task_id
-
-    def store_result(self, task_id, result, status):
-        """Store task result and status."""
-        if status == "DONE":
-            result = self.prepare_result(result)
-        elif status == "FAILURE":
-            result = self.prepare_exception(result)
-        meta = {"status": status, "result": pickle.dumps(result)}
-        cache.set(self._cache_key(task_id), meta)
-
-    def get_status(self, task_id):
-        """Get the status of a task."""
-        return self._get_task_meta_for(task_id)["status"]
-
-    def get_result(self, task_id):
-        """Get the result of a task."""
-        meta = self._get_task_meta_for(task_id)
-        if meta["status"] == "FAILURE":
-            return self.exception_to_python(meta["result"])
-        else:
-            return meta["result"]
-
-    def is_done(self, task_id):
-        """Returns ``True`` if the task has been executed successfully."""
-        return self.get_status(task_id) == "DONE"
-
-    def _get_task_meta_for(self, task_id):
-        """Get the task metadata for a task by id."""
-        if task_id in self._cache:
-            return self._cache[task_id]
-        meta = cache.get(self._cache_key(task_id))
-        if not meta:
-            return {"status": "PENDING", "result": None}
-        meta["result"] = pickle.loads(meta.get("result", None))
-        if meta.get("status") == "DONE":
-            self._cache[task_id] = meta
-        return meta

+ 8 - 56
celery/backends/tyrant.py

@@ -7,19 +7,11 @@ except ImportError:
     raise ImproperlyConfigured(
             "The Tokyo Tyrant backend requires the pytyrant library.")
 
-from celery.backends.base import BaseBackend
+from celery.backends.base import KeyValueStoreBackend
 from django.conf import settings
-try:
-    import simplejson as json
-except ImportError:
-    from django.utils import simplejson as json
-try:
-    import cPickle as pickle
-except ImportError:
-    import pickle
 
 
-class Backend(BaseBackend):
+class Backend(KeyValueStoreBackend):
     """Tokyo Cabinet based task backend store.
 
     .. attribute:: tyrant_host
@@ -34,8 +26,6 @@ class Backend(BaseBackend):
     tyrant_host = None
     tyrant_port = None
 
-    capabilities = ["ResultStore"]
-
     def __init__(self, tyrant_host=None, tyrant_port=None):
         """Initialize Tokyo Tyrant backend instance.
 
@@ -53,7 +43,6 @@ class Backend(BaseBackend):
                 "To use the Tokyo Tyrant backend, you have to "
                 "set the TT_HOST and TT_PORT settings in your settings.py")
         super(Backend, self).__init__()
-        self._cache = {}
         self._connection = None
 
     def open(self):
@@ -64,7 +53,6 @@ class Backend(BaseBackend):
         explicit :meth:`close`.
         
         """
-
         # connection overrides bool()
         if self._connection is None:
             self._connection = pytyrant.PyTyrant.open(self.tyrant_host,
@@ -73,7 +61,6 @@ class Backend(BaseBackend):
 
     def close(self):
         """Close the tyrant connection and remove the cache."""
-
         # connection overrides bool()
         if self._connection is not None:
             self._connection.close()
@@ -82,44 +69,9 @@ class Backend(BaseBackend):
     def process_cleanup(self):
         self.close()
 
-    def _cache_key(self, task_id):
-        """Get the cache key for a task by id."""
-        return "celery-task-meta-%s" % task_id
-
-    def store_result(self, task_id, result, status):
-        """Store task result and status."""
-        if status == "DONE":
-            result = self.prepare_result(result)
-        elif status == "FAILURE":
-            result = self.prepare_exception(result)
-        meta = {"status": status, "result": pickle.dumps(result)}
-        self.open()[self._cache_key(task_id)] = json.dumps(meta)
-
-    def get_status(self, task_id):
-        """Get the status for a task."""
-        return self._get_task_meta_for(task_id)["status"]
-
-    def get_result(self, task_id):
-        """Get the result of a task."""
-        meta = self._get_task_meta_for(task_id)
-        if meta["status"] == "FAILURE":
-            return self.exception_to_python(meta["result"])
-        else:
-            return meta["result"]
-
-    def is_done(self, task_id):
-        """Returns ``True`` if the task executed successfully."""
-        return self.get_status(task_id) == "DONE"
-
-    def _get_task_meta_for(self, task_id):
-        """Get task metadata for a task by id."""
-        if task_id in self._cache:
-            return self._cache[task_id]
-        meta = self.open().get(self._cache_key(task_id))
-        if not meta:
-            return {"status": "PENDING", "result": None}
-        meta = json.loads(meta)
-        meta["result"] = pickle.loads(meta.get("result", None))
-        if meta.get("status") == "DONE":
-            self._cache[task_id] = meta
-        return meta
+    def get(self, key):
+        return self.open().get(key)
+
+    def set(self, key, value):
+        self.open()[key] = value
+