Przeglądaj źródła

timer2.Timer now uses a not_empty condition to wait for new tasks instead of a sleep polling loop

Ask Solem 14 lat temu
rodzic
commit
d44a6f1260
1 zmienionych plików z 35 dodań i 15 usunięć
  1. 35 15
      celery/utils/timer2.py

+ 35 - 15
celery/utils/timer2.py

@@ -9,7 +9,7 @@ import traceback
 import warnings
 
 from itertools import count
-from threading import Thread, Event
+from threading import Condition, Event, Lock, Thread
 from time import time, sleep, mktime
 
 from datetime import datetime, timedelta
@@ -137,34 +137,45 @@ class Schedule(object):
 class Timer(Thread):
     Entry = Entry
 
-    precision = 0.3
     running = False
     on_tick = None
     _timer_count = count(1).next
 
-    def __init__(self, schedule=None, precision=None, on_error=None,
-            on_tick=None):
-        if precision is not None:
-            self.precision = precision
+    def __init__(self, schedule=None, on_error=None, on_tick=None, **kwargs):
         self.schedule = schedule or Schedule(on_error=on_error)
         self.on_tick = on_tick or self.on_tick
 
         Thread.__init__(self)
         self._shutdown = Event()
         self._stopped = Event()
+        self.mutex = Lock()
+        self.not_empty = Condition(self.mutex)
         self.setDaemon(True)
         self.setName("Timer-%s" % (self._timer_count(), ))
 
+    def next(self):
+        self.not_empty.acquire()
+        try:
+            delay = self.scheduler.next()
+            if delay is None:
+                print("WAITING FOR ENTRY")
+                self.not_empty.wait()
+            return delay
+        finally:
+            self.not_empty.release()
+
     def run(self):
         self.running = True
-        scheduler = iter(self.schedule)
+        self.scheduler = iter(self.schedule)
+
         while not self._shutdown.isSet():
-            delay = scheduler.next() or self.precision
-            if self.on_tick:
-                self.on_tick(delay)
-            if sleep is None:
-                break
-            sleep(delay)
+            delay = self.next()
+            if delay:
+                if self.on_tick:
+                    self.on_tick(delay)
+                if sleep is None:
+                    break
+                sleep(delay)
         try:
             self._stopped.set()
         except TypeError:           # pragma: no cover
@@ -179,10 +190,19 @@ class Timer(Thread):
             self.join(1e100)
             self.running = False
 
-    def enter(self, entry, eta, priority=None):
+    def ensure_started(self):
         if not self.running and not self.is_alive():
             self.start()
-        return self.schedule.enter(entry, eta, priority)
+
+    def enter(self, entry, eta, priority=None):
+        self.ensure_started()
+        self.mutex.acquire()
+        try:
+            entry = self.schedule.enter(entry, eta, priority)
+            self.not_empty.notify()
+            return entry
+        finally:
+            self.mutex.release()
 
     def apply_at(self, eta, fun, args=(), kwargs={}, priority=0):
         return self.enter(self.Entry(fun, args, kwargs), eta, priority)