Explorar el Código

worker.buckets.TaskBucket.get: Honor timeout argument and wait for item available in one of the queues.

Ask Solem hace 15 años
padre
commit
89d123fd1f
Se han modificado 1 ficheros con 29 adiciones y 11 borrados
  1. 29 11
      celery/worker/buckets.py

+ 29 - 11
celery/worker/buckets.py

@@ -1,3 +1,4 @@
+import threading
 import time
 
 from collections import deque
@@ -46,13 +47,20 @@ class TaskBucket(object):
         self.buckets = {}
         self.init_with_registry()
         self.immediate = deque()
+        self.mutex = threading.Lock()
+        self.not_empty = threading.Condition(self.mutex)
 
     def put(self, request):
         """Put a :class:`~celery.worker.job.TaskRequest` into
         the appropiate bucket."""
-        if request.task_name not in self.buckets:
-            self.add_bucket_for_type(request.task_name)
-        self.buckets[request.task_name].put_nowait(request)
+        self.mutex.acquire()
+        try:
+            if request.task_name not in self.buckets:
+                self.add_bucket_for_type(request.task_name)
+            self.buckets[request.task_name].put_nowait(request)
+            self.not_empty.notify()
+        finally:
+            self.mutex.release()
     put_nowait = put
 
     def _get_immediate(self):
@@ -107,14 +115,24 @@ class TaskBucket(object):
         time_start = time.time()
         did_timeout = lambda: timeout and time.time() - time_start > timeout
 
-        while True:
-            remaining_time, item = self._get()
-            if remaining_time:
-                if not block or did_timeout():
-                    raise QueueEmpty
-                time.sleep(min(remaining_time, timeout or 1))
-            else:
-                return item
+        self.not_empty.acquire()
+        try:
+            while True:
+                try:
+                    remaining_time, item = self._get()
+                except QueueEmpty:
+                    if not block or did_timeout():
+                        raise
+                    self.not_empty.wait(timeout)
+                    continue
+                if remaining_time:
+                    if not block or did_timeout():
+                        raise QueueEmpty
+                    time.sleep(min(remaining_time, timeout or 1))
+                else:
+                    return item
+        finally:
+            self.not_empty.release()
 
     def get_nowait(self):
         return self.get(block=False)