Browse Source

[py3] AsynPool: do not remove while iterating over set

Ask Solem 11 years ago
parent
commit
6b57d62eaa
1 changed files with 37 additions and 35 deletions
  1. 37 35
      celery/concurrency/asynpool.py

+ 37 - 35
celery/concurrency/asynpool.py

@@ -243,45 +243,47 @@ class ResultHandler(_pool.ResultHandler):
             if check_timeouts is not None:
                 # make sure tasks with a time limit will time out.
                 check_timeouts()
+            # cannot iterate and remove at the same time
+            pending_remove_fd = set()
             for fd in outqueues:
-                try:
-                    proc = fileno_to_outq[fd]
-                except KeyError:
-                    # process already found terminated
-                    # which means its outqueue has already been processed
-                    # by the worker lost handler.
-                    outqueues.discard(fd)
-                    continue
-
-                reader = proc.outq._reader
-                try:
-                    setblocking(reader, 1)
-                except (OSError, IOError):
-                    outqueues.discard(fd)
-                    continue
-                try:
-                    if reader.poll(0):
-                        task = reader.recv()
-                    else:
-                        task = None
-                        sleep(0.5)
-                except (IOError, EOFError):
-                    outqueues.discard(fd)
-                    continue
-                else:
-                    if task:
-                        on_state_change(task)
-                finally:
-                    try:
-                        setblocking(reader, 0)
-                    except (OSError, IOError):
-                        outqueues.discard(fd)
-
+                self._flush_outqueue(
+                    fd, pending_remove_fd.discard, fileno_to_outq,
                 try:
                     join_exited_workers(shutdown=True)
                 except WorkersJoined:
-                    debug('result handler: all workers terminated')
-                    return
+                    return debug('result handler: all workers terminated')
+            outqueues.difference_update(pending_remove_fd)
+
+    def _flush_outqueue(self, fd, remove, process_index, on_state_change):
+        try:
+            proc = process_index[fd]
+        except KeyError:
+            # process already found terminated
+            # which means its outqueue has already been processed
+            # by the worker lost handler.
+            return remove(fd)
+
+        reader = proc.outq._reader
+        try:
+            setblocking(reader, 1)
+        except (OSError, IOError):
+            return remove(fd)
+        try:
+            if reader.poll(0):
+                task = reader.recv()
+            else:
+                task = None
+                sleep(0.5)
+        except (IOError, EOFError):
+            return remove(fd)
+        else:
+            if task:
+                on_state_change(task)
+        finally:
+            try:
+                setblocking(reader, 0)
+            except (OSError, IOError):
+                return remove(fd)
 
 
 class AsynPool(_pool.Pool):