Ver código fonte

celery.beat: Small refactorings

Ask Solem 14 anos atrás
pai
commit
7c9aab4f0c
2 arquivos alterados com 24 adições e 21 exclusões
  1. 1 1
      Changelog
  2. 23 20
      celery/beat.py

+ 1 - 1
Changelog

@@ -21,7 +21,7 @@ Important notes
     By our previous versioning scheme this stable release should have
     been version 2.2.
 
-    The document describing the our release cycle and versioning scheme
+    The document describing our release cycle and versioning scheme
     can be found at `Wiki: Release Cycle`_.
 
 .. _`semver`: http://semver.org

+ 23 - 20
celery/beat.py

@@ -125,32 +125,35 @@ class Scheduler(UserDict):
         self.cleanup()
         self.setup_schedule()
 
-    def iterentries(self):
-        return self.schedule.itervalues()
+    def maybe_due(self, entry, connection=None):
+        is_due, next_time_to_run = entry.is_due()
+        if is_due:
+            self.logger.debug("Scheduler: Sending due task %s" % entry.name)
+            try:
+                result = self.apply_async(entry, connection=connection)
+            except SchedulingError, exc:
+                self.logger.error("Scheduler: %s" % exc)
+            else:
+                self.logger.debug("%s sent. id->%s" % (entry.name,
+                                                       result.task_id))
+        return next_time_to_run
 
     def tick(self):
         """Run a tick, that is one iteration of the scheduler.
-        Executes all due tasks."""
-        debug = self.logger.debug
-        error = self.logger.error
 
+        Executes all due tasks.
+
+        """
         remaining_times = []
         connection = establish_connection()
         try:
-            for entry in self.iterentries():
-                is_due, next_time_to_run = entry.is_due()
-                if is_due:
-                    debug("Scheduler: Sending due task %s" % entry.name)
-                    try:
-                        result = self.apply_async(entry,
-                                      connection=connection)
-                    except SchedulingError, exc:
-                        error("Scheduler: %s" % exc)
-                    else:
-                        debug("%s sent. id->%s" % (entry.name,
-                                                   result.task_id))
-                if next_time_to_run:
-                    remaining_times.append(next_time_to_run)
+            try:
+                for entry in self.schedule.itervalues():
+                    next_time_to_run = self.maybe_due(entry, connection)
+                    if next_time_to_run:
+                        remaining_times.append(next_time_to_run)
+            except RuntimeError:
+                pass
         finally:
             connection.close()
 
@@ -227,7 +230,7 @@ class ClockService(object):
         self.logger.info("Celerybeat: Starting...")
         self.logger.debug("Celerybeat: "
             "Ticking with max interval->%s" % (
-                    humanize_seconds(self.max_interval)))
+                    humanize_seconds(self.scheduler.max_interval)))
 
         if embedded_process:
             platform.set_process_title("celerybeat")