|
@@ -11,6 +11,10 @@ schedule = shelve.open(filename="celerybeat-schedule")
|
|
|
atexit.register(schedule.close)
|
|
|
|
|
|
|
|
|
+class SchedulingError(Exception):
|
|
|
+ """An error occured while scheduling task."""
|
|
|
+
|
|
|
+
|
|
|
class ClockService(object):
|
|
|
scheduler_cls = Scheduler
|
|
|
schedule = schedule
|
|
@@ -47,10 +51,10 @@ class ClockServiceThread(threading.Thread):
|
|
|
self.setDaemon(True)
|
|
|
|
|
|
def run(self):
|
|
|
- return self.clockservice.start()
|
|
|
+ self.clockservice.start()
|
|
|
|
|
|
def stop(self):
|
|
|
- return self.clockservice.stop(wait=True)
|
|
|
+ self.clockservice.stop(wait=True)
|
|
|
|
|
|
|
|
|
class ScheduleEntry(object):
|
|
@@ -69,14 +73,17 @@ class ScheduleEntry(object):
|
|
|
self.total_run_count = None
|
|
|
|
|
|
def execute(self):
|
|
|
+ # Increment timestamps and counts before executing,
|
|
|
+ # in case of exception.
|
|
|
+ self.last_run_at = datetime.now()
|
|
|
+ self.total_run_count += 1
|
|
|
+
|
|
|
try:
|
|
|
result = self.task.apply_async()
|
|
|
except Exception, exc:
|
|
|
- print("Couldn't apply scheduled task %s: %s" % (
|
|
|
- self.task.name, exc))
|
|
|
- result = None
|
|
|
- self.last_run_at = datetime.now()
|
|
|
- self.total_run_count += 1
|
|
|
+ raise SchedulingError(
|
|
|
+ "Couldn't apply scheduled task %s: %s" % (
|
|
|
+ self.task.name, exc))
|
|
|
return result
|
|
|
|
|
|
def is_due(self):
|
|
@@ -94,22 +101,23 @@ class Scheduler(UserDict):
|
|
|
"""
|
|
|
interval = 1
|
|
|
|
|
|
- def __init__(self, registry=None, schedule=None, interval=None):
|
|
|
+ def __init__(self, registry=None, schedule=None, interval=None,
|
|
|
+ logger=None):
|
|
|
self.registry = registry or {}
|
|
|
self.data = schedule or {}
|
|
|
if interval is not None:
|
|
|
self.interval = interval
|
|
|
self.schedule_registry()
|
|
|
|
|
|
- def stop(self):
|
|
|
- self.schedule.close()
|
|
|
-
|
|
|
def tick(self):
|
|
|
"""Run a tick, that is one iteration of the scheduler.
|
|
|
Executes all due tasks."""
|
|
|
return [(entry.task, entry.execute())
|
|
|
for entry in self.get_due_tasks()]
|
|
|
|
|
|
+ def stop(self):
|
|
|
+ self.schedule.close()
|
|
|
+
|
|
|
def get_due_tasks(self):
|
|
|
"""Get all the schedule entries that are due to execution."""
|
|
|
return filter(lambda entry: entry.is_due(), self.schedule.values())
|