Browse Source

celerybeat: Don't loop every second, instead find the shortest remaining time and sleep for that long.

PeriodicTask.is_due now returns a tuple: (is_due, remainig_time).
Where is_due is whether the task should be run now, and if
False, remaining_time is the estimated time remaining. If someone wants the
old behaviour and need more often polling, they can just return 1, True here.
Ask Solem 15 years ago
parent
commit
84d4520baf
2 changed files with 37 additions and 14 deletions
  1. 29 13
      celery/beat.py
  2. 8 1
      celery/task/base.py

+ 29 - 13
celery/beat.py

@@ -73,16 +73,20 @@ class Scheduler(UserDict):
     def tick(self):
     def tick(self):
         """Run a tick, that is one iteration of the scheduler.
         """Run a tick, that is one iteration of the scheduler.
         Executes all due tasks."""
         Executes all due tasks."""
-        for entry in self.get_due_tasks():
-            self.logger.debug("Scheduler: Sending due task %s" % (
-                    entry.name))
-            result = self.apply_async(entry)
-            self.logger.debug("Scheduler: %s sent. id->%s" % (
-                    entry.name, result.task_id))
+        remaining_times = []
+        for entry in self.schedule.values():
+            is_due, remaining = self.is_due(entry)
+            if is_due:
+                self.logger.debug("Scheduler: Sending due task %s" % (
+                        entry.name))
+                result = self.apply_async(entry)
+                self.logger.debug("Scheduler: %s sent. id->%s" % (
+                        entry.name, result.task_id))
+            else:
+                if remaining:
+                    remaining_times.append(remaining)
 
 
-    def get_due_tasks(self):
-        """Get all the schedule entries that are due to execution."""
-        return filter(self.is_due, self.schedule.values())
+        return min(remaining_times or [self.interval])
 
 
     def get_task(self, name):
     def get_task(self, name):
         try:
         try:
@@ -147,8 +151,8 @@ class ClockService(object):
         scheduler = self.scheduler_cls(schedule=schedule,
         scheduler = self.scheduler_cls(schedule=schedule,
                                        registry=self.registry,
                                        registry=self.registry,
                                        logger=self.logger)
                                        logger=self.logger)
-        self.logger.debug(
-                "ClockService: Ticking with interval->%d, schedule->%s" % (
+        self.logger.debug("ClockService: "
+            "Ticking with default interval->%d, schedule->%s" % (
                     scheduler.interval, self.schedule_filename))
                     scheduler.interval, self.schedule_filename))
 
 
         synced = [False]
         synced = [False]
@@ -160,12 +164,24 @@ class ClockService(object):
                 synced[0] = True
                 synced[0] = True
                 self._stopped.set()
                 self._stopped.set()
 
 
+        times = (("days", 60 * 60 * 24),
+                 ("hours", 60 * 60),
+                 ("minutes", 60))
+
+        def humanize(seconds):
+            for desc, mul in times:
+                if seconds > mul:
+                    return "%s %s" % (seconds / mul, desc)
+            return "%d seconds" % seconds
+
         try:
         try:
             while True:
             while True:
                 if self._shutdown.isSet():
                 if self._shutdown.isSet():
                     break
                     break
-                scheduler.tick()
-                time.sleep(scheduler.interval)
+                interval = scheduler.tick()
+                self.logger.debug("ClockService: Waking up in %s" % (
+                    humanize(interval)))
+                time.sleep(interval)
         except (KeyboardInterrupt, SystemExit):
         except (KeyboardInterrupt, SystemExit):
             _stop()
             _stop()
         finally:
         finally:

+ 8 - 1
celery/task/base.py

@@ -585,9 +585,16 @@ class PeriodicTask(Task):
 
 
         super(PeriodicTask, self).__init__()
         super(PeriodicTask, self).__init__()
 
 
+    def remaining_estimate(self, last_run_at):
+        rem = (last_run_at + self.run_every) - datetime.now()
+        if not rem.days:
+            return 0
+        return rem.seconds + (rem.microseconds / 10e5)
+
     def is_due(self, last_run_at):
     def is_due(self, last_run_at):
         """Returns ``True`` if the task is due.
         """Returns ``True`` if the task is due.
 
 
         You can override this to decide the interval at runtime.
         You can override this to decide the interval at runtime.
         """
         """
-        return datetime.now() > (last_run_at + self.run_every)
+        remaining = self.remaining_estimate(last_run_at)
+        return not remaining, remaining