|
@@ -86,11 +86,10 @@ class Mediator(BackgroundThread):
|
|
|
"""Get tasks from bucket queue and apply the task callback."""
|
|
|
logger = get_default_logger()
|
|
|
try:
|
|
|
- logger.debug("Mediator: Trying to get message from ready_queue")
|
|
|
# This blocks until there's a message in the queue.
|
|
|
task = self.ready_queue.get(timeout=1)
|
|
|
except QueueEmpty:
|
|
|
- logger.debug("Mediator: Bucket queue is empty.")
|
|
|
+ time.sleep(1)
|
|
|
else:
|
|
|
logger.debug("Mediator: Running callback for task: %s[%s]" % (
|
|
|
task.task_name, task.task_id))
|
|
@@ -103,13 +102,23 @@ class ScheduleController(BackgroundThread):
|
|
|
def __init__(self, eta_schedule):
|
|
|
super(ScheduleController, self).__init__()
|
|
|
self._scheduler = iter(eta_schedule)
|
|
|
+ self.iterations = 0
|
|
|
|
|
|
def on_iteration(self):
|
|
|
"""Wake-up scheduler"""
|
|
|
logger = get_default_logger()
|
|
|
- logger.debug("ScheduleController: Scheduler wake-up")
|
|
|
delay = self._scheduler.next()
|
|
|
- logger.debug(
|
|
|
- "ScheduleController: Next wake-up estimated at %s seconds..." % (
|
|
|
- delay))
|
|
|
+ debug_log = True
|
|
|
+ if delay is None:
|
|
|
+ delay = 1
|
|
|
+ if self.iterations == 10:
|
|
|
+ self.iterations = 0
|
|
|
+ else:
|
|
|
+ debug_log = False
|
|
|
+ self.iterations += 1
|
|
|
+ if debug_log:
|
|
|
+ logger.debug("ScheduleController: Scheduler wake-up")
|
|
|
+ logger.debug(
|
|
|
+ "ScheduleController: Next wake-up eta %s seconds..." % (
|
|
|
+ delay))
|
|
|
time.sleep(delay)
|