Pārlūkot izejas kodu

Allow specifying the dburi for individual DatabaseBackend instances.

To enable usage such as:

@task(backend=DatabaseBackend(dburi="sqlite://other.db"))
def mytask():
    return "this result is stored and fetched from other.db"
Ask Solem 15 gadi atpakaļ
vecāks
revīzija
7261fa8f26
1 mainītis faili ar 12 papildinājumiem un 7 dzēšanām
  1. 12 7
      celery/backends/database.py

+ 12 - 7
celery/backends/database.py

@@ -8,14 +8,19 @@ from celery.db.session import ResultSession
 from celery.backends.base import BaseDictBackend
 from celery.backends.base import BaseDictBackend
 
 
 
 
-
-
 class DatabaseBackend(BaseDictBackend):
 class DatabaseBackend(BaseDictBackend):
     """The database result backend."""
     """The database result backend."""
 
 
+    def __init__(self, dburi=conf.RESULT_DBURI, **kwargs):
+        self.dburi = dburi
+        super(DatabaseBackend, self).__init__(**kwargs)
+
+    def ResultSession(self):
+        return ResultSession(dburi=self.dburi)
+
     def _store_result(self, task_id, result, status, traceback=None):
     def _store_result(self, task_id, result, status, traceback=None):
         """Store return value and status of an executed task."""
         """Store return value and status of an executed task."""
-        session = ResultSession()
+        session = self.ResultSession()
         try:
         try:
             tasks = session.query(Task).filter(Task.task_id == task_id).all()
             tasks = session.query(Task).filter(Task.task_id == task_id).all()
             if not tasks:
             if not tasks:
@@ -35,7 +40,7 @@ class DatabaseBackend(BaseDictBackend):
     def _save_taskset(self, taskset_id, result):
     def _save_taskset(self, taskset_id, result):
         """Store the result of an executed taskset."""
         """Store the result of an executed taskset."""
         taskset = TaskSet(taskset_id, result)
         taskset = TaskSet(taskset_id, result)
-        session = ResultSession()
+        session = self.ResultSession()
         try:
         try:
             session.add(taskset)
             session.add(taskset)
             session.flush()
             session.flush()
@@ -46,7 +51,7 @@ class DatabaseBackend(BaseDictBackend):
 
 
     def _get_task_meta_for(self, task_id):
     def _get_task_meta_for(self, task_id):
         """Get task metadata for a task by id."""
         """Get task metadata for a task by id."""
-        session = ResultSession()
+        session = self.ResultSession()
         try:
         try:
             task = None
             task = None
             for task in session.query(Task).filter(Task.task_id == task_id):
             for task in session.query(Task).filter(Task.task_id == task_id):
@@ -63,7 +68,7 @@ class DatabaseBackend(BaseDictBackend):
 
 
     def _restore_taskset(self, taskset_id):
     def _restore_taskset(self, taskset_id):
         """Get taskset metadata for a taskset by id."""
         """Get taskset metadata for a taskset by id."""
-        session = ResultSession()
+        session = self.ResultSession()
         try:
         try:
             qs = session.query(TaskSet)
             qs = session.query(TaskSet)
             for taskset in qs.filter(TaskSet.task_id == task_id):
             for taskset in qs.filter(TaskSet.task_id == task_id):
@@ -74,7 +79,7 @@ class DatabaseBackend(BaseDictBackend):
     def cleanup(self):
     def cleanup(self):
         """Delete expired metadata."""
         """Delete expired metadata."""
         expires = conf.TASK_RESULT_EXPIRES
         expires = conf.TASK_RESULT_EXPIRES
-        session = ResultSession()
+        session = self.ResultSession()
         try:
         try:
             for task in session.query(Task).filter(
             for task in session.query(Task).filter(
                     Task.date_done < (datetime.now() - expires)):
                     Task.date_done < (datetime.now() - expires)):