Explorar o código

celery.buckets: Use collections.deque instead of Queue for buffer, as it doesn't have to be thread safe.

Ask Solem %!s(int64=14) %!d(string=hai) anos
pai
achega
d8f853189b
Modificáronse 1 ficheiros con 27 adicións e 19 borrados
  1. 27 19
      celery/worker/buckets.py

+ 27 - 19
celery/worker/buckets.py

@@ -1,11 +1,14 @@
 import time
-from Queue import Queue, Empty as QueueEmpty
+
+from collections import deque
 from itertools import chain
+from Queue import Queue, Empty as QueueEmpty
 
 from celery.utils import all
 from celery.utils import timeutils
 from celery.utils.compat import izip_longest
 
+
 class RateLimitExceeded(Exception):
     """The token buckets rate limit has been exceeded."""
 
@@ -13,7 +16,8 @@ class RateLimitExceeded(Exception):
 class TaskBucket(object):
     """This is a collection of token buckets, each task type having
     its own token bucket. If the task type doesn't have a rate limit,
-    it will have a plain Queue object instead of a token bucket queue.
+    it will have a plain :class:`Queue` object instead of a
+    :class:`TokenBucketQueue`.
 
     The :meth:`put` operation forwards the task to its appropriate bucket,
     while the :meth:`get` operation iterates over the buckets and retrieves
@@ -36,30 +40,35 @@ class TaskBucket(object):
 
 
     """
-    min_wait = 0.0
 
     def __init__(self, task_registry):
         self.task_registry = task_registry
         self.buckets = {}
         self.init_with_registry()
-        self.immediate = Queue()
-
-    def put(self, job):
-        """Put a task into the appropiate bucket."""
-        if job.task_name not in self.buckets:
-            self.add_bucket_for_type(job.task_name)
-        self.buckets[job.task_name].put_nowait(job)
+        self.immediate = deque()
+
+    def put(self, request):
+        """Put a :class:`~celery.worker.job.TaskRequest` into
+        the appropiate bucket."""
+        if request.task_name not in self.buckets:
+            self.add_bucket_for_type(request.task_name)
+        self.buckets[request.task_name].put_nowait(request)
     put_nowait = put
 
+    def _get_immediate(self):
+        try:
+            return self.immediate.popleft()
+        except IndexError: # Empty
+            raise QueueEmpty()
+
     def _get(self):
         # If the first bucket is always returning items, we would never
         # get to fetch items from the other buckets. So we always iterate over
         # all the buckets and put any ready items into a queue called
         # "immediate". This queue is always checked for cached items first.
-        if self.immediate:
-            try:
-                return 0, self.immediate.get_nowait()
-            except QueueEmpty:
+        try:
+            return 0, self._get_immediate()
+        except QueueEmpty:
                 pass
 
         remaining_times = []
@@ -68,7 +77,7 @@ class TaskBucket(object):
             if not remaining:
                 try:
                     # Just put any ready items into the immediate queue.
-                    self.immediate.put_nowait(bucket.get_nowait())
+                    self.immediate.append(bucket.get_nowait())
                 except QueueEmpty:
                     pass
                 except RateLimitExceeded:
@@ -78,7 +87,7 @@ class TaskBucket(object):
 
         # Try the immediate queue again.
         try:
-            return 0, self.immediate.get_nowait()
+            return 0, self._get_immediate()
         except QueueEmpty:
             if not remaining_times:
                 # No items in any of the buckets.
@@ -238,8 +247,7 @@ class TokenBucketQueue(object):
         Also see :meth:`Queue.Queue.put`.
 
         """
-        put = block and self.queue.put or self.queue.put_nowait
-        put(item)
+        self.queue.put(item, block=block)
 
     def put_nowait(self, item):
         """Put an item into the queue without blocking.
@@ -264,7 +272,7 @@ class TokenBucketQueue(object):
         get = block and self.queue.get or self.queue.get_nowait
 
         if not self.can_consume(1):
-            raise RateLimitExceeded
+            raise RateLimitExceeded()
 
         return get()