Преглед на файлове

Fix the DB backend race condition when clients access the taskmeta at the same time as the server creates it

Chris Rose преди 14 години
родител
ревизия
c266bc1877
променени са 3 файла, в които са добавени 18 реда и са изтрити 6 реда
  1. 5 2
      celery/backends/base.py
  2. 4 4
      celery/backends/database.py
  3. 9 0
      celery/tests/test_backends/test_database.py

+ 5 - 2
celery/backends/base.py

@@ -161,7 +161,10 @@ class BaseDictBackend(BaseBackend):
 
     def get_status(self, task_id):
         """Get the status of a task."""
-        return self.get_task_meta(task_id)["status"]
+        task_meta = self.get_task_meta(task_id)
+        if task_meta is None:
+            return states.PENDING
+        return task_meta['status']
 
     def get_traceback(self, task_id):
         """Get the traceback for a failed task."""
@@ -180,7 +183,7 @@ class BaseDictBackend(BaseBackend):
             return self._cache[task_id]
 
         meta = self._get_task_meta_for(task_id)
-        if cache and meta.get("status") == states.SUCCESS:
+        if cache and meta and meta.get("status") == states.SUCCESS:
             self._cache[task_id] = meta
         return meta
 

+ 4 - 4
celery/backends/database.py

@@ -58,11 +58,11 @@ class DatabaseBackend(BaseDictBackend):
         session = self.ResultSession()
         try:
             task = session.query(Task).filter(Task.task_id == task_id).first()
-            if not task:
+            if task is None:
+                from celery import states
                 task = Task(task_id)
-                session.add(task)
-                session.flush()
-                session.commit()
+                task.status = states.PENDING
+
             return task.to_dict()
         finally:
             session.close()

+ 9 - 0
celery/tests/test_backends/test_database.py

@@ -32,6 +32,15 @@ class test_DatabaseBackend(unittest.TestCase):
         tb = DatabaseBackend()
         self.assertEqual(tb.get_status("xxx-does-not-exist"), states.PENDING)
 
+    def test_missing_task_meta_is_dict_with_pending(self):
+        tb = DatabaseBackend()
+        self.assertDictContainsSubset({
+            'status' : states.PENDING,
+            'task_id' : "xxx-does-not-exist-at-all",
+            'result' : None,
+            'traceback' : None,
+            }, tb.get_task_meta("xxx-does-not-exist-at-all"))
+
     def test_mark_as_done(self):
         tb = DatabaseBackend()