Browse Source

timer2: Fixes bad typo affecting precision, and also Schedule now has the same interface as the Timer thread. Closes #507

Ask Solem 13 years ago
parent
commit
3bfb2a197a
1 changed files with 59 additions and 34 deletions
  1. 59 34
      celery/utils/timer2.py

+ 59 - 34
celery/utils/timer2.py

@@ -79,18 +79,30 @@ def to_timestamp(d):
 
 class Schedule(object):
     """ETA scheduler."""
+    Entry = Entry
+
     on_error = None
 
-    def __init__(self, max_interval=DEFAULT_MAX_INTERVAL, on_error=None):
-        self.max_interval = float(max_interval)
+    def __init__(self, max_interval=None, on_error=None):
+        self.max_interval = float(max_interval or DEFAULT_MAX_INTERVAL)
         self.on_error = on_error or self.on_error
         self._queue = []
 
+    def apply_entry(self, entry):
+        try:
+            entry()
+        except Exception, exc:
+            if not self.handle_error(exc):
+                logger.error("Error in timer: %r", exc, exc_info=True)
+
     def handle_error(self, exc_info):
         if self.on_error:
             self.on_error(exc_info)
             return True
 
+    def stop(self):
+        pass
+
     def enter(self, entry, eta=None, priority=0):
         """Enter function into the scheduler.
 
@@ -113,6 +125,29 @@ class Schedule(object):
         heapq.heappush(self._queue, (eta, priority, entry))
         return entry
 
+    def apply_at(self, eta, fun, args=(), kwargs={}, priority=0):
+        return self.enter(self.Entry(fun, args, kwargs), eta, priority)
+
+    def enter_after(self, msecs, entry, priority=0):
+        eta = datetime.now() + timedelta(seconds=msecs / 1000.0)
+        return self.enter(entry, eta, priority)
+
+    def apply_after(self, msecs, fun, args=(), kwargs={}, priority=0):
+        return self.enter_after(msecs, self.Entry(fun, args, kwargs), priority)
+
+    def apply_interval(self, msecs, fun, args=(), kwargs={}, priority=0):
+        tref = self.Entry(fun, args, kwargs)
+
+        def _reschedules(*args, **kwargs):
+            try:
+                return fun(*args, **kwargs)
+            finally:
+                if not tref.cancelled:
+                    self.enter_after(msecs, tref, priority)
+
+        tref.fun = _reschedules
+        return self.enter_after(msecs, tref, priority)
+
     def __iter__(self):
         """The iterator yields the time to sleep for between runs."""
 
@@ -138,7 +173,8 @@ class Schedule(object):
                         continue
                     else:
                         heapq.heappush(queue, event)
-            yield None, None
+            else:
+                yield None, None
 
     def empty(self):
         """Is the schedule empty?"""
@@ -152,6 +188,9 @@ class Schedule(object):
         return ({"eta": eta, "priority": priority, "item": item}
                     for eta, priority, item in self.queue)
 
+    def cancel(self, tref):
+        tref.cancel()
+
     @property
     def queue(self):
         events = list(self._queue)
@@ -159,15 +198,16 @@ class Schedule(object):
 
 
 class Timer(Thread):
-    Entry = Entry
     Schedule = Schedule
 
     running = False
     on_tick = None
     _timer_count = count(1).next
 
-    def __init__(self, schedule=None, on_error=None, on_tick=None, **kwargs):
-        self.schedule = schedule or self.Schedule(on_error=on_error)
+    def __init__(self, schedule=None, on_error=None, on_tick=None,
+            max_interval=None, **kwargs):
+        self.schedule = schedule or self.Schedule(on_error=on_error,
+                                                  max_interval=max_interval)
         self.on_tick = on_tick or self.on_tick
 
         Thread.__init__(self)
@@ -178,13 +218,6 @@ class Timer(Thread):
         self.setDaemon(True)
         self.setName("Timer-%s" % (self._timer_count(), ))
 
-    def apply_entry(self, entry):
-        try:
-            entry()
-        except Exception, exc:
-            if not self.schedule.handle_error(exc):
-                logger.error("Error in timer: %r", exc, exc_info=True)
-
     def _next_entry(self):
         with self.not_empty:
             delay, entry = self.scheduler.next()
@@ -192,7 +225,7 @@ class Timer(Thread):
                 if delay is None:
                     self.not_empty.wait(1.0)
                 return delay
-        return self.apply_entry(entry)
+        return self.schedule.apply_entry(entry)
     __next__ = next = _next_entry  # for 2to3
 
     def run(self):
@@ -229,35 +262,27 @@ class Timer(Thread):
         if not self.running and not self.isAlive():
             self.start()
 
-    def enter(self, entry, eta, priority=None):
+    def _do_enter(self, meth, *args, **kwargs):
         self.ensure_started()
         with self.mutex:
-            entry = self.schedule.enter(entry, eta, priority)
+            entry = getattr(self.schedule, meth)(*args, **kwargs)
             self.not_empty.notify()
             return entry
 
-    def apply_at(self, eta, fun, args=(), kwargs={}, priority=0):
-        return self.enter(self.Entry(fun, args, kwargs), eta, priority)
-
-    def enter_after(self, msecs, entry, priority=0):
-        eta = datetime.now() + timedelta(seconds=msecs / 1000.0)
-        return self.enter(entry, eta, priority)
+    def enter(self, entry, eta, priority=None):
+        return self._do_enter("enter", entry, eta, priority=priority)
 
-    def apply_after(self, msecs, fun, args=(), kwargs={}, priority=0):
-        return self.enter_after(msecs, Entry(fun, args, kwargs), priority)
+    def apply_at(self, *args, **kwargs):
+        return self._do_enter("apply_at", *args, **kwargs)
 
-    def apply_interval(self, msecs, fun, args=(), kwargs={}, priority=0):
-        tref = Entry(fun, args, kwargs)
+    def enter_after(self, *args, **kwargs):
+        return self._do_enter("enter_after", *args, **kwargs)
 
-        def _reschedules(*args, **kwargs):
-            try:
-                return fun(*args, **kwargs)
-            finally:
-                if not tref.cancelled:
-                    self.enter_after(msecs, tref, priority)
+    def apply_after(self, *args, **kwargs):
+        return self._do_enter("apply_after", *args, **kwargs)
 
-        tref.fun = _reschedules
-        return self.enter_after(msecs, tref, priority)
+    def apply_interval(self, *args, **kwargs):
+        return self._do_enter("apply_interval", *args, **kwargs)
 
     def exit_after(self, msecs, priority=10):
         self.apply_after(msecs, sys.exit, priority)