Kaynağa Gözat

Fixed weird bug with new processes created by the supervisor being stuck while reading from the task Queue. See http://bugs.python.org/issue10037

Ask Solem 14 yıl önce
ebeveyn
işleme
29abede74a
1 değiştirilmiş dosya ile 16 ekleme ve 1 silme
  1. 16 1
      celery/concurrency/processes/pool.py

+ 16 - 1
celery/concurrency/processes/pool.py

@@ -85,6 +85,19 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
     pid = os.getpid()
     put = outqueue.put
     get = inqueue.get
+
+    if hasattr(inqueue, '_reader'):
+        def poll(timeout):
+            if inqueue._reader.poll(timeout):
+                return True, get()
+            return False, None
+    else:
+        def poll(timeout):
+            try:
+                return True, get(timeout=timeout)
+            except Queue.Empty:
+                return False, None
+
     if hasattr(inqueue, '_writer'):
         inqueue._writer.close()
         outqueue._reader.close()
@@ -98,7 +111,9 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
     completed = 0
     while maxtasks is None or (maxtasks and completed < maxtasks):
         try:
-            task = get()
+            ready, task = poll(1.0)
+            if not ready:
+                continue
         except (EOFError, IOError):
             debug('worker got EOFError or IOError -- exiting')
             break