Browse Source

added mongodb backend with support for regular and periodic tasks.

abecciu 15 years ago
parent
commit
e3eea25022
1 changed files with 252 additions and 0 deletions
  1. 252 0
      celery/backends/mongodb.py

+ 252 - 0
celery/backends/mongodb.py

@@ -0,0 +1,252 @@
+"""MongoDB backend for celery."""
+
+import random
+from datetime import datetime, timedelta
+
+from django.core.exceptions import ImproperlyConfigured
+from celery.serialization import pickle
+from celery.backends.base import BaseBackend
+from celery.loaders import settings
+from celery.conf import TASK_RESULT_EXPIRES
+from celery.registry import tasks
+
+try:
+    import pymongo
+except ImportError:
+    pymongo = None
+
+# taken from celery.managers.PeriodicTaskManager
+SERVER_DRIFT = timedelta(seconds=random.vonmisesvariate(1, 4))
+
+
+class Bunch:
+    def __init__(self, **kw):
+        self.__dict__.update(kw)
+
+class Backend(BaseBackend):
+
+    capabilities = ("ResultStore", "PeriodicStatus")
+
+    mongodb_host = 'localhost'
+    mongodb_port = 27017
+    mongodb_user = None
+    mongodb_password = None
+    mongodb_database = 'celery'
+    mongodb_taskmeta_collection = 'celery_taskmeta'
+    mongodb_periodictaskmeta_collection = 'celery_periodictaskmeta'
+
+    def __init__(self, *args, **kwargs):
+        """Initialize MongoDB backend instance.
+
+        Raises :class: `django.core.exceptions.ImproperlyConfigured` if
+        module pymongo is not available.
+        """
+
+        if not pymongo:
+            raise ImproperlyConfigured(
+                "You need to install the pymongo library to use the "
+                "MongoDB backend.")
+
+        conf = getattr(settings, "CELERY_MONGODB_BACKEND_SETTINGS", None)
+        if conf is not None:
+            if not isinstance(conf, dict):
+                raise ImproperlyConfigured(
+                    "MongoDB backend settings should be grouped in a dict")
+
+            self.mongodb_host = conf.get('host', self.mongodb_host)
+            self.mongodb_port = int(conf.get('port', self.mongodb_port))
+            self.mongodb_user = conf.get('user', self.mongodb_user)
+            self.mongodb_password = conf.get('password', self.mongodb_password)
+            self.mongodb_database = conf.get('database', self.mongodb_database)
+            self.mongodb_taskmeta_collection = conf.get(
+                'taskmeta_collection', self.mongodb_taskmeta_collection)
+            self.mongodb_collection_periodictaskmeta = conf.get(
+                'periodictaskmeta_collection',
+                self.mongodb_periodictaskmeta_collection)
+
+        super(Backend, self).__init__(*args, **kwargs)
+        self._cache = {}
+        self._connection = None
+        self._database = None
+
+    def _get_connection(self):
+        """Connect to the MongoDB server."""
+        if self._connection is None:
+            from pymongo.connection import Connection
+            self._connection = Connection(self.mongodb_host, self.mongodb_port)
+
+        return self._connection
+
+    def _get_database(self):
+        """"Get database from MongoDB connection and perform authentication
+        if necessary."""
+        if self._database is None:
+            conn = self._get_connection()
+            db = conn[self.mongodb_database]
+            if self.mongodb_user and self.mongodb_password:
+                auth = db.authenticate(self.mongodb_user,
+                                       self.mongodb_password)
+                if not auth:
+                    raise ImproperlyConfigured(
+                        "Invalid MongoDB username or password.")
+            self._database = db
+
+        return self._database
+
+    def process_cleanup(self):
+        if self._connection is not None:
+            # MongoDB connectiom will be closed automatically when object
+            # goes out of scope
+            self._connection = None
+
+    def init_periodic_tasks(self):
+        """Create collection for periodic tasks in database."""
+        db = self._get_database()
+        collection = db[self.mongodb_periodictaskmeta_collection]
+        collection.ensure_index("name", pymongo.ASCENDING, unique=True)
+
+        periodic_tasks = tasks.get_all_periodic()
+        for task_name in periodic_tasks.keys():
+            if not collection.find_one({"name": task_name}):
+                collection.save({"name": task_name,
+                                 "last_run_at": datetime.fromtimestamp(0),
+                                 "total_run_count": 0}, safe=True)
+
+    def run_periodic_tasks(self):
+        """Run all waiting periodic tasks.
+
+        :returns: a list of ``(task, task_id)`` tuples containing
+            the task class and id for the resulting tasks applied.
+        """
+        db = self._get_database()
+        collection = db[self.mongodb_periodictaskmeta_collection]
+
+        waiting_tasks = self._get_waiting_tasks()
+        task_id_tuples = []
+        for waiting_task in waiting_tasks:
+            task = tasks[waiting_task['name']]
+            resp = task.delay()
+            collection.update({'_id': waiting_task['_id']},
+                              {"$inc": {"total_run_count": 1}})
+
+            task_meta = Bunch(name=waiting_task['name'],
+                              last_run_at=waiting_task['last_run_at'],
+                              total_run_count=waiting_task['total_run_count'])
+            task_id_tuples.append((task_meta, resp.task_id))
+
+        return task_id_tuples
+
+    def _is_time(self, last_run_at, run_every):
+        """Check if if it is time to run the periodic task.
+
+        :param last_run_at: Last time the periodic task was run.
+        :param run_every: How often to run the periodic task.
+
+        :rtype bool:
+
+        """
+        # code taken from celery.managers.PeriodicTaskManager
+        run_every_drifted = run_every + SERVER_DRIFT
+        run_at = last_run_at + run_every_drifted
+        if datetime.now() > run_at:
+            return True
+        return False
+
+    def _get_waiting_tasks(self):
+        """Get all waiting periodic tasks."""
+        db = self._get_database()
+        collection = db[self.mongodb_periodictaskmeta_collection]
+
+        periodic_tasks = tasks.get_all_periodic()
+
+        # find all periodic tasks to be run
+        waiting = []
+        for task_meta in collection.find():
+            if task_meta['name'] in periodic_tasks:
+                task = periodic_tasks[task_meta['name']]
+                run_every = task.run_every
+                if self._is_time(task_meta['last_run_at'], run_every):
+                    collection.update(
+                        {"name": task_meta['name'],
+                         "last_run_at": task_meta['last_run_at']},
+                        {"$set": {"last_run_at": datetime.utcnow()}})
+
+                    if db.last_status()['updatedExisting']:
+                        waiting.append(task_meta)
+
+        return waiting
+
+
+    def store_result(self, task_id, result, status, traceback=None):
+        """Store return value and status of an executed task."""
+        from pymongo.binary import Binary
+
+        if status == 'DONE':
+            result = self.prepare_result(result)
+        elif status == 'FAILURE':
+            result = self.prepare_exception(result)
+
+        meta = {"_id": task_id,
+                "status": status,
+                "result": Binary(pickle.dumps(result)),
+                "date_done": datetime.utcnow(),
+                "traceback": Binary(pickle.dumps(traceback))}
+
+        db = self._get_database()
+        taskmeta_collection = db[self.mongodb_taskmeta_collection]
+
+        taskmeta_collection.save(meta, safe=True)
+
+    def is_done(self, task_id):
+        """Returns ``True`` if the task executed successfully."""
+        return self.get_status(task_id) == "DONE"
+
+    def get_status(self, task_id):
+        """Get status of a task."""
+        return self._get_task_meta_for(task_id)["status"]
+
+    def get_traceback(self, task_id):
+        """Get the traceback of a failed task."""
+        meta = self._get_task_meta_for(task_id)
+        return meta["traceback"]
+
+    def get_result(self, task_id):
+        """Get the result for a task."""
+        meta = self._get_task_meta_for(task_id)
+        if meta["status"] == "FAILURE":
+            return self.exception_to_python(meta["result"])
+        else:
+            return meta["result"]
+
+    def _get_task_meta_for(self, task_id):
+        """Get task metadata for a task by id."""
+        if task_id in self._cache:
+            return self._cache[task_id]
+
+        db = self._get_database()
+        taskmeta_collection = db[self.mongodb_taskmeta_collection]
+        obj = taskmeta_collection.find_one({"_id": task_id})
+        if not obj:
+            return {"status": "PENDING", "result": None}
+
+        meta = {
+            "task_id": obj["_id"],
+            "status": obj["status"],
+            "result": pickle.loads(str(obj["result"])),
+            "date_done": obj["date_done"],
+            "traceback": pickle.loads(str(obj["traceback"])),
+        }
+        if meta["status"] == "DONE":
+            self._cache[task_id] = meta
+
+        return meta
+
+    def cleanup(self):
+        """Delete expired metadata."""
+        db = self._get_database()
+        taskmeta_collection = db[self.mongodb_taskmeta_collection]
+        taskmeta_collection.remove({
+                "date_done": {
+                    "$lt": datetime.now() - TASK_RESULT_EXPIRES
+                 }
+        })