|
@@ -57,34 +57,14 @@ class BaseBackend(object):
|
|
|
"""Convert serialized exception to Python exception."""
|
|
|
return get_pickled_exception(exc)
|
|
|
|
|
|
- def get_status(self, task_id):
|
|
|
- """Get the status of a task."""
|
|
|
- raise NotImplementedError(
|
|
|
- "get_status is not supported by this backend.")
|
|
|
-
|
|
|
def prepare_value(self, result):
|
|
|
"""Prepare value for storage."""
|
|
|
return result
|
|
|
|
|
|
- def get_result(self, task_id):
|
|
|
- """Get the result of a task."""
|
|
|
- raise NotImplementedError(
|
|
|
- "get_result is not supported by this backend.")
|
|
|
-
|
|
|
- def get_traceback(self, task_id):
|
|
|
- """Get the traceback for a failed task."""
|
|
|
- raise NotImplementedError(
|
|
|
- "get_traceback is not supported by this backend.")
|
|
|
-
|
|
|
def is_successful(self, task_id):
|
|
|
"""Returns ``True`` if the task was successfully executed."""
|
|
|
return self.get_status(task_id) == states.SUCCESS
|
|
|
|
|
|
- def cleanup(self):
|
|
|
- """Backend cleanup. Is run by
|
|
|
- :class:`celery.task.DeleteExpiredTaskMetaTask`."""
|
|
|
- pass
|
|
|
-
|
|
|
def wait_for(self, task_id, timeout=None):
|
|
|
"""Wait for task and return its result.
|
|
|
|
|
@@ -112,19 +92,39 @@ class BaseBackend(object):
|
|
|
if timeout and time_elapsed >= timeout:
|
|
|
raise TimeoutError("The operation timed out.")
|
|
|
|
|
|
+ def cleanup(self):
|
|
|
+ """Backend cleanup. Is run by
|
|
|
+ :class:`celery.task.DeleteExpiredTaskMetaTask`."""
|
|
|
+ pass
|
|
|
+
|
|
|
def process_cleanup(self):
|
|
|
"""Cleanup actions to do at the end of a task worker process."""
|
|
|
pass
|
|
|
|
|
|
- def store_taskset(self, taskset_id, result):
|
|
|
+ def get_status(self, task_id):
|
|
|
+ """Get the status of a task."""
|
|
|
+ raise NotImplementedError(
|
|
|
+ "get_status is not supported by this backend.")
|
|
|
+
|
|
|
+ def get_result(self, task_id):
|
|
|
+ """Get the result of a task."""
|
|
|
+ raise NotImplementedError(
|
|
|
+ "get_result is not supported by this backend.")
|
|
|
+
|
|
|
+ def get_traceback(self, task_id):
|
|
|
+ """Get the traceback for a failed task."""
|
|
|
+ raise NotImplementedError(
|
|
|
+ "get_traceback is not supported by this backend.")
|
|
|
+
|
|
|
+ def save_taskset(self, taskset_id, result):
|
|
|
"""Store the result and status of a task."""
|
|
|
raise NotImplementedError(
|
|
|
- "store_taskset is not supported by this backend.")
|
|
|
+ "save_taskset is not supported by this backend.")
|
|
|
|
|
|
- def get_taskset(self, taskset_id):
|
|
|
+ def restore_taskset(self, taskset_id):
|
|
|
"""Get the result of a taskset."""
|
|
|
raise NotImplementedError(
|
|
|
- "get_taskset is not supported by this backend.")
|
|
|
+ "restore_taskset is not supported by this backend.")
|
|
|
|
|
|
|
|
|
class BaseDictBackend(BaseBackend):
|
|
@@ -156,15 +156,15 @@ class BaseDictBackend(BaseBackend):
|
|
|
else:
|
|
|
return meta["result"]
|
|
|
|
|
|
- def get_taskset(self, taskset_id):
|
|
|
+ def restore_taskset(self, taskset_id):
|
|
|
"""Get the result for a taskset."""
|
|
|
- meta = self._get_taskset_meta_for(taskset_id)
|
|
|
+ meta = self._restore_taskset(taskset_id)
|
|
|
if meta:
|
|
|
return meta["result"]
|
|
|
|
|
|
- def store_taskset(self, taskset_id, result):
|
|
|
+ def save_taskset(self, taskset_id, result):
|
|
|
"""Store the result of an executed taskset."""
|
|
|
- return self._store_taskset(taskset_id, result)
|
|
|
+ return self._save_taskset(taskset_id, result)
|
|
|
|
|
|
|
|
|
class KeyValueStoreBackend(BaseDictBackend):
|
|
@@ -188,7 +188,7 @@ class KeyValueStoreBackend(BaseDictBackend):
|
|
|
self.set(self.get_key_for_task(task_id), pickle.dumps(meta))
|
|
|
return result
|
|
|
|
|
|
- def _store_taskset(self, taskset_id, result):
|
|
|
+ def _save_taskset(self, taskset_id, result):
|
|
|
meta = {"result": result}
|
|
|
self.set(self.get_key_for_taskset(taskset_id), pickle.dumps(meta))
|
|
|
return result
|
|
@@ -205,7 +205,7 @@ class KeyValueStoreBackend(BaseDictBackend):
|
|
|
self._cache[task_id] = meta
|
|
|
return meta
|
|
|
|
|
|
- def _get_taskset_meta_for(self, taskset_id):
|
|
|
+ def _restore_taskset(self, taskset_id):
|
|
|
"""Get task metadata for a task by id."""
|
|
|
if taskset_id in self._cache:
|
|
|
return self._cache[taskset_id]
|