فهرست منبع

Encode results according to state in one common place, also fixes some backends not preparing exceptions on retry.

Ask Solem 15 سال پیش
والد
کامیت
1cdee72466
4فایلهای تغییر یافته به همراه14 افزوده شده و 21 حذف شده
  1. 1 5
      celery/backends/amqp.py
  2. 11 8
      celery/backends/base.py
  3. 1 4
      celery/backends/database.py
  4. 1 4
      celery/backends/mongodb.py

+ 1 - 5
celery/backends/amqp.py

@@ -56,11 +56,7 @@ class Backend(BaseBackend):
 
 
     def store_result(self, task_id, result, status, traceback=None):
     def store_result(self, task_id, result, status, traceback=None):
         """Send task return value and status."""
         """Send task return value and status."""
-        if status == "DONE":
-            result = self.prepare_result(result)
-        elif status == "FAILURE":
-            result = self.prepare_exception(result)
-
+        result = self.encode_result(result, status)
 
 
         meta = {"task_id": task_id,
         meta = {"task_id": task_id,
                 "result": result,
                 "result": result,

+ 11 - 8
celery/backends/base.py

@@ -5,6 +5,8 @@ from celery.serialization import get_pickled_exception
 from celery.serialization import get_pickleable_exception
 from celery.serialization import get_pickleable_exception
 from celery.exceptions import TimeoutError
 from celery.exceptions import TimeoutError
 
 
+EXCEPTION_STATES = frozenset(["RETRY", "FAILURE"])
+
 
 
 class BaseBackend(object):
 class BaseBackend(object):
     """The base backend class. All backends should inherit from this."""
     """The base backend class. All backends should inherit from this."""
@@ -12,6 +14,12 @@ class BaseBackend(object):
     capabilities = []
     capabilities = []
     TimeoutError = TimeoutError
     TimeoutError = TimeoutError
 
 
+    def encode_result(self, result, status):
+        if status == "DONE":
+            return self.prepare_value(result)
+        elif status in EXCEPTION_STATES:
+            return self.prepare_exception(result)
+
     def store_result(self, task_id, result, status):
     def store_result(self, task_id, result, status):
         """Store the result and status of a task."""
         """Store the result and status of a task."""
         raise NotImplementedError(
         raise NotImplementedError(
@@ -45,10 +53,8 @@ class BaseBackend(object):
         raise NotImplementedError(
         raise NotImplementedError(
                 "get_status is not supported by this backend.")
                 "get_status is not supported by this backend.")
 
 
-    def prepare_result(self, result):
-        """Prepare result for storage."""
-        if result is None:
-            return True
+    def prepare_value(self, result):
+        """Prepare value for storage."""
         return result
         return result
 
 
     def get_result(self, task_id):
     def get_result(self, task_id):
@@ -126,10 +132,7 @@ class KeyValueStoreBackend(BaseBackend):
 
 
     def store_result(self, task_id, result, status, traceback=None):
     def store_result(self, task_id, result, status, traceback=None):
         """Store task result and status."""
         """Store task result and status."""
-        if status == "DONE":
-            result = self.prepare_result(result)
-        elif status == "FAILURE":
-            result = self.prepare_exception(result)
+        result = self.encode_result(result, status)
         meta = {"status": status, "result": result, "traceback": traceback}
         meta = {"status": status, "result": result, "traceback": traceback}
         self.set(self.get_cache_key_for_task(task_id), pickle.dumps(meta))
         self.set(self.get_cache_key_for_task(task_id), pickle.dumps(meta))
         return result
         return result

+ 1 - 4
celery/backends/database.py

@@ -32,10 +32,7 @@ class Backend(BaseBackend):
 
 
     def store_result(self, task_id, result, status, traceback=None):
     def store_result(self, task_id, result, status, traceback=None):
         """Store return value and status of an executed task."""
         """Store return value and status of an executed task."""
-        if status == "DONE":
-            result = self.prepare_result(result)
-        elif status in ["FAILURE", "RETRY"]:
-            result = self.prepare_exception(result)
+        result = self.encode_result(result, status)
         TaskMeta.objects.store_result(task_id, result, status,
         TaskMeta.objects.store_result(task_id, result, status,
                                       traceback=traceback)
                                       traceback=traceback)
         return result
         return result

+ 1 - 4
celery/backends/mongodb.py

@@ -185,10 +185,7 @@ class Backend(BaseBackend):
         """Store return value and status of an executed task."""
         """Store return value and status of an executed task."""
         from pymongo.binary import Binary
         from pymongo.binary import Binary
 
 
-        if status == 'DONE':
-            result = self.prepare_result(result)
-        elif status == 'FAILURE':
-            result = self.prepare_exception(result)
+        result = self.encode_result(result, status)
 
 
         meta = {"_id": task_id,
         meta = {"_id": task_id,
                 "status": status,
                 "status": status,