Browse Source

This suddenly got much more complicated than I first thought: If the first queue is always returning
items, the _get method would never get to fetch items from the other queues,
so instead, for every get(), we now always iterate through all the queues once,
and put all ready items on a deque called self.immediate. This queue is
checked for already chached items at the start of get().

queues. So we use a deque to include contents of other queues.

Ask Solem 15 years ago
parent
commit
cf53b9fff9
1 changed files with 14 additions and 1 deletions
  1. 14 1
      celery/buckets.py

+ 14 - 1
celery/buckets.py

@@ -1,5 +1,6 @@
 import time
 import time
 from Queue import Queue, Empty as QueueEmpty
 from Queue import Queue, Empty as QueueEmpty
+from collections import deque
 
 
 RATE_MODIFIER_MAP = {"s": lambda n: n,
 RATE_MODIFIER_MAP = {"s": lambda n: n,
                      "m": lambda n: n / 60.0,
                      "m": lambda n: n / 60.0,
@@ -64,6 +65,7 @@ class TaskBucket(object):
         self.task_registry = task_registry
         self.task_registry = task_registry
         self.buckets = {}
         self.buckets = {}
         self.init_with_registry()
         self.init_with_registry()
+        self.immediate = deque()
 
 
     def put(self, job):
     def put(self, job):
         """Put a task into the appropiate bucket."""
         """Put a task into the appropiate bucket."""
@@ -71,16 +73,27 @@ class TaskBucket(object):
     put_nowait = put
     put_nowait = put
 
 
     def _get(self):
     def _get(self):
+        # If the first queue is always returning
+        # results it would never come to pick up items from the other
+        # queues. So we use a deque to include contents of other queues.
+        if self.immediate:
+            return 0, self.immediate.popleft()
+
+        has_item = False
         remainding_times = []
         remainding_times = []
+
         for bucket in self.buckets.values():
         for bucket in self.buckets.values():
             remainding = bucket.expected_time()
             remainding = bucket.expected_time()
             if not remainding:
             if not remainding:
                 try:
                 try:
-                    return 0, bucket.get_nowait()
+                    self.immediate.append(bucket.get_nowait())
+                    has_item = True
                 except QueueEmpty:
                 except QueueEmpty:
                     pass
                     pass
             else:
             else:
                 remainding_times.append(remainding)
                 remainding_times.append(remainding)
+        if has_item:
+            return 0, self.immediate.popleft()
         if not remainding_times:
         if not remainding_times:
             raise QueueEmpty
             raise QueueEmpty
         return min(remainding_times), None
         return min(remainding_times), None