瀏覽代碼

AsynPool: Do not flush if process exited

Ask Solem 11 年之前
父節點
當前提交
4696350806
共有 1 個文件被更改,包括 9 次插入5 次删除
  1. 9 5
      celery/concurrency/asynpool.py

+ 9 - 5
celery/concurrency/asynpool.py

@@ -812,7 +812,7 @@ class AsynPool(_pool.Pool):
                             else:
                                 job_proc = job._write_to
                                 if job_proc.exitcode is None:
-                                    self._flush_writer(job_proc.inq, gen)
+                                    self._flush_writer(job_proc, gen)
                     # workers may have exited in the meantime.
                     self.maintain_pool()
                     sleep(next(intervals))  # don't busyloop
@@ -822,12 +822,16 @@ class AsynPool(_pool.Pool):
             self._active_writes.clear()
             self._busy_workers.clear()
 
-    def _flush_writer(self, inq, writer):
-        fds = set([inq._writer])
+    def _flush_writer(self, proc, writer):
+        fds = set([proc.inq._writer])
         try:
             while fds:
-                _, writable, again = _select(writers=fds, timeout=0.5)
-                if not again and writable:
+                if proc.exitcode:
+                    break  # process exited
+                readable, writable, again = _select(
+                    writers=fds, err=fds, timeout=0.5,
+                )
+                if not again and (writable or readable):
                     try:
                         next(writer)
                     except (StopIteration, OSError, IOError, EOFError):