Asynpool: Pipes must be set nonblocking at creation time

Ask Solem 11 years ago
commit c2c4be5560
1 changed file with 27 additions and 17 deletions

celery/concurrency/asynpool.py  +27 -17

@@ -35,6 +35,7 @@ from weakref import WeakValueDictionary, ref
 from amqp.utils import promise
 from billiard.pool import RUN, TERMINATE, ACK, NACK, WorkersJoined
 from billiard import pool as _pool
+from billiard.compat import setblocking, isblocking
 from billiard.einfo import ExceptionInfo
 from billiard.queues import _SimpleQueue
 from kombu.async import READ, WRITE, ERR
@@ -156,6 +157,7 @@ class ResultHandler(_pool.ResultHandler):
         # header
         remaining = 4
         bsize = None
+        assert not isblocking(fd)
         while remaining > 0:
             try:
                 bsize = read(fd, remaining)
@@ -202,16 +204,11 @@ class ResultHandler(_pool.ResultHandler):
         hub_remove = hub.remove
         recv_message = self._recv_message
 
-        def on_readable(fileno):
+        def on_result_readable(fileno):
             try:
-                proc = fileno_to_outq[fileno]
-            except KeyError:
-                hub_remove(fileno)
-                # process gone
-                return
-            reader = proc.outq._reader
-            reader.setblocking(0)
-
+                fileno_to_outq[fileno]
+            except KeyError:  # process gone
+                return hub_remove(fileno)
             it = recv_message(add_reader, fileno, on_state_change)
             try:
                 next(it)
@@ -221,7 +218,7 @@ class ResultHandler(_pool.ResultHandler):
                 hub_remove(fileno)
             else:
                 add_reader(fileno, it)
-        return on_readable
+        return on_result_readable
 
     def register_with_event_loop(self, hub):
         self.handle_event = self._make_process_result(hub)
@@ -465,6 +462,9 @@ class AsynPool(_pool.Pool):
             add_reader(
                 proc.sentinel, event_process_exit, hub, proc.sentinel,
             )
+
+            assert not isblocking(proc.outq._reader)
+
             # handle_result_event is called when the processes outqueue is
             # readable.
             add_reader(proc.outqR_fd, handle_result_event, proc.outqR_fd)
@@ -704,6 +704,7 @@ class AsynPool(_pool.Pool):
                     else:
                         errors = 0
             finally:
+                hub_remove(fd)
                 write_stats[proc.index] += 1
                 # message written, so this fd is now available
                 active_writes.discard(fd)
@@ -863,12 +864,20 @@ class AsynPool(_pool.Pool):
     def create_process_queues(self):
         """Creates new in, out (and optionally syn) queues,
         returned as a tuple."""
-        inq, outq, synq = _SimpleQueue(), _SimpleQueue(), None
-        inq._writer.setblocking(0)
-        outq._reader.setblocking(0)
+        # NOTE: Pipes must be set O_NONBLOCK at creation time (the original
+        # fd), otherwise it will not be possible to change the flags until
+        # there is an actual reader/writer on the other side.
+        inq = _SimpleQueue(wnonblock=True)
+        outq = _SimpleQueue(rnonblock=True)
+        synq = None
+        assert isblocking(inq._reader)
+        assert not isblocking(inq._writer)
+        assert not isblocking(outq._reader)
+        assert isblocking(outq._writer)
         if self.synack:
-            synq = _SimpleQueue()
-            synq._writer.setblocking(0)
+            synq = _SimpleQueue(wnonblock=True)
+            assert isblocking(synq._reader)
+            assert not isblocking(synq._writer)
         return inq, outq, synq
 
     def on_process_alive(self, pid):
@@ -986,7 +995,7 @@ class AsynPool(_pool.Pool):
         resq = proc.outq._reader
         on_state_change = self._result_handler.on_state_change
         while not resq.closed and resq.poll(0) and self._state != TERMINATE:
-            resq.setblocking(1)
+            setblocking(resq, 1)
             try:
                 task = resq.recv()
             except (IOError, EOFError) as exc:
@@ -999,7 +1008,8 @@ class AsynPool(_pool.Pool):
                 else:
                     debug('got sentinel while flushing process %r', proc)
             finally:
-                resq.setblocking(0)
+                setblocking(resq, 0)
+                assert not isblocking(resq)
 
     def on_partial_read(self, job, proc):
         """Called when a job was only partially written to a child process