Explorar el Código

Statedb was not saved at shutdown.

Ask Solem hace 14 años
padre
commit
16f1065a89

+ 1 - 2
celery/tests/test_worker/test_worker.py

@@ -849,8 +849,7 @@ class test_WorkController(AppCase):
         state.Persistent = Mock()
         try:
             worker = self.create_worker(db="statefilename")
-            self.assertTrue(worker._finalize_db)
-            worker._finalize_db.cancel()
+            self.assertTrue(worker._persistence)
         finally:
             state.Persistent = Persistent
 

+ 1 - 1
celery/tests/test_worker/test_worker_state.py

@@ -31,7 +31,7 @@ class MockShelve(dict):
     in_sync = False
     closed = False
 
-    def open(self, filename):
+    def open(self, filename, **kwargs):
         self.filename = filename
         return self
 

+ 4 - 4
celery/worker/__init__.py

@@ -1,5 +1,6 @@
-import socket
+import atexit
 import logging
+import socket
 import traceback
 
 from kombu.syn import blocking
@@ -155,9 +156,8 @@ class WorkController(object):
         self._finalize_db = None
 
         if self.db:
-            persistence = state.Persistent(self.db)
-            self._finalize_db = Finalize(persistence, persistence.save,
-                                         exitpriority=5)
+            self._persistence = state.Persistent(self.db)
+            atexit.register(self._persistence.save)
 
         # Queues
         if self.disable_rate_limits:

+ 3 - 3
celery/worker/state.py

@@ -87,7 +87,8 @@ class Persistent(object):
         self._load()
 
     def save(self):
-        self.sync(self.db).sync()
+        self.sync(self.db)
+        self.db.sync()
         self.close()
 
     def merge(self, d):
@@ -101,7 +102,7 @@ class Persistent(object):
         return d
 
     def open(self):
-        return self.storage.open(self.filename)
+        return self.storage.open(self.filename, writeback=True)
 
     def close(self):
         if self._is_open:
@@ -110,7 +111,6 @@ class Persistent(object):
 
     def _load(self):
         self.merge(self.db)
-        self.close()
 
     @cached_property
     def db(self):