|
@@ -2,7 +2,6 @@ import time
|
|
|
from Queue import Queue
|
|
|
from Queue import Empty as QueueEmpty
|
|
|
|
|
|
-
|
|
|
RATE_MODIFIER_MAP = {"s": lambda n: n,
|
|
|
"m": lambda n: n / 60.0,
|
|
|
"h": lambda n: n / 60.0 / 60.0}
|
|
@@ -134,32 +133,13 @@ class TaskBucket(object):
|
|
|
def get_nowait(self):
|
|
|
return self.get(block=False)
|
|
|
|
|
|
- def __old_get(self, block=True, timeout=None):
|
|
|
- time_spent = 0
|
|
|
- for bucket in self.buckets.values():
|
|
|
- remaining_times = []
|
|
|
- try:
|
|
|
- return bucket.get_nowait()
|
|
|
- except RateLimitExceeded:
|
|
|
- remaining_times.append(bucket.expected_time())
|
|
|
- except QueueEmpty:
|
|
|
- pass
|
|
|
-
|
|
|
- if not remaining_times:
|
|
|
- if not block or (timeout and time_spent >= timeout):
|
|
|
- raise QueueEmpty
|
|
|
- else:
|
|
|
- shortest_wait = min(remaining_times or [self.min_wait])
|
|
|
- time_spent += shortest_wait
|
|
|
- time.sleep(shortest_wait)
|
|
|
-
|
|
|
def init_with_registry(self):
|
|
|
"""Initialize with buckets for all the task types in the registry."""
|
|
|
map(self.add_bucket_for_type, self.task_registry.keys())
|
|
|
|
|
|
def get_bucket_for_type(self, task_name):
|
|
|
"""Get the bucket for a particular task type."""
|
|
|
- if not task_name in self.buckets:
|
|
|
+ if task_name not in self.buckets:
|
|
|
return self.add_bucket_for_type(task_name)
|
|
|
return self.buckets[task_name]
|
|
|
|
|
@@ -171,6 +151,7 @@ class TaskBucket(object):
|
|
|
will be used.
|
|
|
|
|
|
"""
|
|
|
+ assert task_name not in self.buckets
|
|
|
task_type = self.task_registry[task_name]
|
|
|
task_queue = Queue()
|
|
|
rate_limit = getattr(task_type, "rate_limit", None)
|