
AsynPool: -Ofair should not always schedule writes

Ask Solem · 11 years ago
commit 1029d24e4f
1 changed file with 48 additions and 36 deletions
  1. celery/concurrency/asynpool.py (+48 / -36)

+ 48 - 36
celery/concurrency/asynpool.py

@@ -253,7 +253,7 @@ class ResultHandler(_pool.ResultHandler):
                     break
 
                 reader = proc.outq._reader
-                reader.setblocking(1)
+                setblocking(reader, 1)
                 try:
                     if reader.poll(0):
                         task = reader.recv()
@@ -267,7 +267,7 @@ class ResultHandler(_pool.ResultHandler):
                     if task:
                         on_state_change(task)
                 finally:
-                    reader.setblocking(0)
+                    setblocking(reader, 0)
 
                 try:
                     join_exited_workers(shutdown=True)
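
The two hunks above swap the Connection method call reader.setblocking(1) for the module-level setblocking(reader, 1) helper, which toggles blocking mode on the underlying file descriptor. A minimal sketch of what such a helper can look like, assuming an fcntl-based implementation (a hypothetical stand-in, not necessarily the helper asynpool actually imports):

# Sketch only: a setblocking()/isblocking() pair that works on any object
# exposing fileno(), e.g. a multiprocessing Connection, by flipping the
# O_NONBLOCK flag on the descriptor.
import os
from fcntl import F_GETFL, F_SETFL, fcntl

def setblocking(conn, blocking):
    fd = conn.fileno()
    flags = fcntl(fd, F_GETFL)
    if blocking:
        fcntl(fd, F_SETFL, flags & ~os.O_NONBLOCK)
    else:
        fcntl(fd, F_SETFL, flags | os.O_NONBLOCK)

def isblocking(conn):
    return not (fcntl(conn.fileno(), F_GETFL) & os.O_NONBLOCK)
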
@@ -554,25 +554,34 @@ class AsynPool(_pool.Pool):
                     outbound.appendleft(job)
         self._put_back = _put_back
 
-        def on_poll_start():
-            # called for every event loop iteration, and if there
-            # are messages pending this will schedule writing one message
-            # by registering the 'schedule_writes' function for all currently
-            # inactive inqueues (not already being written to)
-
-            # consolidate means the event loop will merge them
-            # and call the callback once with the list writable fds as
-            # argument.  Using this means we minimize the risk of having
-            # the same fd receive every task if the pipe read buffer is not
-            # full.
-            if outbound:
-                #print('ALL: %r ACTIVE: %r' % (len(all_inqueues),
-                #                              len(active_writes)))
-                inactive = diff(active_writes)
-                [hub_add(fd, None, WRITE | ERR, consolidate=True)
-                 for fd in inactive]
-            else:
-                [hub_remove(fd) for fd in diff(active_writes)]
+        # called for every event loop iteration, and if there
+        # are messages pending this will schedule writing one message
+        # by registering the 'schedule_writes' function for all currently
+        # inactive inqueues (not already being written to)
+
+        # consolidate means the event loop will merge them
+        # and call the callback once with the list writable fds as
+        # argument.  Using this means we minimize the risk of having
+        # the same fd receive every task if the pipe read buffer is not
+        # full.
+        if is_fair_strategy:
+
+            def on_poll_start():
+                if outbound and len(busy_workers) < len(all_inqueues):
+                    #print('ALL: %r ACTIVE: %r' % (len(all_inqueues),
+                    #                              len(active_writes)))
+                    inactive = diff(active_writes)
+                    [hub_add(fd, None, WRITE | ERR, consolidate=True)
+                     for fd in inactive]
+                else:
+                    [hub_remove(fd) for fd in diff(active_writes)]
+        else:
+            def on_poll_start():  # noqa
+                if outbound:
+                    [hub_add(fd, None, WRITE | ERR, consolidate=True)
+                     for fd in diff(active_writes)]
+                else:
+                    [hub_remove(fd) for fd in diff(active_writes)]
         self.on_poll_start = on_poll_start
 
         def on_inqueue_close(fd, proc):
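
The comment block added in this hunk describes two things: with the -Ofair strategy, writes are only scheduled while at least one worker is idle (len(busy_workers) < len(all_inqueues)), and the inactive inqueue fds are registered with consolidate=True so the event loop reports every writable fd in a single callback rather than one callback per fd. A toy illustration of that consolidation behaviour, using a hypothetical stand-in hub rather than kombu's actual Hub API:

# Toy illustration of 'consolidate': fds registered with consolidate=True do
# not get individual callbacks; the loop collects every such fd that became
# writable during a tick and invokes one callback with the whole list
# (schedule_writes in the pool).
import select

class ToyHub:
    def __init__(self, on_writable_batch):
        self.consolidated = set()
        self.on_writable_batch = on_writable_batch

    def add_writer(self, fd, consolidate=False):
        if consolidate:
            self.consolidated.add(fd)

    def remove(self, fd):
        self.consolidated.discard(fd)

    def tick(self, timeout=0.1):
        if not self.consolidated:
            return
        _, writable, _ = select.select((), self.consolidated, (), timeout)
        if writable:
            # One call with the list of writable fds, so the writer can pick
            # any idle inqueue instead of always hitting the same pipe.
            self.on_writable_batch(writable)

The -Ofair guard then simply declines to register any of these fds while every worker is busy, so a pending message is not prefetched into a busy worker's pipe buffer.
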
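The final hunk below rewrites the outqueue flush loop: instead of temporarily switching the pipe to blocking mode and calling recv() unconditionally, it selects on the result pipe with a 10 ms timeout and only receives when the pipe is readable, stopping on the shutdown sentinel, on a non-EAGAIN error, or when nothing more is buffered. A self-contained sketch of that drain pattern (UNAVAIL and the logging here are stand-ins for the module's own helpers):

# Self-contained sketch of the select-and-drain pattern: poll the result
# pipe with a short timeout, recv() only when it is readable, and stop on
# the shutdown sentinel, a real error, or an empty pipe.
import errno
import select

UNAVAIL = frozenset({errno.EAGAIN, errno.EINTR})

def drain_outqueue(resq, on_state_change, is_terminating, timeout=0.01):
    fds = {resq}
    while fds and not resq.closed and not is_terminating():
        readable, _, _ = select.select(fds, (), fds, timeout)
        if not readable:
            break                      # nothing buffered; flushing is done
        try:
            task = resq.recv()
        except (OSError, EOFError) as exc:
            if getattr(exc, 'errno', None) not in UNAVAIL:
                print('got %r while flushing process' % (exc,))  # debug stand-in
            break
        if task is None:               # sentinel: the worker is going away
            break
        on_state_change(task)
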
@@ -1002,22 +1011,25 @@ class AsynPool(_pool.Pool):
         """
         resq = proc.outq._reader
         on_state_change = self._result_handler.on_state_change
-        while not resq.closed and resq.poll(0) and self._state != TERMINATE:
-            setblocking(resq, 1)
-            try:
-                task = resq.recv()
-            except (IOError, EOFError) as exc:
-                debug('got %r while flushing process %r',
-                      exc, proc, exc_info=1)
-                break
-            else:
-                if task is not None:
-                    on_state_change(task)
+        fds = set([resq])
+        while fds and not resq.closed and self._state != TERMINATE:
+            readable, _, again = _select(fds, None, fds, timeout=0.01)
+            if readable:
+                try:
+                    task = resq.recv()
+                except (OSError, IOError, EOFError) as exc:
+                    if get_errno(exc) not in UNAVAIL:
+                        debug('got %r while flushing process %r',
+                              exc, proc, exc_info=1)
+                    break
                 else:
-                    debug('got sentinel while flushing process %r', proc)
-            finally:
-                setblocking(resq, 0)
-                assert not isblocking(resq)
+                    if task is None:
+                        debug('got sentinel while flushing process %r', proc)
+                        break
+                    else:
+                        on_state_change(task)
+            else:
+                break
 
     def on_partial_read(self, job, proc):
         """Called when a job was only partially written to a child process