Browse Source

[Asynpool] Fixes 100% CPU loop in epoll (round 2)

The billiard patch celery/billiard@4f4759b8a92c117b2694faa18f1f6d6108065773
was initially submitted to fix this problem, but on closer investigation
we were only leaking file descriptors.

I monkey patched os.open/socket/os.close etc to track what was going on,
and I found no evidence of the code closing random sockets,
instead I found out that:

1) epoll_wait always returned an error state for a Popen pipe fd.
2) the worker was trying to unregister this fd from epoll, but
3) ``epoll.unregister`` refused to do so giving an IOError(ENOENT)
   error.

So turns out this is an epoll quirk, and the solution is to duplicate the pipe fd
so that we can carefully control when it's removed from the process
file descriptor table.

Closes celery/celery#1845
Could fix: celery/celery#2142 celery/celery#2606
Ask Solem 9 years ago
parent
commit
ca57e722b2
1 changed files with 27 additions and 13 deletions
  1. 27 13
      celery/concurrency/asynpool.py

+ 27 - 13
celery/concurrency/asynpool.py

@@ -426,11 +426,28 @@ class AsynPool(_pool.Pool):
             self._timeout_handler, 'on_hard_timeout', noop,
         )
 
-    def _event_process_exit(self, hub, fd):
+    def _event_process_exit(self, hub, proc):
         # This method is called whenever the process sentinel is readable.
-        hub.remove(fd)
+        self._untrack_child_process(proc, hub)
         self.maintain_pool()
 
+    def _track_child_process(self, proc, hub):
+        try:
+            fd = proc._sentinel_poll
+        except AttributeError:
+            # we need to duplicate the fd here to carefully
+            # control when the fd is removed from the process table,
+            # as once the original fd is closed we cannot unregister
+            # the fd from epoll(7) anymore, causing a 100% CPU poll loop.
+            fd = proc._sentinel_poll = os.dup(proc._popen.sentinel)
+        hub.add_reader(fd, self._event_process_exit, hub, proc)
+
+    def _untrack_child_process(self, proc, hub):
+        if proc._sentinel_poll is not None:
+            fd, proc._sentinel_poll = proc._sentinel_poll, None
+            hub.remove(fd)
+            os.close(fd)
+
     def register_with_event_loop(self, hub):
         """Registers the async pool with the current event loop."""
         self._result_handler.register_with_event_loop(hub)
@@ -440,8 +457,7 @@ class AsynPool(_pool.Pool):
         self._create_write_handlers(hub)
 
         # Add handler for when a process exits (calls maintain_pool)
-        [hub.add_reader(fd, self._event_process_exit, hub, fd)
-         for fd in self.process_sentinels]
+        [self._track_child_process(w, hub) for w in self._pool]
         # Handle_result_event is called whenever one of the
         # result queues are readable.
         [hub.add_reader(fd, self.handle_result_event, fd)
@@ -528,7 +544,6 @@ class AsynPool(_pool.Pool):
         fileno_to_outq = self._fileno_to_outq
         fileno_to_synq = self._fileno_to_synq
         busy_workers = self._busy_workers
-        event_process_exit = self._event_process_exit
         handle_result_event = self.handle_result_event
         process_flush_queues = self.process_flush_queues
         waiting_to_start = self._waiting_to_start
@@ -554,10 +569,9 @@ class AsynPool(_pool.Pool):
                 if job._scheduled_for and job._scheduled_for.inqW_fd == infd:
                     job._scheduled_for = proc
             fileno_to_outq[proc.outqR_fd] = proc
+
             # maintain_pool is called whenever a process exits.
-            add_reader(
-                proc.sentinel, event_process_exit, hub, proc.sentinel,
-            )
+            self._track_child_process(proc, hub)
 
             assert not isblocking(proc.outq._reader)
 
@@ -611,16 +625,16 @@ class AsynPool(_pool.Pool):
             )
             if inq:
                 busy_workers.discard(inq)
-            remove_reader(proc.sentinel)
+            self._untrack_child_process(proc, hub)
             waiting_to_start.discard(proc)
             self._active_writes.discard(proc.inqW_fd)
-            remove_writer(proc.inqW_fd)
-            remove_reader(proc.outqR_fd)
+            remove_writer(proc.inq._writer)
+            remove_reader(proc.outq._reader)
             if proc.synqR_fd:
-                remove_reader(proc.synqR_fd)
+                remove_reader(proc.synq._reader)
             if proc.synqW_fd:
                 self._active_writes.discard(proc.synqW_fd)
-                remove_reader(proc.synqW_fd)
+                remove_reader(proc.synq._writer)
         self.on_process_down = on_process_down
 
     def _create_write_handlers(self, hub,