Parcourir la source

Implements TaskSetResult().delete (delete after save)

Ask Solem il y a 14 ans
Parent
commit
6cd3e95060

+ 5 - 2
celery/backends/amqp.py

@@ -262,11 +262,14 @@ class AMQPBackend(BaseDictBackend):
                 "reload_taskset_result is not supported by this backend.")
 
     def save_taskset(self, taskset_id, result):
-        """Store the result and status of a task."""
         raise NotImplementedError(
                 "save_taskset is not supported by this backend.")
 
     def restore_taskset(self, taskset_id, cache=True):
-        """Get the result of a taskset."""
         raise NotImplementedError(
                 "restore_taskset is not supported by this backend.")
+
+    def delete_taskset(self, taskset_id):
+        raise NotImplementedError(
+                "delete_taskset is not supported by this backend.")
+

+ 11 - 0
celery/backends/base.py

@@ -133,6 +133,10 @@ class BaseBackend(object):
         raise NotImplementedError(
                 "restore_taskset is not supported by this backend.")
 
+    def delete_taskset(self, taskset_id):
+        raise NotImplementedError(
+                "delete_taskset is not supported by this backend.")
+
     def reload_task_result(self, task_id):
         """Reload task result, even if it has been previously fetched."""
         raise NotImplementedError(
@@ -226,6 +230,10 @@ class BaseDictBackend(BaseBackend):
         """Store the result of an executed taskset."""
         return self._save_taskset(taskset_id, result)
 
+    def delete_taskset(self, taskset_id):
+        self._cache.pop(taskset_id, None)
+        return self._delete_taskset(taskset_id)
+
 
 class KeyValueStoreBackend(BaseDictBackend):
 
@@ -259,6 +267,9 @@ class KeyValueStoreBackend(BaseDictBackend):
                  pickle.dumps({"result": result}))
         return result
 
+    def _delete_taskset(self, taskset_id):
+        self.delete(self.get_key_for_taskset(taskset_id))
+
     def _get_task_meta_for(self, task_id):
         """Get task metadata for a task by id."""
         meta = self.get(self.get_key_for_task(task_id))

+ 12 - 1
celery/backends/database.py

@@ -82,7 +82,7 @@ class DatabaseBackend(BaseDictBackend):
             session.close()
 
     def _restore_taskset(self, taskset_id):
-        """Get taskset metadata for a taskset by id."""
+        """Get metadata for taskset by id."""
         session = self.ResultSession()
         try:
             taskset = session.query(TaskSet).filter(
@@ -92,6 +92,17 @@ class DatabaseBackend(BaseDictBackend):
         finally:
             session.close()
 
+    def _delete_taskset(self, taskset_id):
+        """Delete metadata for taskset by id."""
+        session = self.ResultSession()
+        try:
+            session.query(TaskSet).filter(
+                    TaskSet.taskset_id == taskset_id).delete()
+            session.flush()
+            session.commit()
+        finally:
+            session.close()
+
     def _forget(self, task_id):
         """Forget about result."""
         session = self.ResultSession()

+ 6 - 4
celery/result.py

@@ -466,10 +466,12 @@ class TaskSetResult(ResultSet):
             >>> result = TaskSetResult.restore(taskset_id)
 
         """
-        if backend is None:
-            backend = self.app.backend
-        backend.save_taskset(self.taskset_id, self)
-        return self
+        return (backend or self.app.backend).save_taskset(self.taskset_id,
+                                                          self)
+
+    def delete(self, backend=None):
+        """Remove this result if it was previously saved."""
+        (backend or self.app.backend).delete_taskset(self.taskset_id)
 
     @classmethod
     def restore(self, taskset_id, backend=None):

+ 4 - 0
celery/tests/test_backends/test_amqp.py

@@ -287,3 +287,7 @@ class test_AMQPBackend(unittest.TestCase):
     def test_restore_taskset(self):
         self.assertRaises(NotImplementedError,
                           self.create_backend().restore_taskset, "x")
+    
+    def test_delete_taskset(self):
+        self.assertRaises(NotImplementedError,
+                          self.create_backend().delete_taskset, "x")

+ 24 - 6
celery/tests/test_backends/test_base.py

@@ -1,6 +1,7 @@
 import sys
 import types
-from celery.tests.utils import unittest
+
+from mock import Mock
 
 from celery.utils import serialization
 from celery.utils.serialization import subclass_exception
@@ -14,6 +15,7 @@ from celery.backends.base import BaseBackend, KeyValueStoreBackend
 from celery.backends.base import BaseDictBackend
 from celery.utils import gen_unique_id
 
+from celery.tests.utils import unittest
 
 class wrapobject(object):
 
@@ -67,6 +69,10 @@ class test_BaseBackend_interface(unittest.TestCase):
         self.assertRaises(NotImplementedError,
                 b.restore_taskset, "SOMExx-N0nex1stant-IDxx-")
 
+    def test_delete_taskset(self):
+        self.assertRaises(NotImplementedError,
+                b.delete_taskset, "SOMExx-N0nex1stant-IDxx-")
+
     def test_save_taskset(self):
         self.assertRaises(NotImplementedError,
                 b.save_taskset, "SOMExx-N0nex1stant-IDxx-", "blergh")
@@ -140,8 +146,9 @@ class KVBackend(KeyValueStoreBackend):
 
 class DictBackend(BaseDictBackend):
 
-    def _save_taskset(self, taskset_id, result):
-        return "taskset-saved"
+    def __init__(self, *args, **kwargs):
+        BaseDictBackend.__init__(self, *args, **kwargs)
+        self._data = {"can-delete": {"result": "foo"}}
 
     def _restore_taskset(self, taskset_id):
         if taskset_id == "exists":
@@ -151,15 +158,24 @@ class DictBackend(BaseDictBackend):
         if task_id == "task-exists":
             return {"result": "task"}
 
+    def _delete_taskset(self, taskset_id):
+        self._data.pop(taskset_id, None)
+
 
 class test_BaseDictBackend(unittest.TestCase):
 
     def setUp(self):
         self.b = DictBackend()
 
+    def test_delete_taskset(self):
+        self.b.delete_taskset("can-delete")
+        self.assertNotIn("can-delete", self.b._data)
+
     def test_save_taskset(self):
-        self.assertEqual(self.b.save_taskset("foofoo", "xxx"),
-                         "taskset-saved")
+        b = BaseDictBackend()
+        b._save_taskset = Mock()
+        b.save_taskset("foofoo", "xxx")
+        b._save_taskset.assert_called_with("foofoo", "xxx")
 
     def test_restore_taskset(self):
         self.assertIsNone(self.b.restore_taskset("missing"))
@@ -197,10 +213,12 @@ class test_KeyValueStoreBackend(unittest.TestCase):
         self.assertIsNone(self.b.get_result("xxx-missing"))
         self.assertEqual(self.b.get_status("xxx-missing"), states.PENDING)
 
-    def test_save_restore_taskset(self):
+    def test_save_restore_delete_taskset(self):
         tid = gen_unique_id()
         self.b.save_taskset(tid, "Hello world")
         self.assertEqual(self.b.restore_taskset(tid), "Hello world")
+        self.b.delete_taskset(tid)
+        self.assertIsNone(self.b.restore_taskset(tid))
 
     def test_restore_missing_taskset(self):
         self.assertIsNone(self.b.restore_taskset("xxx-nonexistant"))

+ 4 - 1
celery/tests/test_backends/test_database.py

@@ -158,7 +158,7 @@ class test_DatabaseBackend(unittest.TestCase):
         tb = DatabaseBackend()
         tb.process_cleanup()
 
-    def test_save___restore_taskset(self):
+    def test_save__restore__delete_taskset(self):
         tb = DatabaseBackend()
 
         tid = gen_unique_id()
@@ -168,6 +168,9 @@ class test_DatabaseBackend(unittest.TestCase):
         res2 = tb.restore_taskset(tid)
         self.assertEqual(res2, res)
 
+        tb.delete_taskset(tid)
+        self.assertIsNone(tb.restore_taskset(tid))
+
         self.assertIsNone(tb.restore_taskset("xxx-nonexisting-id"))
 
     def test_cleanup(self):

+ 2 - 0
celery/tests/test_task/test_result.py

@@ -215,6 +215,8 @@ class TestTaskSetResult(unittest.TestCase):
         self.assertRaises(AttributeError, ts.save, backend=object())
         self.assertEqual(TaskSetResult.restore(ts.taskset_id).subtasks,
                          ts.subtasks)
+        ts.delete()
+        self.assertIsNone(TaskSetResult.restore(ts.taskset_id))
         self.assertRaises(AttributeError,
                           TaskSetResult.restore, ts.taskset_id,
                           backend=object())