Forráskód Böngészése

Fixes file descriptor leak

Ask Solem 12 éve
szülő
commit
df545ea700
2 módosított fájl, 53 hozzáadás és 25 törlés
  1. 52 24
      celery/concurrency/processes.py
  2. 1 1
      funtests/stress/stress.py

+ 52 - 24
celery/concurrency/processes.py

@@ -259,21 +259,14 @@ class AsynPool(_pool.Pool):
                     if owner is None)
 
     def on_grow(self, n):
-        self._queues.update(
-            dict((self.create_process_queues(), None)
-                for _ in range(self._processes - len(self._queues)))
-        )
+        diff = max(self._processes - len(self._queues), 0)
+        if diff:
+            self._queues.update(
+                dict((self.create_process_queues(), None) for _ in range(diff))
+            )
 
     def on_shrink(self, n):
-        queues = self._queues
-        for i in range(n):
-            if len(queues) > self._processes:
-                try:
-                    queues.pop(next(
-                        q for q, owner in items(queues) if owner is None
-                    ), None)
-                except StopIteration:
-                    break
+        pass
 
     def create_process_queues(self):
         inq, outq, synq = _SimpleQueue(), _SimpleQueue(), None
@@ -311,7 +304,11 @@ class AsynPool(_pool.Pool):
     def _stop_task_handler(task_handler):
         for proc in task_handler.pool:
             proc.inq._writer.setblocking(1)
-            proc.inq.put(None)
+            try:
+                proc.inq.put(None)
+            except OSError as exc:
+                if get_errno(exc) != errno.EBADF:
+                    raise
 
     def create_result_handler(self):
         return super(AsynPool, self).create_result_handler(
@@ -320,7 +317,10 @@ class AsynPool(_pool.Pool):
         )
 
     def _process_register_queues(self, proc, queues):
+        assert queues in self._queues
+        b = len(self._queues)
         self._queues[queues] = proc
+        assert b == len(self._queues)
 
     def _find_worker_queues(self, proc):
         try:
@@ -349,15 +349,36 @@ class AsynPool(_pool.Pool):
             if not job._accepted:
                 self._put_back(job)
 
-            for conn in (proc.inq, proc.outq, proc.synq):
-                if conn:
-                    for sock in (conn._reader, conn._writer):
-                        if not sock.closed:
+            # Replace queues to avoid reuse
+            before = len(self._queues)
+            try:
+                queues = self._find_worker_queues(proc)
+                if self.destroy_queues(queues):
+                    self._queues[self.create_process_queues()] = None
+            except ValueError:
+                # Not in queue map, make sure sockets are closed.
+                self.destroy_queues((proc.inq, proc.outq, proc.synq))
+            assert len(self._queues) == before
+
+    def destroy_queues(self, queues):
+        removed = 1
+        try:
+            self._queues.pop(queues)
+        except KeyError:
+            removed = 0
+        try:
+            self.on_inqueue_close(queues[0]._writer.fileno())
+        except IOError:
+            pass
+        for queue in queues:
+            if queue:
+                for sock in (queue._reader, queue._writer):
+                    if not sock.closed:
+                        try:
                             sock.close()
-                            #os.close(sock.fileno())
-            self._queues.pop((proc.inq, proc.outq, proc.synq), None)
-            self._queues[self.create_process_queues()] = None
-            self.on_inqueue_close(proc.inqW_fd)
+                        except (IOError, OSError):
+                            pass
+        return removed
 
     @classmethod
     def _set_result_sentinel(cls, _outqueue, _pool):
@@ -372,8 +393,15 @@ class AsynPool(_pool.Pool):
         debug(
             'removing tasks from inqueue until task handler finished',
         )
-        fileno_to_proc = dict((w.inq._reader.fileno(), w) for w in pool)
-        inqR = set(fileno_to_proc)
+        fileno_to_proc = {}
+        inqR = set()
+        for w in pool:
+            try:
+                fd = w.inq._reader.fileno()
+                inqR.add(fd)
+                fileno_to_proc[fd] = w
+            except IOError:
+                pass
         while inqR:
             readable, _, again = _select(inqR, timeout=0.5)
             if again:

+ 1 - 1
funtests/stress/stress.py

@@ -103,9 +103,9 @@ class Stresstests(object):
                  self.termbysig,
                  self.bigtasks,
                  self.smalltasks,
-                 self.revoketermfast,
                  self.timelimits,
                  self.timelimits_soft,
+                 self.revoketermfast,
                  self.revoketermslow]
         for test in tests:
             self.runtest(test, n)