浏览代码

timer2: apply_interval no longer executes twice in the same time window

Ask Solem 13 年之前
父节点
当前提交
edf7c19e12
共有 1 个文件被更改,包括 12 次插入2 次删除
  1. 12 2
      celery/utils/timer2.py

+ 12 - 2
celery/utils/timer2.py

@@ -17,6 +17,7 @@ import heapq
 import os
 import os
 import sys
 import sys
 
 
+from functools import wraps
 from itertools import count
 from itertools import count
 from threading import Condition, Event, Lock, Thread
 from threading import Condition, Event, Lock, Thread
 from time import time, sleep, mktime
 from time import time, sleep, mktime
@@ -137,15 +138,24 @@ class Schedule(object):
 
 
     def apply_interval(self, msecs, fun, args=(), kwargs={}, priority=0):
     def apply_interval(self, msecs, fun, args=(), kwargs={}, priority=0):
         tref = self.Entry(fun, args, kwargs)
         tref = self.Entry(fun, args, kwargs)
+        secs = msecs * 1000.0
 
 
+        @wraps(fun)
         def _reschedules(*args, **kwargs):
         def _reschedules(*args, **kwargs):
+            last, now = tref._last_run, time()
+            lsince = (now - tref._last_run) * 1000.0 if last else msecs
             try:
             try:
-                return fun(*args, **kwargs)
+                if lsince and lsince >= msecs:
+                    tref._last_run = now
+                    return fun(*args, **kwargs)
             finally:
             finally:
                 if not tref.cancelled:
                 if not tref.cancelled:
-                    self.enter_after(msecs, tref, priority)
+                    last = tref._last_run
+                    next = secs - (now - last) if last else secs
+                    self.enter_after(next / 1000.0, tref, priority)
 
 
         tref.fun = _reschedules
         tref.fun = _reschedules
+        tref._last_run = None
         return self.enter_after(msecs, tref, priority)
         return self.enter_after(msecs, tref, priority)
 
 
     def __iter__(self):
     def __iter__(self):