Ver código fonte

save/restore/delete taskset in MongoBackend

Implements _save_taskset, _restore_taskset and _delete taskset in
MongoBackend.

Unittest that conditionally runs if pymongo is installed and MongoDB is
running.
julienp 13 anos atrás
pai
commit
ff4679bab0

+ 38 - 0
celery/backends/mongodb.py

@@ -142,6 +142,44 @@ class MongoBackend(BaseDictBackend):
 
         return meta
 
+    def _save_taskset(self, taskset_id, result):
+        """Save the taskset result."""
+        from pymongo.binary import Binary
+
+        meta = {
+            "_id": taskset_id,
+            "result": Binary(self.encode(result)),
+            "date_done": datetime.utcnow()
+        }
+
+        db = self._get_database()
+        taskmeta_collection = db[self.mongodb_taskmeta_collection]
+        taskmeta_collection.save(meta, safe=True)
+
+        return result
+
+    def _restore_taskset(self, taskset_id):
+        """Get the result for a taskset by id."""
+        db = self._get_database()
+        taskmeta_collection = db[self.mongodb_taskmeta_collection]
+        obj = taskmeta_collection.find_one({"_id": taskset_id})
+        if not obj:
+            return None
+
+        meta = {
+            "task_id": obj["_id"],
+            "result": self.decode(obj["result"]),
+            "date_done": obj["date_done"],
+        }
+
+        return meta
+
+    def _delete_taskset(self, taskset_id):
+        """Delete a taskset by id."""
+        db = self._get_database()
+        taskmeta_collection = db[self.mongodb_taskmeta_collection]
+        taskmeta_collection.remove({"_id": taskset_id})
+
     def cleanup(self):
         """Delete expired metadata."""
         db = self._get_database()

+ 14 - 0
celery/tests/config.py

@@ -28,3 +28,17 @@ CELERY_REDIS_HOST = os.environ.get("REDIS_HOST") or "localhost"
 CELERY_REDIS_PORT = int(os.environ.get("REDIS_PORT") or 6379)
 CELERY_REDIS_DB = os.environ.get("REDIS_DB") or 0
 CELERY_REDIS_PASSWORD = os.environ.get("REDIS_PASSWORD")
+
+# Mongo results tests (only executed if installed and running)
+CELERY_MONGODB_BACKEND_SETTINGS = {
+    "host": os.environ.get("MONGO_HOST") or "localhost",
+    "port": os.environ.get("MONGO_PORT") or 27017,
+    "database": os.environ.get("MONGO_DB") or "celery_unittests",
+    "taskmeta_collection": os.environ.get("MONGO_TASKMETA_COLLECTION") or
+        "taskmeta_collection"
+}
+if os.environ.get("MONGO_USER"):
+    CELERY_MONGODB_BACKEND_SETTINGS["user"] = os.environ.get("MONGO_USER")
+if os.environ.get("MONGO_PASSWORD"):
+    CELERY_MONGODB_BACKEND_SETTINGS["password"] = \
+        os.environ.get("MONGO_PASSWORD")

+ 63 - 0
celery/tests/test_backends/test_mongodb.py

@@ -0,0 +1,63 @@
+from __future__ import absolute_import
+
+import sys
+
+from nose import SkipTest
+
+from celery.backends.mongodb import MongoBackend
+from celery.exceptions import ImproperlyConfigured
+from celery.tests.utils import unittest
+from celery.utils import uuid
+
+
+_no_mongo_msg = "* MongoDB %s. Will not execute related tests."
+_no_mongo_msg_emitted = False
+
+
+try:
+    from pymongo.errors import AutoReconnect
+except ImportError:
+    class AutoReconnect(Exception):
+        pass
+
+
+def get_mongo_or_SkipTest():
+
+    def emit_no_mongo_msg(reason):
+        global _no_mongo_msg_emitted
+        if not _no_mongo_msg_emitted:
+            sys.stderr.write("\n" + _no_mongo_msg % reason + "\n")
+            _no_mongo_msg_emitted = True
+
+    try:
+        tb = MongoBackend()
+        try:
+            tb._get_database()
+        except AutoReconnect, exc:
+            emit_no_mongo_msg("not running")
+            raise SkipTest("Can't connect to MongoDB: %s" % (exc, ))
+        return tb
+    except ImproperlyConfigured, exc:
+        if "need to install" in str(exc):
+            emit_no_mongo_msg("pymongo not installed")
+            raise SkipTest("pymongo not installed")
+        emit_no_mongo_msg("not configured")
+        raise SkipTest("MongoDB not configured correctly: %s" % (exc, ))
+
+
+class TestMongoBackend(unittest.TestCase):
+
+    def test_save__restore__delete_taskset(self):
+        tb = get_mongo_or_SkipTest()
+
+        tid = uuid()
+        res = {u"foo": "bar"}
+        self.assertEqual(tb.save_taskset(tid, res), res)
+
+        res2 = tb.restore_taskset(tid)
+        self.assertEqual(res2, res)
+
+        tb.delete_taskset(tid)
+        self.assertIsNone(tb.restore_taskset(tid))
+
+        self.assertIsNone(tb.restore_taskset("xxx-nonexisting-id"))