|
@@ -73,35 +73,39 @@ class TaskBucket(object):
|
|
|
put_nowait = put
|
|
|
|
|
|
def _get(self):
|
|
|
- # If the first queue is always returning items, we would never
|
|
|
- # get to fetching items from the other queues.
|
|
|
- # So we always iterate over all the queues and put any ready
|
|
|
- # items on a queue called "immediate". This queue is always checked
|
|
|
- # for cached items first.
|
|
|
+ # If the first bucket is always returning items, we would never
|
|
|
+ # get to fetch items from the other buckets. So we always iterate over
|
|
|
+ # all the buckets and put any ready items into a queue called
|
|
|
+ # "immediate". This queue is always checked for cached items first.
|
|
|
if self.immediate:
|
|
|
try:
|
|
|
return 0, self.immediate.get_nowait()
|
|
|
except QueueEmpty:
|
|
|
pass
|
|
|
|
|
|
- remainding_times = []
|
|
|
-
|
|
|
+ remaining_times = []
|
|
|
for bucket in self.buckets.values():
|
|
|
- remainding = bucket.expected_time()
|
|
|
- if not remainding:
|
|
|
+ remaining = bucket.expected_time()
|
|
|
+ if not remaining:
|
|
|
try:
|
|
|
+ # Just put any ready items into the immediate queue.
|
|
|
self.immediate.put_nowait(bucket.get_nowait())
|
|
|
except QueueEmpty:
|
|
|
pass
|
|
|
else:
|
|
|
- remainding_times.append(remainding)
|
|
|
+ remaining_times.append(remaining)
|
|
|
|
|
|
+ # Try the immediate queue again.
|
|
|
try:
|
|
|
return 0, self.immediate.get_nowait()
|
|
|
except QueueEmpty:
|
|
|
- if not remainding_times:
|
|
|
+ if not remaining_times:
|
|
|
+ # No items in any of the buckets.
|
|
|
raise
|
|
|
- return min(remainding_times), None
|
|
|
+
|
|
|
+ # There's items, but have to wait before we can retrieve them,
|
|
|
+ # return the shortest remaining time.
|
|
|
+ return min(remaining_times), None
|
|
|
|
|
|
def get(self, block=True, timeout=None):
|
|
|
"""Retrive the task from the first available bucket.
|
|
@@ -114,11 +118,11 @@ class TaskBucket(object):
|
|
|
did_timeout = lambda: timeout and time.time() - time_start > timeout
|
|
|
|
|
|
while True:
|
|
|
- remainding_time, item = self._get()
|
|
|
- if remainding_time:
|
|
|
+ remaining_time, item = self._get()
|
|
|
+ if remaining_time:
|
|
|
if not block or did_timeout():
|
|
|
raise QueueEmpty
|
|
|
- time.sleep(remainding_time)
|
|
|
+ time.sleep(remaining_time)
|
|
|
else:
|
|
|
return item
|
|
|
|