Przeglądaj źródła

Merged timer2 fixes from gossip branch

Ask Solem 14 lat temu
rodzic
commit
b4c13a02b9
1 zmienionych plików z 20 dodań i 15 usunięć
  1. 20 15
      celery/utils/timer2.py

+ 20 - 15
celery/utils/timer2.py

@@ -98,24 +98,17 @@ class Schedule(object):
                 now = nowfun()
 
                 if now < eta:
-                    yield min(eta - now, self.max_interval)
+                    yield min(eta - now, self.max_interval), None
                 else:
                     event = pop(self._queue)
 
                     if event is verify:
                         if not entry.cancelled:
-                            try:
-                                entry()
-                            except Exception, exc:
-                                typ, val, tb = einfo = sys.exc_info()
-                                if not self.handle_error(einfo):
-                                    warnings.warn(repr(exc),
-                                                  TimedFunctionFailed)
-                                    traceback.print_exception(typ, val, tb)
+                            yield None, entry
                         continue
                     else:
                         heapq.heappush(self._queue, event)
-            yield None
+            yield None, None
 
     def empty(self):
         """Is the schedule empty?"""
@@ -153,15 +146,26 @@ class Timer(Thread):
         self.setDaemon(True)
         self.setName("Timer-%s" % (self._timer_count(), ))
 
+    def apply_entry(self, entry):
+        try:
+            entry()
+        except Exception, exc:
+            typ, val, tb = einfo = sys.exc_info()
+            if not self.schedule.handle_error(einfo):
+                warnings.warn(TimedFunctionFailed(repr(exc))),
+                traceback.print_exception(typ, val, tb)
+
     def next(self):
         self.not_empty.acquire()
         try:
-            delay = self.scheduler.next()
-            if delay is None:
-                self.not_empty.wait(1.0)
-            return delay
+            delay, entry = self.scheduler.next()
+            if entry is None:
+                if delay is None:
+                    self.not_empty.wait(1.0)
+                return delay
         finally:
             self.not_empty.release()
+        return self.apply_entry(entry)
 
     def run(self):
         self.running = True
@@ -220,7 +224,8 @@ class Timer(Thread):
             try:
                 return fun(*args, **kwargs)
             finally:
-                self.enter_after(msecs, tref, priority)
+                if not tref.cancelled:
+                    self.enter_after(msecs, tref, priority)
 
         tref.fun = _reschedules
         return self.enter_after(msecs, tref, priority)