Переглянути джерело

MongoDB: Creates an index on date_done so cleanup process works more efficienty.

Closes #661
Chris Angove 13 роки тому
батько
коміт
64c5894791
1 змінених файлів з 32 додано та 42 видалено
  1. 32 42
      celery/backends/mongodb.py

+ 32 - 42
celery/backends/mongodb.py

@@ -9,6 +9,8 @@ try:
 except ImportError:
     pymongo = None  # noqa
 
+from kombu.utils import cached_property
+
 from celery import states
 from celery.exceptions import ImproperlyConfigured
 from celery.utils.timeutils import maybe_timedelta
@@ -63,7 +65,6 @@ class MongoBackend(BaseDictBackend):
                 "taskmeta_collection", self.mongodb_taskmeta_collection)
 
         self._connection = None
-        self._database = None
 
     def _get_connection(self):
         """Connect to the MongoDB server."""
@@ -85,22 +86,6 @@ class MongoBackend(BaseDictBackend):
 
         return self._connection
 
-    def _get_database(self):
-        """"Get database from MongoDB connection and perform authentication
-        if necessary."""
-        if self._database is None:
-            conn = self._get_connection()
-            db = conn[self.mongodb_database]
-            if self.mongodb_user and self.mongodb_password:
-                auth = db.authenticate(self.mongodb_user,
-                                       self.mongodb_password)
-                if not auth:
-                    raise ImproperlyConfigured(
-                        "Invalid MongoDB username or password.")
-            self._database = db
-
-        return self._database
-
     def process_cleanup(self):
         if self._connection is not None:
             # MongoDB connection will be closed automatically when object
@@ -116,19 +101,14 @@ class MongoBackend(BaseDictBackend):
                 "result": Binary(self.encode(result)),
                 "date_done": datetime.utcnow(),
                 "traceback": Binary(self.encode(traceback))}
-
-        db = self._get_database()
-        taskmeta_collection = db[self.mongodb_taskmeta_collection]
-        taskmeta_collection.save(meta, safe=True)
+        self.collection.save(meta, safe=True)
 
         return result
 
     def _get_task_meta_for(self, task_id):
         """Get task metadata for a task by id."""
 
-        db = self._get_database()
-        taskmeta_collection = db[self.mongodb_taskmeta_collection]
-        obj = taskmeta_collection.find_one({"_id": task_id})
+        obj = self.collection.find_one({"_id": task_id})
         if not obj:
             return {"status": states.PENDING, "result": None}
 
@@ -149,18 +129,13 @@ class MongoBackend(BaseDictBackend):
         meta = {"_id": taskset_id,
                 "result": Binary(self.encode(result)),
                 "date_done": datetime.utcnow()}
-
-        db = self._get_database()
-        taskmeta_collection = db[self.mongodb_taskmeta_collection]
-        taskmeta_collection.save(meta, safe=True)
+        self.collection.save(meta, safe=True)
 
         return result
 
     def _restore_taskset(self, taskset_id):
         """Get the result for a taskset by id."""
-        db = self._get_database()
-        taskmeta_collection = db[self.mongodb_taskmeta_collection]
-        obj = taskmeta_collection.find_one({"_id": taskset_id})
+        obj = self.collection.find_one({"_id": taskset_id})
         if not obj:
             return
 
@@ -174,9 +149,7 @@ class MongoBackend(BaseDictBackend):
 
     def _delete_taskset(self, taskset_id):
         """Delete a taskset by id."""
-        db = self._get_database()
-        taskmeta_collection = db[self.mongodb_taskmeta_collection]
-        taskmeta_collection.remove({"_id": taskset_id})
+        self.collection.remove({"_id": taskset_id})
 
     def _forget(self, task_id):
         """
@@ -185,20 +158,14 @@ class MongoBackend(BaseDictBackend):
         :raises celery.exceptions.OperationsError: if the task_id could not be
                                                    removed.
         """
-
-        db = self._get_database()
-        taskmeta_collection = db[self.mongodb_taskmeta_collection]
-
         # By using safe=True, this will wait until it receives a response from
         # the server.  Likewise, it will raise an OperationsError if the
         # response was unable to be completed.
-        taskmeta_collection.remove({"_id": task_id}, safe=True)
+        self.collection.remove({"_id": task_id}, safe=True)
 
     def cleanup(self):
         """Delete expired metadata."""
-        db = self._get_database()
-        taskmeta_collection = db[self.mongodb_taskmeta_collection]
-        taskmeta_collection.remove({
+        self.collection.remove({
                 "date_done": {
                     "$lt": self.app.now() - self.expires,
                  }
@@ -208,3 +175,26 @@ class MongoBackend(BaseDictBackend):
         kwargs.update(
             dict(expires=self.expires))
         return super(MongoBackend, self).__reduce__(args, kwargs)
+
+    @cached_property
+    def database(self):
+        """Get database from MongoDB connection and perform authentication
+        if necessary."""
+        conn = self._get_connection()
+        db = conn[self.mongodb_database]
+        if self.mongodb_user and self.mongodb_password:
+            if not db.authenticate(self.mongodb_user,
+                                   self.mongodb_password):
+                raise ImproperlyConfigured(
+                    "Invalid MongoDB username or password.")
+        return db
+
+    @cached_property
+    def collection(self):
+        """Get the metadata task collection."""
+        collection = self.database[self.mongodb_taskmeta_collection]
+
+        # Ensure an index on date_done is there, if not process the index
+        # in the background. Once completed cleanup will be much faster
+        collection.ensure_index('date_done', background='true')
+        return collection