Pārlūkot izejas kodu

Small asynpool fixes

Ask Solem 12 gadi atpakaļ
vecāks
revīzija
b79419c523
1 mainītis faili ar 16 papildinājumiem un 5 dzēšanām
  1. 16 5
      celery/concurrency/asynpool.py

+ 16 - 5
celery/concurrency/asynpool.py

@@ -1020,12 +1020,16 @@ class AsynPool(_pool.Pool):
     def _stop_task_handler(task_handler):
         """Called at shutdown to tell processes that we are shutting down."""
         for proc in task_handler.pool:
-            setblocking(proc.inq._writer, 1)
             try:
-                proc.inq.put(None)
-            except OSError as exc:
-                if get_errno(exc) != errno.EBADF:
-                    raise
+                setblocking(proc.inq._writer, 1)
+            except (OSError, IOError):
+                pass
+            else:
+                try:
+                    proc.inq.put(None)
+                except OSError as exc:
+                    if get_errno(exc) != errno.EBADF:
+                        raise
 
     def create_result_handler(self):
         return super(AsynPool, self).create_result_handler(
@@ -1075,6 +1079,13 @@ class AsynPool(_pool.Pool):
                 try:
                     task = resq.recv()
                 except (OSError, IOError, EOFError) as exc:
+                    if get_errno(exc) == errno.EINTR:
+                        continue
+                    elif get_errno(exc) == errno.EAGAIN:
+                        break
+                    else:
+                        debug('got %r while flushing process %r',
+                                exc, proc, exc_info=1)
                     if get_errno(exc) not in UNAVAIL:
                         debug('got %r while flushing process %r',
                               exc, proc, exc_info=1)