Browse Source

Be *way* smarter on the way we sleep between token buckets get attempts.

Ask Solem 15 years ago
parent
commit
c94ff33669
1 changed files with 29 additions and 12 deletions
  1. 29 12
      celery/buckets.py

+ 29 - 12
celery/buckets.py

@@ -50,6 +50,7 @@ class TaskBucket(object):
 
 
 
 
     """
     """
+    min_wait = 0.05
 
 
     def __init__(self, task_registry):
     def __init__(self, task_registry):
         self.task_registry = task_registry
         self.task_registry = task_registry
@@ -60,19 +61,29 @@ class TaskBucket(object):
         """Put a task into the appropiate bucket."""
         """Put a task into the appropiate bucket."""
         self.buckets[task_name].put_nowait(task)
         self.buckets[task_name].put_nowait(task)
 
 
-    def get(self):
+    def get(self, timeout=None):
         """Retrive the task from the first available bucket.
         """Retrive the task from the first available bucket.
 
 
         Available as in, there is an item in the queue and you can
         Available as in, there is an item in the queue and you can
         consume tokens from it.
         consume tokens from it.
 
 
         """
         """
+        time_spent = 0
         for bucket in self.buckets.values():
         for bucket in self.buckets.values():
+            remaining_times = []
             try:
             try:
                 return bucket.get_nowait()
                 return bucket.get_nowait()
-            except (BucketRateExceeded, QueueEmpty):
+            except BucketRateExceeded:
+                remaining_times.append(bucket.expected_time())
+            except QueueEmpty:
                 pass
                 pass
-            time.sleep(0.01)
+
+            if timeout and time_spent >= timeout:
+                raise QueueEmpty()
+            else:
+                shortest_wait = min(remaining_times or [self.min_wait])
+                time_spent += shortest_wait
+                time.sleep(shortest_wait)
 
 
     def init_with_registry(self):
     def init_with_registry(self):
         """Initialize with buckets for all the task types in the registry."""
         """Initialize with buckets for all the task types in the registry."""
@@ -94,12 +105,13 @@ class TaskBucket(object):
         """
         """
         task_type = self.task_registry[task_name]
         task_type = self.task_registry[task_name]
         task_queue = Queue()
         task_queue = Queue()
-        rate_limit = parse_ratelimit_string(task_type.rate_limit)
+        rate_limit = getattr(task_type, "rate_limit", None)
+        rate_limit = parse_ratelimit_string(rate_limit)
         if rate_limit:
         if rate_limit:
             task_queue = TokenBucketQueue(rate_limit, queue=task_queue)
             task_queue = TokenBucketQueue(rate_limit, queue=task_queue)
 
 
         self.buckets[task_name] = task_queue
         self.buckets[task_name] = task_queue
-        return bucket
+        return task_queue
 
 
 
 
 class TokenBucketQueue(object):
 class TokenBucketQueue(object):
@@ -134,11 +146,11 @@ class TokenBucketQueue(object):
         self.fill_rate = float(fill_rate)
         self.fill_rate = float(fill_rate)
         self.timestamp = time.time()
         self.timestamp = time.time()
 
 
-    def put(self, item, nb=True):
+    def put(self, item, nb=False):
         put = self.queue.put_nowait if nb else self.queue.put
         put = self.queue.put_nowait if nb else self.queue.put
         put(item)
         put(item)
 
 
-    def get(self, nb=True):
+    def get(self, nb=False):
         get = self.queue.get_nowait if nb else self.queue.get
         get = self.queue.get_nowait if nb else self.queue.get
 
 
         if not self.can_consume(1):
         if not self.can_consume(1):
@@ -149,8 +161,8 @@ class TokenBucketQueue(object):
     def get_nowait(self):
     def get_nowait(self):
         return self.get(nb=True)
         return self.get(nb=True)
 
 
-    def put_nowait(self):
-        return self.put(nb=True)
+    def put_nowait(self, item):
+        return self.put(item, nb=True)
 
 
     def qsize(self):
     def qsize(self):
         return self.queue.qsize()
         return self.queue.qsize()
@@ -160,9 +172,14 @@ class TokenBucketQueue(object):
         sufficient tokens otherwise False."""
         sufficient tokens otherwise False."""
         if tokens <= self._get_tokens():
         if tokens <= self._get_tokens():
             self._tokens -= tokens
             self._tokens -= tokens
-        else:
-            return False
-        return True
+            return True
+        return False
+
+    def expected_time(self, tokens=1):
+        """Returns the expected time in seconds when a new token should be
+        available."""
+        tokens = max(tokens, self._get_tokens())
+        return (tokens - self._get_tokens()) / self.fill_rate
 
 
     def _get_tokens(self):
     def _get_tokens(self):
         if self._tokens < self.capacity:
         if self._tokens < self.capacity: