Преглед изворни кода

Be more selective about how file descriptors get removed from Kombu's hub.

Given that file descriptor changes appear to get triggered on the Kombu side, these changes
may not make a material impact.  However, to make things more consistent with the
changes introduced in https://github.com/celery/kombu/pull/353, the changes have been
updated here.

This change would also help allow refactoring for remove_reader()/remove_writer() to
be smarter about how file descriptors get managed in the future (i.e. using a counter
instead of removes() to avoid possible race conditions with file descriptors being reused)
Roger Hu пре 11 година
родитељ
комит
e3ba9aad11
1 измењених фајлова са 14 додато и 13 уклоњено
  1. 14 13
      celery/concurrency/asynpool.py

+ 14 - 13
celery/concurrency/asynpool.py

@@ -250,21 +250,21 @@ class ResultHandler(_pool.ResultHandler):
         fileno_to_outq = self.fileno_to_outq
         fileno_to_outq = self.fileno_to_outq
         on_state_change = self.on_state_change
         on_state_change = self.on_state_change
         add_reader = hub.add_reader
         add_reader = hub.add_reader
-        hub_remove = hub.remove
+        remove_reader = hub.remove_reader
         recv_message = self._recv_message
         recv_message = self._recv_message
 
 
         def on_result_readable(fileno):
         def on_result_readable(fileno):
             try:
             try:
                 fileno_to_outq[fileno]
                 fileno_to_outq[fileno]
             except KeyError:  # process gone
             except KeyError:  # process gone
-                return hub_remove(fileno)
+                return remove_reader(fileno)
             it = recv_message(add_reader, fileno, on_state_change)
             it = recv_message(add_reader, fileno, on_state_change)
             try:
             try:
                 next(it)
                 next(it)
             except StopIteration:
             except StopIteration:
                 pass
                 pass
             except (IOError, OSError, EOFError):
             except (IOError, OSError, EOFError):
-                hub_remove(fileno)
+                remove_reader(fileno)
             else:
             else:
                 add_reader(fileno, it)
                 add_reader(fileno, it)
         return on_result_readable
         return on_result_readable
@@ -485,7 +485,7 @@ class AsynPool(_pool.Pool):
     def _create_process_handlers(self, hub, READ=READ, ERR=ERR):
     def _create_process_handlers(self, hub, READ=READ, ERR=ERR):
         """For async pool this will create the handlers called
         """For async pool this will create the handlers called
         when a process is up/down and etc."""
         when a process is up/down and etc."""
-        add_reader, hub_remove = hub.add_reader, hub.remove
+        add_reader, remove_reader, remove_writer = hub.add_reader, hub.remove_reader, hub.remove_writer
         cache = self._cache
         cache = self._cache
         all_inqueues = self._all_inqueues
         all_inqueues = self._all_inqueues
         fileno_to_inq = self._fileno_to_inq
         fileno_to_inq = self._fileno_to_inq
@@ -536,7 +536,7 @@ class AsynPool(_pool.Pool):
 
 
         self.on_process_up = on_process_up
         self.on_process_up = on_process_up
 
 
-        def _remove_from_index(obj, proc, index, callback=None):
+        def _remove_from_index(obj, proc, index, remove_func, callback=None):
             # this remove the file descriptors for a process from
             # this remove the file descriptors for a process from
             # the indices.  we have to make sure we don't overwrite
             # the indices.  we have to make sure we don't overwrite
             # another processes fds, as the fds may be reused.
             # another processes fds, as the fds may be reused.
@@ -552,7 +552,7 @@ class AsynPool(_pool.Pool):
             except KeyError:
             except KeyError:
                 pass
                 pass
             else:
             else:
-                hub_remove(fd)
+                remove_func(fd)
                 if callback is not None:
                 if callback is not None:
                     callback(fd)
                     callback(fd)
             return fd
             return fd
@@ -562,23 +562,24 @@ class AsynPool(_pool.Pool):
             if proc.dead:
             if proc.dead:
                 return
                 return
             process_flush_queues(proc)
             process_flush_queues(proc)
-            _remove_from_index(proc.outq._reader, proc, fileno_to_outq)
+            _remove_from_index(proc.outq._reader, proc, fileno_to_outq, remove_func=remove_reader)
             if proc.synq:
             if proc.synq:
-                _remove_from_index(proc.synq._writer, proc, fileno_to_synq)
+                _remove_from_index(proc.synq._writer, proc, fileno_to_synq, remove_func=remove_writer)
             inq = _remove_from_index(proc.inq._writer, proc, fileno_to_inq,
             inq = _remove_from_index(proc.inq._writer, proc, fileno_to_inq,
+                                     remove_func=remove_writer,
                                      callback=all_inqueues.discard)
                                      callback=all_inqueues.discard)
             if inq:
             if inq:
                 busy_workers.discard(inq)
                 busy_workers.discard(inq)
-            hub_remove(proc.sentinel)
+            remove_reader(proc.sentinel)
             waiting_to_start.discard(proc)
             waiting_to_start.discard(proc)
             self._active_writes.discard(proc.inqW_fd)
             self._active_writes.discard(proc.inqW_fd)
-            hub_remove(proc.inqW_fd)
-            hub_remove(proc.outqR_fd)
+            remove_writer(proc.inqW_fd)
+            remove_reader(proc.outqR_fd)
             if proc.synqR_fd:
             if proc.synqR_fd:
-                hub_remove(proc.synqR_fd)
+                remove_reader(proc.synqR_fd)
             if proc.synqW_fd:
             if proc.synqW_fd:
                 self._active_writes.discard(proc.synqW_fd)
                 self._active_writes.discard(proc.synqW_fd)
-                hub_remove(proc.synqW_fd)
+                remove_reader(proc.synqW_fd)
         self.on_process_down = on_process_down
         self.on_process_down = on_process_down
 
 
     def _create_write_handlers(self, hub,
     def _create_write_handlers(self, hub,