|
@@ -1,4 +1,5 @@
|
|
|
import time
|
|
|
+import math
|
|
|
import shelve
|
|
|
import atexit
|
|
|
import threading
|
|
@@ -10,6 +11,20 @@ from celery import registry
|
|
|
from celery.log import setup_logger
|
|
|
from celery.exceptions import NotRegistered
|
|
|
|
|
|
+TIME_UNITS = (("day", 60 * 60 * 24, lambda n: int(math.ceil(n))),
|
|
|
+ ("hour", 60 * 60, lambda n: int(math.ceil(n))),
|
|
|
+ ("minute", 60, lambda n: int(math.ceil(n))),
|
|
|
+ ("second", 1, lambda n: "%.2d" % n))
|
|
|
+
|
|
|
+
|
|
|
+def humanize_seconds(secs, prefix=""):
|
|
|
+ for unit, divider, formatter in TIME_UNITS:
|
|
|
+ if secs >= divider:
|
|
|
+ w = secs / divider
|
|
|
+ punit = w > 1 and unit+"s" or unit
|
|
|
+ return "%s%s %s" % (prefix, formatter(w), punit)
|
|
|
+ return "now"
|
|
|
+
|
|
|
|
|
|
class SchedulingError(Exception):
|
|
|
"""An error occured while scheduling task."""
|
|
@@ -75,17 +90,17 @@ class Scheduler(UserDict):
|
|
|
Executes all due tasks."""
|
|
|
remaining_times = []
|
|
|
for entry in self.schedule.values():
|
|
|
- is_due, remaining = self.is_due(entry)
|
|
|
+ is_due, next_time_to_run = self.is_due(entry)
|
|
|
if is_due:
|
|
|
self.logger.debug("Scheduler: Sending due task %s" % (
|
|
|
entry.name))
|
|
|
result = self.apply_async(entry)
|
|
|
self.logger.debug("Scheduler: %s sent. id->%s" % (
|
|
|
entry.name, result.task_id))
|
|
|
- if remaining:
|
|
|
- remaining_times.append(remaining)
|
|
|
+ if next_time_to_run:
|
|
|
+ remaining_times.append(next_time_to_run)
|
|
|
|
|
|
- return min(remaining_times or [self.interval])
|
|
|
+ return min(remaining_times + [conf.CELERYBEAT_MAX_LOOP_INTERVAL])
|
|
|
|
|
|
def get_task(self, name):
|
|
|
try:
|
|
@@ -151,8 +166,9 @@ class ClockService(object):
|
|
|
registry=self.registry,
|
|
|
logger=self.logger)
|
|
|
self.logger.debug("ClockService: "
|
|
|
- "Ticking with default interval->%d, schedule->%s" % (
|
|
|
- scheduler.interval, self.schedule_filename))
|
|
|
+ "Ticking with max interval->%s, schedule->%s" % (
|
|
|
+ humanize_seconds(conf.CELERYBEAT_MAX_LOOP_INTERVAL),
|
|
|
+ self.schedule_filename))
|
|
|
|
|
|
synced = [False]
|
|
|
def _stop():
|
|
@@ -163,23 +179,14 @@ class ClockService(object):
|
|
|
synced[0] = True
|
|
|
self._stopped.set()
|
|
|
|
|
|
- times = (("days", 60 * 60 * 24),
|
|
|
- ("hours", 60 * 60),
|
|
|
- ("minutes", 60))
|
|
|
-
|
|
|
- def humanize(seconds):
|
|
|
- for desc, mul in times:
|
|
|
- if seconds > mul:
|
|
|
- return "%s %s" % (seconds / mul, desc)
|
|
|
- return "%d seconds" % seconds
|
|
|
|
|
|
try:
|
|
|
while True:
|
|
|
if self._shutdown.isSet():
|
|
|
break
|
|
|
interval = scheduler.tick()
|
|
|
- self.logger.debug("ClockService: Waking up in %s" % (
|
|
|
- humanize(interval)))
|
|
|
+ self.logger.debug("ClockService: Waking up %s." % (
|
|
|
+ humanize_seconds(interval, prefix="in ")))
|
|
|
time.sleep(interval)
|
|
|
except (KeyboardInterrupt, SystemExit):
|
|
|
_stop()
|