|
@@ -5,6 +5,8 @@ from celery.serialization import get_pickled_exception
|
|
|
from celery.serialization import get_pickleable_exception
|
|
|
from celery.exceptions import TimeoutError
|
|
|
|
|
|
+EXCEPTION_STATES = frozenset(["RETRY", "FAILURE"])
|
|
|
+
|
|
|
|
|
|
class BaseBackend(object):
|
|
|
"""The base backend class. All backends should inherit from this."""
|
|
@@ -12,6 +14,12 @@ class BaseBackend(object):
|
|
|
capabilities = []
|
|
|
TimeoutError = TimeoutError
|
|
|
|
|
|
+ def encode_result(self, result, status):
|
|
|
+ if status == "SUCCESS":
|
|
|
+ return self.prepare_value(result)
|
|
|
+ elif status in EXCEPTION_STATES:
|
|
|
+ return self.prepare_exception(result)
|
|
|
+
|
|
|
def store_result(self, task_id, result, status):
|
|
|
"""Store the result and status of a task."""
|
|
|
raise NotImplementedError(
|
|
@@ -45,10 +53,8 @@ class BaseBackend(object):
|
|
|
raise NotImplementedError(
|
|
|
"get_status is not supported by this backend.")
|
|
|
|
|
|
- def prepare_result(self, result):
|
|
|
- """Prepare result for storage."""
|
|
|
- if result is None:
|
|
|
- return True
|
|
|
+ def prepare_value(self, result):
|
|
|
+ """Prepare value for storage."""
|
|
|
return result
|
|
|
|
|
|
def get_result(self, task_id):
|
|
@@ -126,10 +132,7 @@ class KeyValueStoreBackend(BaseBackend):
|
|
|
|
|
|
def store_result(self, task_id, result, status, traceback=None):
|
|
|
"""Store task result and status."""
|
|
|
- if status == "SUCCESS":
|
|
|
- result = self.prepare_result(result)
|
|
|
- elif status == "FAILURE":
|
|
|
- result = self.prepare_exception(result)
|
|
|
+ result = self.encode_result(result, status)
|
|
|
meta = {"status": status, "result": result, "traceback": traceback}
|
|
|
self.set(self.get_cache_key_for_task(task_id), pickle.dumps(meta))
|
|
|
return result
|