Prechádzať zdrojové kódy

Fixes infinite loop in process_flush_queues

Ask Solem 12 rokov pred
rodič
commit
0c85fefa67
1 zmenil súbory, kde vykonal 13 pridanie a 3 odobranie
  1. 13 3
      celery/concurrency/processes.py

+ 13 - 3
celery/concurrency/processes.py

@@ -346,9 +346,19 @@ class AsynPool(_pool.Pool):
 
     def process_flush_queues(self, proc):
         resq = proc.outq._reader
-        if not resq.closed:
-            while resq.poll(0):
-                self.handle_result_event(resq.fileno())
+        on_state_change = self._result_handler.on_state_change
+        while not resq.closed and resq.poll(0) and self._state != TERMINATE:
+            try:
+                task = resq.recv()
+            except (IOError, EOFError) as exc:
+                debug('got %r while flushing process %r',
+                        exc, proc, exc_info=1)
+                break
+            else:
+                if task is not None:
+                    on_state_change(task)
+                else:
+                    debug('got sentinel while flushing process %r', proc)
 
     def on_partial_read(self, job, proc):
         # worker terminated by signal: