Procházet zdrojové kódy

Small timer optimizations

Ask Solem před 12 roky
rodič
revize
304922e535
2 změnil soubory, kde provedl 43 přidání a 11 odebrání
  1. 8 11
      celery/utils/timer2.py
  2. 35 0
      funtests/benchmarks/timer.py

+ 8 - 11
celery/utils/timer2.py

@@ -14,12 +14,12 @@ import heapq
 import os
 import os
 import sys
 import sys
 import threading
 import threading
-import weakref
 
 
 from datetime import datetime
 from datetime import datetime
 from functools import wraps
 from functools import wraps
 from itertools import count
 from itertools import count
 from time import time, sleep
 from time import time, sleep
+from weakref import proxy as weakrefproxy
 
 
 from celery.utils.compat import THREAD_TIMEOUT_MAX
 from celery.utils.compat import THREAD_TIMEOUT_MAX
 from celery.utils.timeutils import timedelta_seconds, timezone
 from celery.utils.timeutils import timedelta_seconds, timezone
@@ -46,14 +46,14 @@ class Entry(object):
             'fun', 'args', 'kwargs', 'tref', 'cancelled',
             'fun', 'args', 'kwargs', 'tref', 'cancelled',
             '_last_run', '__weakref__',
             '_last_run', '__weakref__',
         )
         )
+    _last_run = None
+    cancelled = False
 
 
     def __init__(self, fun, args=None, kwargs=None):
     def __init__(self, fun, args=None, kwargs=None):
         self.fun = fun
         self.fun = fun
         self.args = args or []
         self.args = args or []
         self.kwargs = kwargs or {}
         self.kwargs = kwargs or {}
-        self.tref = weakref.proxy(self)
-        self.cancelled = False
-        self._last_run = None
+        self.tref = weakrefproxy(self)
 
 
     def __call__(self):
     def __call__(self):
         return self.fun(*self.args, **self.kwargs)
         return self.fun(*self.args, **self.kwargs)
@@ -144,7 +144,7 @@ class Schedule(object):
     def apply_at(self, eta, fun, args=(), kwargs={}, priority=0):
     def apply_at(self, eta, fun, args=(), kwargs={}, priority=0):
         return self.enter(self.Entry(fun, args, kwargs), eta, priority)
         return self.enter(self.Entry(fun, args, kwargs), eta, priority)
 
 
-    def enter_after(self, msecs, entry, priority=0):
+    def enter_after(self, msecs, entry, priority=0, time=time):
         return self.enter(entry, time() + (msecs / 1000.0), priority)
         return self.enter(entry, time() + (msecs / 1000.0), priority)
 
 
     def apply_after(self, msecs, fun, args=(), kwargs={}, priority=0):
     def apply_after(self, msecs, fun, args=(), kwargs={}, priority=0):
@@ -176,12 +176,9 @@ class Schedule(object):
     def schedule(self):
     def schedule(self):
         return self
         return self
 
 
-    def __iter__(self):
+    def __iter__(self, min=min, nowfun=time, pop=heapq.heappop,
+                 push=heapq.heappush):
         """The iterator yields the time to sleep for between runs."""
         """The iterator yields the time to sleep for between runs."""
-
-        # localize variable access
-        nowfun = time
-        pop = heapq.heappop
         max_interval = self.max_interval
         max_interval = self.max_interval
         queue = self._queue
         queue = self._queue
 
 
@@ -200,7 +197,7 @@ class Schedule(object):
                             yield None, entry
                             yield None, entry
                         continue
                         continue
                     else:
                     else:
-                        heapq.heappush(queue, event)
+                        push(queue, event)
             else:
             else:
                 yield None, None
                 yield None, None
 
 

+ 35 - 0
funtests/benchmarks/timer.py

@@ -0,0 +1,35 @@
+# -*- coding: utf-8 -*-
+from __future__ import absolute_import
+
+import sys
+
+from time import sleep
+from celery.utils import timer2 as timer
+
+def noop(*args, **kwargs):
+    return
+
+
+def insert(s, n=100000):
+    for i in xrange(n):
+        s.apply_after(1 + (i and i / 10.0), noop, (i, ))
+
+
+def slurp(s, n=100000):
+    i = 0
+    it = iter(s)
+    while i < n:
+        delay, entry = next(it)
+        if entry:
+            i += 1
+            s.apply_entry(entry)
+        #else:
+            #if delay:
+            #    sleep(delay)
+
+if __name__ == '__main__':
+    s = timer.Schedule()
+    insert(s)
+    if '--insert-only' not in sys.argv:
+        slurp(s)
+