|
@@ -401,22 +401,31 @@ class PersistentScheduler(Scheduler):
|
|
|
with platforms.ignore_errno(errno.ENOENT):
|
|
|
os.remove(self.schedule_filename + suffix)
|
|
|
|
|
|
+ def _open_schedule(self):
|
|
|
+ return self.persistence.open(self.schedule_filename, writeback=True)
|
|
|
+
|
|
|
+ def _destroy_open_corrupted_schedule(self, exc):
|
|
|
+ error('Removing corrupted schedule file %r: %r',
|
|
|
+ self.schedule_filename, exc, exc_info=True)
|
|
|
+ self._remove_db()
|
|
|
+ return self._open_schedule()
|
|
|
+
|
|
|
def setup_schedule(self):
|
|
|
try:
|
|
|
- self._store = self.persistence.open(self.schedule_filename,
|
|
|
- writeback=True)
|
|
|
+ self._store = self._open_schedule()
|
|
|
except Exception as exc:
|
|
|
- error('Removing corrupted schedule file %r: %r',
|
|
|
- self.schedule_filename, exc, exc_info=True)
|
|
|
- self._remove_db()
|
|
|
- self._store = self.persistence.open(self.schedule_filename,
|
|
|
- writeback=True)
|
|
|
- else:
|
|
|
+ self._store = self._destroy_open_corrupted_schedule(exc)
|
|
|
+
|
|
|
+ for _ in (1, 2):
|
|
|
try:
|
|
|
self._store['entries']
|
|
|
except KeyError:
|
|
|
# new schedule db
|
|
|
- self._store['entries'] = {}
|
|
|
+ try:
|
|
|
+ self._store['entries'] = {}
|
|
|
+ except KeyError as exc:
|
|
|
+ self._store = self._destroy_open_corrupted_schedule(exc)
|
|
|
+ continue
|
|
|
else:
|
|
|
if '__version__' not in self._store:
|
|
|
warning('DB Reset: Account for new __version__ field')
|
|
@@ -427,6 +436,7 @@ class PersistentScheduler(Scheduler):
|
|
|
elif 'utc_enabled' not in self._store:
|
|
|
warning('DB Reset: Account for new utc_enabled field')
|
|
|
self._store.clear() # remove schedule at 3.0.9 upgrade
|
|
|
+ break
|
|
|
|
|
|
tz = self.app.conf.CELERY_TIMEZONE
|
|
|
stored_tz = self._store.get('tz')
|