|
@@ -6,6 +6,7 @@ Worker Controller Threads
|
|
|
from celery.backends import default_periodic_status_backend
|
|
|
from Queue import Empty as QueueEmpty
|
|
|
from datetime import datetime
|
|
|
+from multiprocessing import get_logger
|
|
|
import threading
|
|
|
import time
|
|
|
|
|
@@ -70,12 +71,17 @@ class Mediator(InfinityThread):
|
|
|
self.callback = callback
|
|
|
|
|
|
def on_iteration(self):
|
|
|
+ logger = get_logger()
|
|
|
try:
|
|
|
+ logger.debug("Mediator: Trying to get message from bucket_queue")
|
|
|
# This blocks until there's a message in the queue.
|
|
|
task = self.bucket_queue.get(timeout=1)
|
|
|
except QueueEmpty:
|
|
|
+ logger.debug("Mediator: Bucket queue is empty.")
|
|
|
pass
|
|
|
else:
|
|
|
+ logger.debug("Mediator: Running callback for task: %s[%s]" % (
|
|
|
+ task.task_name, task.task_id))
|
|
|
self.callback(task)
|
|
|
|
|
|
|
|
@@ -96,8 +102,12 @@ class PeriodicWorkController(InfinityThread):
|
|
|
self.bucket_queue = bucket_queue
|
|
|
|
|
|
def on_iteration(self):
|
|
|
+ logger = get_logger()
|
|
|
+ logger.debug("PeriodicWorkController: Running periodic tasks...")
|
|
|
self.run_periodic_tasks()
|
|
|
+ logger.debug("PeriodicWorkController: Processing hold queue...")
|
|
|
self.process_hold_queue()
|
|
|
+ logger.debug("PeriodicWorkController: Going to sleep...")
|
|
|
time.sleep(1)
|
|
|
|
|
|
def run_periodic_tasks(self):
|
|
@@ -106,11 +116,21 @@ class PeriodicWorkController(InfinityThread):
|
|
|
def process_hold_queue(self):
|
|
|
"""Finds paused tasks that are ready for execution and move
|
|
|
them to the :attr:`bucket_queue`."""
|
|
|
+ logger = get_logger()
|
|
|
try:
|
|
|
+ logger.debug(
|
|
|
+ "PeriodicWorkController: Getting next task from hold queue..")
|
|
|
task, eta = self.hold_queue.get_nowait()
|
|
|
except QueueEmpty:
|
|
|
+ logger.debug("PeriodicWorkController: Hold queue is empty")
|
|
|
return
|
|
|
if datetime.now() >= eta:
|
|
|
+ logger.debug(
|
|
|
+ "PeriodicWorkController: Time to run %s[%s] (%s)..." % (
|
|
|
+ task.task_name, task.task_id, eta))
|
|
|
self.bucket_queue.put(task)
|
|
|
else:
|
|
|
+ logger.debug(
|
|
|
+ "PeriodicWorkController: ETA not ready for %s[%s] (%s)..." % (
|
|
|
+ task.task_name, task.task_id, eta))
|
|
|
self.hold_queue.put((task, eta))
|