Sfoglia il codice sorgente

Be more selective about how file descriptors get removed from Kombu's hub.

Given that file descriptor changes appear to get triggered on the Kombu side, these changes
may not make a material impact.  However, to make things more consistent with the
changes introduced in https://github.com/celery/kombu/pull/353, the changes have been
updated here.

This change would also help allow refactoring for remove_reader()/remove_writer() to
be smarter about how file descriptors get managed in the future (i.e. using a counter
instead of removes() to avoid possible race conditions with file descriptors being reused)
Roger Hu 11 anni fa
parent
commit
04ddf8ea88
1 ha cambiato i file con 14 aggiunte e 13 eliminazioni
  1. 14 13
      celery/concurrency/asynpool.py

+ 14 - 13
celery/concurrency/asynpool.py

@@ -241,21 +241,21 @@ class ResultHandler(_pool.ResultHandler):
         fileno_to_outq = self.fileno_to_outq
         on_state_change = self.on_state_change
         add_reader = hub.add_reader
-        hub_remove = hub.remove
+        remove_reader = hub.remove_reader
         recv_message = self._recv_message
 
         def on_result_readable(fileno):
             try:
                 fileno_to_outq[fileno]
             except KeyError:  # process gone
-                return hub_remove(fileno)
+                return remove_reader(fileno)
             it = recv_message(add_reader, fileno, on_state_change)
             try:
                 next(it)
             except StopIteration:
                 pass
             except (IOError, OSError, EOFError):
-                hub_remove(fileno)
+                remove_reader(fileno)
             else:
                 add_reader(fileno, it)
         return on_result_readable
@@ -477,7 +477,7 @@ class AsynPool(_pool.Pool):
     def _create_process_handlers(self, hub, READ=READ, ERR=ERR):
         """For async pool this will create the handlers called
         when a process is up/down and etc."""
-        add_reader, hub_remove = hub.add_reader, hub.remove
+        add_reader, remove_reader, remove_writer = hub.add_reader, hub.remove_reader, hub.remove_writer
         cache = self._cache
         all_inqueues = self._all_inqueues
         fileno_to_inq = self._fileno_to_inq
@@ -528,7 +528,7 @@ class AsynPool(_pool.Pool):
 
         self.on_process_up = on_process_up
 
-        def _remove_from_index(obj, proc, index, callback=None):
+        def _remove_from_index(obj, proc, index, remove_func, callback=None):
             # this remove the file descriptors for a process from
             # the indices.  we have to make sure we don't overwrite
             # another processes fds, as the fds may be reused.
@@ -544,7 +544,7 @@ class AsynPool(_pool.Pool):
             except KeyError:
                 pass
             else:
-                hub_remove(fd)
+                remove_func(fd)
                 if callback is not None:
                     callback(fd)
             return fd
@@ -554,23 +554,24 @@ class AsynPool(_pool.Pool):
             if proc.dead:
                 return
             process_flush_queues(proc)
-            _remove_from_index(proc.outq._reader, proc, fileno_to_outq)
+            _remove_from_index(proc.outq._reader, proc, fileno_to_outq, remove_func=remove_reader)
             if proc.synq:
-                _remove_from_index(proc.synq._writer, proc, fileno_to_synq)
+                _remove_from_index(proc.synq._writer, proc, fileno_to_synq, remove_func=remove_writer)
             inq = _remove_from_index(proc.inq._writer, proc, fileno_to_inq,
+                                     remove_func=remove_writer,
                                      callback=all_inqueues.discard)
             if inq:
                 busy_workers.discard(inq)
-            hub_remove(proc.sentinel)
+            remove_reader(proc.sentinel)
             waiting_to_start.discard(proc)
             self._active_writes.discard(proc.inqW_fd)
-            hub_remove(proc.inqW_fd)
-            hub_remove(proc.outqR_fd)
+            remove_writer(proc.inqW_fd)
+            remove_reader(proc.outqR_fd)
             if proc.synqR_fd:
-                hub_remove(proc.synqR_fd)
+                remove_reader(proc.synqR_fd)
             if proc.synqW_fd:
                 self._active_writes.discard(proc.synqW_fd)
-                hub_remove(proc.synqW_fd)
+                remove_reader(proc.synqW_fd)
         self.on_process_down = on_process_down
 
     def _create_write_handlers(self, hub,