|
@@ -130,6 +130,11 @@ class Scheduler(object):
|
|
|
#: Maximum time to sleep between re-checking the schedule.
|
|
|
max_interval = 1
|
|
|
|
|
|
+ #: How often to sync the schedule (3 minutes by default)
|
|
|
+ sync_every = 3 * 60
|
|
|
+
|
|
|
+ _last_sync = None
|
|
|
+
|
|
|
def __init__(self, schedule=None, logger=None, max_interval=None,
|
|
|
app=None, Publisher=None, lazy=False, **kwargs):
|
|
|
app = self.app = app_or_default(app)
|
|
@@ -183,6 +188,10 @@ class Scheduler(object):
|
|
|
|
|
|
return min(remaining_times + [self.max_interval])
|
|
|
|
|
|
+ def should_sync(self):
|
|
|
+ return (not self._last_sync or
|
|
|
+ (time.time() - self._last_sync) > self.sync_every)
|
|
|
+
|
|
|
def reserve(self, entry):
|
|
|
new_entry = self.schedule[entry.name] = entry.next()
|
|
|
return new_entry
|
|
@@ -206,6 +215,9 @@ class Scheduler(object):
|
|
|
except Exception, exc:
|
|
|
raise SchedulingError("Couldn't apply scheduled task %s: %s" % (
|
|
|
entry.name, exc))
|
|
|
+
|
|
|
+ if self.should_sync():
|
|
|
+ self._do_sync()
|
|
|
return result
|
|
|
|
|
|
def send_task(self, *args, **kwargs): # pragma: no cover
|
|
@@ -214,6 +226,14 @@ class Scheduler(object):
|
|
|
def setup_schedule(self):
|
|
|
self.install_default_entries(self.data)
|
|
|
|
|
|
+ def _do_sync(self):
|
|
|
+ try:
|
|
|
+ self.logger.debug("Celerybeat: Synchronizing schedule...")
|
|
|
+ self.sync()
|
|
|
+ finally:
|
|
|
+ self._last_sync = time.time()
|
|
|
+
|
|
|
+
|
|
|
def sync(self):
|
|
|
pass
|
|
|
|
|
@@ -318,7 +338,6 @@ class PersistentScheduler(Scheduler):
|
|
|
|
|
|
def sync(self):
|
|
|
if self._store is not None:
|
|
|
- self.logger.debug("CeleryBeat: Syncing schedule to disk...")
|
|
|
self._store.sync()
|
|
|
|
|
|
def close(self):
|