Przeglądaj źródła

Make TaskBucket.immediate a Queue instead of a deque

Ask Solem 15 lat temu
rodzic
commit
e8180cadfe
1 zmienionych plików z 16 dodań i 12 usunięć
  1. 16 12
      celery/buckets.py

+ 16 - 12
celery/buckets.py

@@ -1,6 +1,6 @@
 import time
 from Queue import Queue, Empty as QueueEmpty
-from collections import deque
+
 
 RATE_MODIFIER_MAP = {"s": lambda n: n,
                      "m": lambda n: n / 60.0,
@@ -65,7 +65,7 @@ class TaskBucket(object):
         self.task_registry = task_registry
         self.buckets = {}
         self.init_with_registry()
-        self.immediate = deque()
+        self.immediate = Queue()
 
     def put(self, job):
         """Put a task into the appropiate bucket."""
@@ -75,28 +75,32 @@ class TaskBucket(object):
     def _get(self):
         # If the first queue is always returning
         # results it would never come to pick up items from the other
-        # queues. So we use a deque to include contents of other queues.
+        # queues. So we always iterate over all the queus and put any ready
+        # items on a deque called "immediate".
         if self.immediate:
-            return 0, self.immediate.popleft()
+            try:
+                return 0, self.immediate.get_nowait()
+            except QueueEmpty:
+                pass
 
-        has_item = False
         remainding_times = []
 
         for bucket in self.buckets.values():
             remainding = bucket.expected_time()
             if not remainding:
                 try:
-                    self.immediate.append(bucket.get_nowait())
-                    has_item = True
+                    self.immediate.put_nowait(bucket.get_nowait())
                 except QueueEmpty:
                     pass
             else:
                 remainding_times.append(remainding)
-        if has_item:
-            return 0, self.immediate.popleft()
-        if not remainding_times:
-            raise QueueEmpty
-        return min(remainding_times), None
+
+        try:
+            return 0, self.immediate.get_nowait()
+        except QueueEmpty:
+            if not remainding_times:
+                raise
+            return min(remainding_times), None
 
     def get(self, timeout=None):
         """Retrive the task from the first available bucket.