浏览代码

Integrated the SQLAlchemy database backend by haridsv

Ask Solem 15 年之前
父节点
当前提交
814480f02e
共有 4 个文件被更改,包括 161 次插入0 次删除
  1. 1 0
      celery/backends/__init__.py
  2. 92 0
      celery/backends/database.py
  3. 0 0
      celery/db/__init__.py
  4. 68 0
      celery/db/models.py

+ 1 - 0
celery/backends/__init__.py

@@ -9,6 +9,7 @@ BACKEND_ALIASES = {
     "redis": "celery.backends.pyredis.RedisBackend",
     "mongodb": "celery.backends.mongodb.MongoBackend",
     "tyrant": "celery.backends.tyrant.TyrantBackend",
+    "database": "celery.backends.database.DatabaseBackend",
 }
 
 _backend_cache = {}

+ 92 - 0
celery/backends/database.py

@@ -0,0 +1,92 @@
+import urllib
+from datetime import datetime
+
+from sqlalchemy import create_engine
+from sqlalchemy.orm import sessionmaker
+
+from celery import conf
+from celery.db.models import Task, TaskSet
+from celery.backends.base import BaseDictBackend
+
+server = '<sql server host>'
+database = '<your db>'
+userid = '<your user>'
+password = '<your password>'
+port = 1433
+raw_cs = "DRIVER={FreeTDS};SERVER=%s;PORT=%d;DATABASE=%s;UID=%s;PWD=%s;CHARSET=UTF8;TDS_VERSION=8.0;TEXTSIZE=10000" % (server, port, database, userid, password)
+#connection_string = "mssql:///?odbc_connect=%s" % urllib.quote_plus(raw_cs)
+#connection_string = 'sqlite:////mnt/winctmp/celery.db'
+connection_string = 'sqlite:///celery.db'
+engine = create_engine(connection_string, echo=True)
+Session = sessionmaker(bind=engine)
+
+
+class DatabaseBackend(BaseDictBackend):
+    """The database result backend."""
+
+    def _store_result(self, task_id, result, status, traceback=None):
+        """Store return value and status of an executed task."""
+        session = Session()
+        try:
+            tasks = session.query(Task).filter(Task.task_id == task_id).all()
+            if not tasks:
+                raise RuntimeError('Task with task_id: %s not found' % task_id)
+            tasks[0].result = result
+            tasks[0].status = status
+            tasks[0].traceback = traceback
+            session.commit()
+        finally:
+            session.close()
+        return result
+
+    def _save_taskset(self, taskset_id, result):
+        """Store the result of an executed taskset."""
+        taskset = TaskSet(taskset_id, result)
+        session = Session()
+        try:
+            session.add(taskset)
+            session.commit()
+        finally:
+            session.close()
+        return result
+
+    def _get_task_meta_for(self, task_id):
+        """Get task metadata for a task by id."""
+        session = Session()
+        try:
+            task = None
+            for task in session.query(Task).filter(Task.task_id == task_id):
+                break
+            if not task:
+                task = Task(task_id)
+                session.add(task)
+                session.commit()
+            if task:
+                return task.to_dict()
+        finally:
+            session.close()
+
+    def _restore_taskset(self, taskset_id):
+        """Get taskset metadata for a taskset by id."""
+        session = Session()
+        try:
+            qs = session.query(TaskSet)
+            for taskset in qs.filter(TaskSet.task_id == task_id):
+                return taskset.to_dict()
+        finally:
+            session.close()
+
+    def cleanup(self):
+        """Delete expired metadata."""
+        expires = conf.TASK_RESULT_EXPIRES
+        session = Session()
+        try:
+            for task in session.query(Task).filter(
+                    Task.date_done < (datetime.now() - expires)):
+                session.delete(task)
+            for taskset in session.query(TaskSet).filter(
+                    TaskSet.date_done < (datetime.now() - expires)):
+                session.delete(taskset)
+            session.commit()
+        finally:
+            session.close()

+ 0 - 0
celery/db/__init__.py


+ 68 - 0
celery/db/models.py

@@ -0,0 +1,68 @@
+from datetime import datetime
+
+from sqlalchemy import Column, Sequence, ForeignKey
+from sqlalchemy import Integer, String, Text, DateTime, PickleType
+from sqlalchemy.orm import relation
+from sqlalchemy.ext.declarative import declarative_base
+
+from celery import states
+
+ModelBase = declarative_base()
+
+
+class Task(ModelBase):
+    """Task result/status."""
+    __tablename__ = "celery_taskmeta"
+
+    id = Column("id", Integer, Sequence("task_id_sequence"), primary_key=True)
+    task_id = Column("task_id", String(255), primary_key=True)
+    status = Column("status", String(50), default=states.PENDING)
+    result = Column("result", PickleType, nullable=True)
+    date_done = Column("date_done", DateTime,
+                       onupdate=datetime.now, nullable=True)
+    traceback = Column("traceback", Text, nullable=True)
+
+    def __init__(self, task_id):
+        self.task_id = task_id
+
+    def __str__(self):
+        return "<Task(%s, %s, %s, %s)>" % (self.task_id,
+                                           self.result,
+                                           self.status,
+                                           self.traceback)
+
+    def to_dict(self):
+        return {"task_id": self.task_id,
+                "status": self.status,
+                "result": self.result,
+                "date_done": self.date_done,
+                "traceback": self.traceback}
+
+    def __unicode__(self):
+        return u"<Task: %s successful: %s>" % (self.task_id, self.status)
+
+
+class TaskSet(ModelBase):
+    """TaskSet result"""
+    __tablename__ = "celery_tasksetmeta"
+
+    id = Column("id", Integer, Sequence("taskset_id_sequence"),
+                primary_key=True)
+    taskset_id = Column("taskset_id", String(255), unique=True)
+    result = Column("result", PickleType, nullable=True)
+    date_done = Column("date_done", DateTime,
+                       onupdate=datetime.now, nullable=True)
+
+    def __init__(self, task_id):
+        self.task_id = task_id
+
+    def __str__(self):
+        return "<TaskSet(%s, %s)>" % (self.task_id, self.result)
+
+    def to_dict(self):
+        return {"taskset_id": self.taskset_id,
+                "result": self.result,
+                "date_done": self.date_done}
+
+    def __unicode__(self):
+        return u"<TaskSet: %s>" % (self.taskset_id)