Browse Source

Now using a scheduler to schedule tasks with ETA/countdown. A lot more efficient.

Ask Solem 15 years ago
parent
commit
4c57b60a34
1 changed files with 13 additions and 41 deletions
  1. 13 41
      celery/worker/controllers.py

+ 13 - 41
celery/worker/controllers.py

@@ -96,48 +96,20 @@ class Mediator(BackgroundThread):
             self.callback(task)
 
 
-class PeriodicWorkController(BackgroundThread):
-    """Finds tasks in the hold queue that is
-    ready for execution and moves them to the bucket queue.
+class ScheduleController(BackgroundThread):
+    """Schedules tasks with an ETA by moving them to the bucket queue."""
 
-    (Tasks in the hold queue are tasks waiting for retry, or with an
-    ``eta``/``countdown``.)
-
-    """
-
-    def __init__(self, bucket_queue, hold_queue):
-        super(PeriodicWorkController, self).__init__()
-        self.hold_queue = hold_queue
-        self.bucket_queue = bucket_queue
+    def __init__(self, eta_schedule):
+        super(ScheduleController, self).__init__()
+        self._scheduler = iter(eta_schedule)
 
     def on_iteration(self):
-        """Process the hold queue."""
-        logger = get_default_logger()
-        logger.debug("PeriodicWorkController: Processing hold queue...")
-        self.process_hold_queue()
-        logger.debug("PeriodicWorkController: Going to sleep...")
-        time.sleep(1)
-
-    def process_hold_queue(self):
-        """Finds paused tasks that are ready for execution and move
-        them to the :attr:`bucket_queue`."""
+        """Wake-up scheduler"""
         logger = get_default_logger()
-        try:
-            logger.debug(
-                "PeriodicWorkController: Getting next task from hold queue..")
-            task, eta, on_accept = self.hold_queue.get_nowait()
-        except QueueEmpty:
-            logger.debug("PeriodicWorkController: Hold queue is empty")
-            return
-
-        if datetime.now() >= eta:
-            logger.debug(
-                "PeriodicWorkController: Time to run %s[%s] (%s)..." % (
-                    task.task_name, task.task_id, eta))
-            on_accept() # Run the accept task callback.
-            self.bucket_queue.put(task)
-        else:
-            logger.debug(
-                "PeriodicWorkController: ETA not ready for %s[%s] (%s)..." % (
-                    task.task_name, task.task_id, eta))
-            self.hold_queue.put((task, eta, on_accept))
+        logger.debug("ScheduleController: Scheduler wake-up")
+        delay = self._scheduler.next()
+        logger.debug(
+            "ScheduleController: Next wake-up estimated at %s seconds..." % (
+                delay))
+        time.sleep(delay)
+