
Re-enable result caches, as they are a critical part of the API flow. Caching
has now moved to the base classes, so there is no need to implement it in each
individual backend. Caches are limited in size so their memory usage doesn't
grow uncontrollably; you can set the maximum number of results the cache can
hold using the CELERY_MAX_CACHED_RESULTS setting (the default is 5000). In
addition, you can re-fetch already retrieved results using
backend.reload_task_result + backend.reload_taskset_result (for those who want
to fetch results incrementally).

Ask Solem · 15 years ago · commit 9980653481
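
A rough sketch of how the new setting and the reload API might be used. Only the
CELERY_MAX_CACHED_RESULTS setting and the backend methods come from this change;
the choice of DatabaseBackend, the config placement and the task id are
placeholders for illustration:

    # celeryconfig / settings (hypothetical value shown for illustration)
    CELERY_MAX_CACHED_RESULTS = 10000   # default is 5000

    # client code
    from celery.backends.database import DatabaseBackend

    backend = DatabaseBackend()
    meta = backend.get_task_meta("a-task-id")   # cached once the task reaches SUCCESS
    backend.reload_task_result("a-task-id")     # bypass the cache and refresh the entry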

celery/backends/amqp.py (+24, -25)

@@ -3,12 +3,12 @@ from carrot.messaging import Consumer, Publisher

 from celery import conf
 from celery import states
-from celery.backends.base import BaseBackend
+from celery.backends.base import BaseDictBackend
 from celery.messaging import establish_connection
 from celery.datastructures import LocalCache


-class AMQPBackend(BaseBackend):
+class AMQPBackend(BaseDictBackend):
     """AMQP backend. Publish results by sending messages to the broker
     using the task id as routing key.
@@ -21,10 +21,11 @@ class AMQPBackend(BaseBackend):
     exchange = conf.RESULT_EXCHANGE
     capabilities = ["ResultStore"]
     _connection = None
+    _use_debug_tracking = False
+    _seen = set()

     def __init__(self, *args, **kwargs):
         super(AMQPBackend, self).__init__(*args, **kwargs)
-        self._cache = LocalCache(limit=1000)

     @property
     def connection(self):
@@ -79,22 +80,9 @@ class AMQPBackend(BaseBackend):

         return result

-    def is_successful(self, task_id):
-        """Returns ``True`` if task with ``task_id`` has been executed."""
-        return self.get_status(task_id) == states.SUCCESS
-
-    def get_status(self, task_id):
-        """Get the status of a task."""
-        return self._get_task_meta_for(task_id)["status"]
-
-    def get_traceback(self, task_id):
-        """Get the traceback for a failed task."""
-        return self._get_task_meta_for(task_id)["traceback"]
-
     def _get_task_meta_for(self, task_id):
-
-        if task_id in self._cache:
-            return self._cache[task_id]
+        assert task_id not in self._seen
+        self._use_debug_tracking and self._seen.add(task_id)

         results = []
@@ -117,10 +105,21 @@ class AMQPBackend(BaseBackend):
         self._cache[task_id] = results[0]
         return results[0]

-    def get_result(self, task_id):
-        """Get the result for a task."""
-        result = self._get_task_meta_for(task_id)
-        if result["status"] in states.EXCEPTION_STATES:
-            return self.exception_to_python(result["result"])
-        else:
-            return result["result"]
+    def reload_task_result(self, task_id):
+        raise NotImplementedError(
+                "reload_task_result is not supported by this backend.")
+
+    def reload_taskset_result(self, task_id):
+        """Reload taskset result, even if it has been previously fetched."""
+        raise NotImplementedError(
+                "reload_taskset_result is not supported by this backend.")
+
+    def save_taskset(self, taskset_id, result):
+        """Store the result and status of a task."""
+        raise NotImplementedError(
+                "save_taskset is not supported by this backend.")
+
+    def restore_taskset(self, taskset_id, cache=True):
+        """Get the result of a taskset."""
+        raise NotImplementedError(
+                "restore_taskset is not supported by this backend.")

celery/backends/base.py (+44, -9)

@@ -6,8 +6,10 @@ from billiard.serialization import pickle
 from billiard.serialization import get_pickled_exception
 from billiard.serialization import get_pickleable_exception

-from celery.exceptions import TimeoutError
+from celery import conf
 from celery import states
+from celery.exceptions import TimeoutError
+from celery.datastructures import LocalCache


 class BaseBackend(object):
@@ -122,11 +124,20 @@ class BaseBackend(object):
         raise NotImplementedError(
                 "save_taskset is not supported by this backend.")

-    def restore_taskset(self, taskset_id):
+    def restore_taskset(self, taskset_id, cache=True):
         """Get the result of a taskset."""
         raise NotImplementedError(
                 "restore_taskset is not supported by this backend.")

+    def reload_task_result(self, task_id):
+        """Reload task result, even if it has been previously fetched."""
+        raise NotImplementedError(
+                "reload_task_result is not supported by this backend.")
+
+    def reload_taskset_result(self, task_id):
+        """Reload taskset result, even if it has been previously fetched."""
+        raise NotImplementedError(
+                "reload_taskset_result is not supported by this backend.")


 class BaseDictBackend(BaseBackend):

@@ -134,6 +145,7 @@ class BaseDictBackend(BaseBackend):

     def __init__(self, *args, **kwargs):
         super(BaseDictBackend, self).__init__(*args, **kwargs)
+        self._cache = LocalCache(limit=conf.MAX_CACHED_RESULTS)

     def store_result(self, task_id, result, status, traceback=None):
         """Store task result and status."""
@@ -142,23 +154,47 @@ class BaseDictBackend(BaseBackend):

     def get_status(self, task_id):
         """Get the status of a task."""
-        return self._get_task_meta_for(task_id)["status"]
+        return self.get_task_meta(task_id)["status"]

     def get_traceback(self, task_id):
         """Get the traceback for a failed task."""
-        return self._get_task_meta_for(task_id)["traceback"]
+        return self.get_task_meta(task_id)["traceback"]

     def get_result(self, task_id):
         """Get the result of a task."""
-        meta = self._get_task_meta_for(task_id)
+        meta = self.get_task_meta(task_id)
         if meta["status"] in self.EXCEPTION_STATES:
             return self.exception_to_python(meta["result"])
         else:
             return meta["result"]

-    def restore_taskset(self, taskset_id):
-        """Get the result for a taskset."""
+    def get_task_meta(self, task_id, cache=True):
+        if cache and task_id in self._cache:
+            return self._cache[task_id]
+
+        meta = self._get_task_meta_for(task_id)
+        if cache and meta.get("status") == states.SUCCESS:
+            self._cache[task_id] = meta
+        return meta
+
+    def reload_task_result(self, task_id):
+        self._cache[task_id] = self.get_task_meta(task_id, cache=False)
+
+    def reload_taskset_result(self, taskset_id):
+        self._cache[taskset_id] = self.get_taskset_meta(taskset_id, cache=False)
+
+    def get_taskset_meta(self, taskset_id, cache=True):
+        if cache and taskset_id in self._cache:
+            return self._cache[taskset_id]
+
         meta = self._restore_taskset(taskset_id)
+        if cache and meta is not None:
+            self._cache[taskset_id] = meta
+        return meta
+
+    def restore_taskset(self, taskset_id, cache=True):
+        """Get the result for a taskset."""
+        meta = self.get_taskset_meta(taskset_id, cache=cache)
         if meta:
             return meta["result"]
@@ -198,8 +234,7 @@ class KeyValueStoreBackend(BaseDictBackend):
         meta = self.get(self.get_key_for_task(task_id))
         if not meta:
             return {"status": states.PENDING, "result": None}
-        meta = pickle.loads(str(meta))
-        return meta
+        return pickle.loads(str(meta))

     def _restore_taskset(self, taskset_id):
         """Get task metadata for a task by id."""

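The caching contract BaseDictBackend now provides is easiest to see with a toy
subclass. This is only an illustrative sketch: the InMemoryBackend class and its
store dict are invented for the example and are not part of the commit; concrete
backends such as DatabaseBackend implement _get_task_meta_for against real storage.

    from celery import states
    from celery.backends.base import BaseDictBackend

    class InMemoryBackend(BaseDictBackend):
        """Hypothetical backend keeping task metadata in a plain dict."""

        def __init__(self, *args, **kwargs):
            super(InMemoryBackend, self).__init__(*args, **kwargs)
            self.store = {}

        def _get_task_meta_for(self, task_id):
            # Hook called by get_task_meta() on a cache miss.
            return self.store.get(task_id,
                                  {"status": states.PENDING, "result": None})

    backend = InMemoryBackend()
    backend.store["t1"] = {"status": states.SUCCESS, "result": 42}
    backend.get_result("t1")        # 42; cached because the status is SUCCESS
    backend.store["t1"] = {"status": states.SUCCESS, "result": 43}
    backend.get_result("t1")        # still 42, served from the LocalCache
    backend.reload_task_result("t1")
    backend.get_result("t1")        # 43; the cache entry was refreshed
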
celery/backends/database.py (+2, -4)

@@ -21,15 +21,13 @@ class DatabaseBackend(BaseDictBackend):
         """Get task metadata for a task by id."""
         meta = TaskMeta.objects.get_task(task_id)
         if meta:
-            meta = meta.to_dict()
-            return meta
+            return meta.to_dict()

     def _restore_taskset(self, taskset_id):
         """Get taskset metadata for a taskset by id."""
         meta = TaskSetMeta.objects.restore_taskset(taskset_id)
         if meta:
-            meta = meta.to_dict()
-            return meta
+            return meta.to_dict()

     def cleanup(self):
         """Delete expired metadata."""

celery/conf.py (+6, -1)

@@ -49,6 +49,7 @@ _DEFAULTS = {
     "CELERY_EVENT_EXCHANGE_TYPE": "direct",
     "CELERY_EVENT_ROUTING_KEY": "celeryevent",
     "CELERY_RESULT_EXCHANGE": "celeryresults",
+    "CELERY_MAX_CACHED_RESULTS": 5000,
 }

 _DEPRECATION_FMT = """
@@ -70,7 +71,7 @@ def _get(name, default=None, compat=None):
             pass
     return default

-# <--- Task options                                <-   --   --- - ----- -- #
+# <--- Task                                        <-   --   --- - ----- -- #
 ALWAYS_EAGER = _get("CELERY_ALWAYS_EAGER")
 CELERY_BACKEND = _get("CELERY_BACKEND")
 CELERY_CACHE_BACKEND = _get("CELERY_CACHE_BACKEND")
@@ -81,6 +82,10 @@ IGNORE_RESULT = _get("CELERY_IGNORE_RESULT")
 if isinstance(TASK_RESULT_EXPIRES, int):
     TASK_RESULT_EXPIRES = timedelta(seconds=TASK_RESULT_EXPIRES)

+# <--- Client                                      <-   --   --- - ----- -- #
+
+MAX_CACHED_RESULTS = _get("CELERY_MAX_CACHED_RESULTS")
+
 # <--- Worker                                      <-   --   --- - ----- -- #

 SEND_EVENTS = _get("CELERY_SEND_EVENTS")

celery/tests/test_backends/test_amqp.py (+1, -0)

@@ -22,6 +22,7 @@ class TestRedisBackend(unittest.TestCase):

     def setUp(self):
         self.backend = AMQPBackend()
+        self.backend._use_debug_tracking = True

     def test_mark_as_done(self):
         tb = self.backend