Browse Source

Merge pull request #1834 from ionelmc/fix-1785

Better cleanup of dead processes.
Ionel Cristian Mărieș 11 years ago
parent
commit
a40ea201f8
1 changed files with 11 additions and 2 deletions
  1. 11 2
      celery/concurrency/asynpool.py

+ 11 - 2
celery/concurrency/asynpool.py

@@ -501,7 +501,7 @@ class AsynPool(_pool.Pool):
             if proc._is_alive() and proc in waiting_to_start:
                 assert proc.outqR_fd in fileno_to_outq
                 assert fileno_to_outq[proc.outqR_fd] is proc
-                assert proc.outqR_fd in hub.readers
+                assert proc.outqR_fd in hub.readers, "%s.outqR_fd=%s not in hub.readers !" % (proc, proc.outqR_fd)
                 error('Timed out waiting for UP message from %r', proc)
                 os.kill(proc.pid, 9)
 
@@ -570,6 +570,15 @@ class AsynPool(_pool.Pool):
             if inq:
                 busy_workers.discard(inq)
             hub_remove(proc.sentinel)
+            waiting_to_start.discard(proc)
+            self._active_writes.discard(proc.inqW_fd)
+            hub_remove(proc.inqW_fd)
+            hub_remove(proc.outqR_fd)
+            if proc.synqR_fd:
+                hub_remove(proc.synqR_fd)
+            if proc.synqW_fd:
+                self._active_writes.discard(proc.synqW_fd)
+                hub_remove(proc.synqW_fd)
         self.on_process_down = on_process_down
 
     def _create_write_handlers(self, hub,
@@ -966,7 +975,7 @@ class AsynPool(_pool.Pool):
         try:
             proc = next(w for w in self._pool if w.pid == pid)
         except StopIteration:
-            # process already exited :(  this will be handled elsewhere.
+            logger.warning("process with pid=%s already exited :( - handling this elsewhere ...", pid)
             return
         assert proc.inqW_fd not in self._fileno_to_inq
         assert proc.inqW_fd not in self._all_inqueues