Browse Source

Use TokenBucket from kombu.utils.limits instead.

Ask Solem 13 years ago
parent
commit
b01b81c60a
3 changed files with 4 additions and 60 deletions
  1. 0 58
      celery/datastructures.py
  2. 2 1
      celery/events/snapshot.py
  3. 2 1
      celery/worker/buckets.py

+ 0 - 58
celery/datastructures.py

@@ -381,61 +381,3 @@ class LRUCache(UserDict):
             newval = int(self.data.pop(key)) + delta
             self[key] = str(newval)
             return newval
-
-
-class TokenBucket(object):
-    """Token Bucket Algorithm.
-
-    See http://en.wikipedia.org/wiki/Token_Bucket
-    Most of this code was stolen from an entry in the ASPN Python Cookbook:
-    http://code.activestate.com/recipes/511490/
-
-    .. admonition:: Thread safety
-
-        This implementation may not be thread safe.
-
-    """
-
-    #: The rate in tokens/second that the bucket will be refilled
-    fill_rate = None
-
-    #: Maximum number of tokensin the bucket.
-    capacity = 1
-
-    #: Timestamp of the last time a token was taken out of the bucket.
-    timestamp = None
-
-    def __init__(self, fill_rate, capacity=1):
-        self.capacity = float(capacity)
-        self._tokens = capacity
-        self.fill_rate = float(fill_rate)
-        self.timestamp = time.time()
-
-    def can_consume(self, tokens=1):
-        """Returns :const:`True` if `tokens` number of tokens can be consumed
-        from the bucket."""
-        if tokens <= self._get_tokens():
-            self._tokens -= tokens
-            return True
-        return False
-
-    def expected_time(self, tokens=1):
-        """Returns the expected time in seconds when a new token should be
-        available.
-
-        .. admonition:: Warning
-
-            This consumes a token from the bucket.
-
-        """
-        _tokens = self._get_tokens()
-        tokens = max(tokens, _tokens)
-        return (tokens - _tokens) / self.fill_rate
-
-    def _get_tokens(self):
-        if self._tokens < self.capacity:
-            now = time.time()
-            delta = self.fill_rate * (now - self.timestamp)
-            self._tokens = min(self.capacity, self._tokens + delta)
-            self.timestamp = now
-        return self._tokens

+ 2 - 1
celery/events/snapshot.py

@@ -17,9 +17,10 @@ from __future__ import absolute_import
 
 import atexit
 
+from kombu.utils.limits import TokenBucket
+
 from .. import platforms
 from ..app import app_or_default
-from ..datastructures import TokenBucket
 from ..utils import timer2, instantiate, LOG_LEVELS
 from ..utils.dispatch import Signal
 from ..utils.timeutils import rate

+ 2 - 1
celery/worker/buckets.py

@@ -24,7 +24,8 @@ from collections import deque
 from time import time, sleep
 from Queue import Queue, Empty
 
-from ..datastructures import TokenBucket
+from kombu.utils.limits import TokenBucket
+
 from ..utils import timeutils
 from ..utils.compat import zip_longest, chain_from_iterable