Ver Fonte

rate limits: Fixes performance regression. Thanks to 23doors. Closes #598.

Ask Solem há 13 anos atrás
pai
commit
94296014dd
1 ficheiros alterados com 17 adições e 12 exclusões
  1. 17 12
      celery/worker/buckets.py

+ 17 - 12
celery/worker/buckets.py

@@ -21,7 +21,7 @@ from __future__ import with_statement
 import threading
 
 from collections import deque
-from time import time, sleep
+from time import time, sleep as _sleep
 from Queue import Queue, Empty
 
 from kombu.utils.limits import TokenBucket
@@ -29,6 +29,10 @@ from kombu.utils.limits import TokenBucket
 from ..utils import timeutils
 from ..utils.compat import zip_longest, chain_from_iterable
 
+def sleep(n):
+    print("bucketsSLEEP: %r" % (n, ))
+    _sleep(n)
+
 
 class RateLimitExceeded(Exception):
     """The token buckets rate limit has been exceeded."""
@@ -72,10 +76,10 @@ class TaskBucket(object):
     def put(self, request):
         """Put a :class:`~celery.worker.job.Request` into
         the appropiate bucket."""
+        if request.task_name not in self.buckets:
+            self.add_bucket_for_type(request.task_name)
+        self.buckets[request.task_name].put_nowait(request)
         with self.mutex:
-            if request.task_name not in self.buckets:
-                self.add_bucket_for_type(request.task_name)
-            self.buckets[request.task_name].put_nowait(request)
             self.not_empty.notify()
     put_nowait = put
 
@@ -128,20 +132,21 @@ class TaskBucket(object):
         consume tokens from it.
 
         """
-        time_start = time()
-        did_timeout = lambda: timeout and time() - time_start > timeout
+        tstart = time()
+        get = self._get
+        not_empty = self.not_empty
 
-        with self.not_empty:
-            while True:
+        with not_empty:
+            while 1:
                 try:
-                    remaining_time, item = self._get()
+                    remaining_time, item = get()
                 except Empty:
-                    if not block or did_timeout():
+                    if not block or (timeout and time() - tstart > timeout):
                         raise
-                    self.not_empty.wait(timeout)
+                    not_empty.wait(timeout)
                     continue
                 if remaining_time:
-                    if not block or did_timeout():
+                    if not block or (timeout and time() - tstart > timeout):
                         raise Empty()
                     sleep(min(remaining_time, timeout or 1))
                 else: