|
@@ -174,9 +174,9 @@ class Scheduler(object):
|
|
Publisher=None, lazy=False, sync_every_tasks=None, **kwargs):
|
|
Publisher=None, lazy=False, sync_every_tasks=None, **kwargs):
|
|
self.app = app
|
|
self.app = app
|
|
self.data = maybe_evaluate({} if schedule is None else schedule)
|
|
self.data = maybe_evaluate({} if schedule is None else schedule)
|
|
- self.max_interval = (max_interval
|
|
|
|
- or app.conf.CELERYBEAT_MAX_LOOP_INTERVAL
|
|
|
|
- or self.max_interval)
|
|
|
|
|
|
+ self.max_interval = (max_interval or
|
|
|
|
+ app.conf.CELERYBEAT_MAX_LOOP_INTERVAL or
|
|
|
|
+ self.max_interval)
|
|
self.sync_every_tasks = (
|
|
self.sync_every_tasks = (
|
|
app.conf.CELERYBEAT_SYNC_EVERY if sync_every_tasks is None
|
|
app.conf.CELERYBEAT_SYNC_EVERY if sync_every_tasks is None
|
|
else sync_every_tasks)
|
|
else sync_every_tasks)
|
|
@@ -362,22 +362,31 @@ class PersistentScheduler(Scheduler):
|
|
with platforms.ignore_errno(errno.ENOENT):
|
|
with platforms.ignore_errno(errno.ENOENT):
|
|
os.remove(self.schedule_filename + suffix)
|
|
os.remove(self.schedule_filename + suffix)
|
|
|
|
|
|
|
|
+ def _open_schedule(self):
|
|
|
|
+ return self.persistence.open(self.schedule_filename, writeback=True)
|
|
|
|
+
|
|
|
|
+ def _destroy_open_corrupted_schedule(self, exc):
|
|
|
|
+ error('Removing corrupted schedule file %r: %r',
|
|
|
|
+ self.schedule_filename, exc, exc_info=True)
|
|
|
|
+ self._remove_db()
|
|
|
|
+ return self._open_schedule()
|
|
|
|
+
|
|
def setup_schedule(self):
|
|
def setup_schedule(self):
|
|
try:
|
|
try:
|
|
- self._store = self.persistence.open(self.schedule_filename,
|
|
|
|
- writeback=True)
|
|
|
|
|
|
+ self._store = self._open_schedule()
|
|
except Exception as exc:
|
|
except Exception as exc:
|
|
- error('Removing corrupted schedule file %r: %r',
|
|
|
|
- self.schedule_filename, exc, exc_info=True)
|
|
|
|
- self._remove_db()
|
|
|
|
- self._store = self.persistence.open(self.schedule_filename,
|
|
|
|
- writeback=True)
|
|
|
|
- else:
|
|
|
|
|
|
+ self._store = self._destroy_open_corrupted_schedule(exc)
|
|
|
|
+
|
|
|
|
+ for _ in (1, 2):
|
|
try:
|
|
try:
|
|
self._store['entries']
|
|
self._store['entries']
|
|
except KeyError:
|
|
except KeyError:
|
|
# new schedule db
|
|
# new schedule db
|
|
- self._store['entries'] = {}
|
|
|
|
|
|
+ try:
|
|
|
|
+ self._store['entries'] = {}
|
|
|
|
+ except KeyError as exc:
|
|
|
|
+ self._store = self._destroy_open_corrupted_schedule(exc)
|
|
|
|
+ continue
|
|
else:
|
|
else:
|
|
if '__version__' not in self._store:
|
|
if '__version__' not in self._store:
|
|
warning('DB Reset: Account for new __version__ field')
|
|
warning('DB Reset: Account for new __version__ field')
|
|
@@ -388,6 +397,7 @@ class PersistentScheduler(Scheduler):
|
|
elif 'utc_enabled' not in self._store:
|
|
elif 'utc_enabled' not in self._store:
|
|
warning('DB Reset: Account for new utc_enabled field')
|
|
warning('DB Reset: Account for new utc_enabled field')
|
|
self._store.clear() # remove schedule at 3.0.9 upgrade
|
|
self._store.clear() # remove schedule at 3.0.9 upgrade
|
|
|
|
+ break
|
|
|
|
|
|
tz = self.app.conf.CELERY_TIMEZONE
|
|
tz = self.app.conf.CELERY_TIMEZONE
|
|
stored_tz = self._store.get('tz')
|
|
stored_tz = self._store.get('tz')
|
|
@@ -435,8 +445,8 @@ class Service(object):
|
|
def __init__(self, app, max_interval=None, schedule_filename=None,
|
|
def __init__(self, app, max_interval=None, schedule_filename=None,
|
|
scheduler_cls=None):
|
|
scheduler_cls=None):
|
|
self.app = app
|
|
self.app = app
|
|
- self.max_interval = (max_interval
|
|
|
|
- or app.conf.CELERYBEAT_MAX_LOOP_INTERVAL)
|
|
|
|
|
|
+ self.max_interval = (max_interval or
|
|
|
|
+ app.conf.CELERYBEAT_MAX_LOOP_INTERVAL)
|
|
self.scheduler_cls = scheduler_cls or self.scheduler_cls
|
|
self.scheduler_cls = scheduler_cls or self.scheduler_cls
|
|
self.schedule_filename = (
|
|
self.schedule_filename = (
|
|
schedule_filename or app.conf.CELERYBEAT_SCHEDULE_FILENAME)
|
|
schedule_filename or app.conf.CELERYBEAT_SCHEDULE_FILENAME)
|