Browse Source

Fixes flakes for Issue #2032

Ask Solem 10 years ago
parent
commit
8187301dfd
1 changed files with 13 additions and 6 deletions
  1. 13 6
      celery/concurrency/asynpool.py

+ 13 - 6
celery/concurrency/asynpool.py

@@ -477,7 +477,9 @@ class AsynPool(_pool.Pool):
     def _create_process_handlers(self, hub, READ=READ, ERR=ERR):
         """For async pool this will create the handlers called
         when a process is up/down and etc."""
-        add_reader, remove_reader, remove_writer = hub.add_reader, hub.remove_reader, hub.remove_writer
+        add_reader, remove_reader, remove_writer = (
+            hub.add_reader, hub.remove_reader, hub.remove_writer,
+        )
         cache = self._cache
         all_inqueues = self._all_inqueues
         fileno_to_inq = self._fileno_to_inq
@@ -554,12 +556,17 @@ class AsynPool(_pool.Pool):
             if proc.dead:
                 return
             process_flush_queues(proc)
-            _remove_from_index(proc.outq._reader, proc, fileno_to_outq, remove_fun=remove_reader)
+            _remove_from_index(
+                proc.outq._reader, proc, fileno_to_outq, remove_reader,
+            )
             if proc.synq:
-                _remove_from_index(proc.synq._writer, proc, fileno_to_synq, remove_fun=remove_writer)
-            inq = _remove_from_index(proc.inq._writer, proc, fileno_to_inq,
-                                     remove_fun=remove_writer,
-                                     callback=all_inqueues.discard)
+                _remove_from_index(
+                    proc.synq._writer, proc, fileno_to_synq, remove_writer,
+                )
+            inq = _remove_from_index(
+                proc.inq._writer, proc, fileno_to_inq, remove_writer,
+                callback=all_inqueues.discard,
+            )
             if inq:
                 busy_workers.discard(inq)
             remove_reader(proc.sentinel)