Parcourir la source

fixes to multiple sockets

Ask Solem il y a 12 ans
Parent
commit
8463c5f3ad
1 fichiers modifiés avec 14 ajouts et 6 suppressions
  1. 14 6
      celery/concurrency/processes.py

+ 14 - 6
celery/concurrency/processes.py

@@ -133,13 +133,17 @@ class promise(object):
 
 class ResultHandler(_pool.ResultHandler):
 
+    def __init__(self, *args, **kwargs):
+        self.fileno_to_outq = kwargs.pop('fileno_to_outq')
+        super(ResultHandler, self).__init__(*args, **kwargs)
+
     def _process_result(self):
-        fileno_to_proc = self.fileno_to_proc
+        fileno_to_outq = self.fileno_to_outq
         on_state_change = self.on_state_change
 
         while 1:
             fileno = (yield)
-            proc = fileno_to_proc[fileno]
+            proc = fileno_to_outq[fileno]
             reader = proc.outq._reader
 
             try:
@@ -176,16 +180,16 @@ class ResultHandler(_pool.ResultHandler):
     def on_stop_not_started(self):
         cache = self.cache
         check_timeouts = self.check_timeouts
-        fileno_to_proc = self.fileno_to_proc
+        fileno_to_outq = self.fileno_to_outq
         on_state_change = self.on_state_change
         join_exited_workers = self.join_exited_workers
 
-        outqueues = set(fileno_to_proc)
+        outqueues = set(fileno_to_outq)
         while cache and outqueues and self._state != TERMINATE:
             if check_timeouts is not None:
                 check_timeouts()
             for fd in outqueues:
-                proc = fileno_to_proc[fd]
+                proc = fileno_to_outq[fd]
                 reader = proc.outq._reader
                 try:
                     if reader.poll(0):
@@ -217,6 +221,10 @@ class AsynPool(_pool.Pool):
         self._fileno_to_outq = {}
         super(AsynPool, self).__init__(processes, *args, **kwargs)
 
+        for proc in self._pool:
+            self._fileno_to_inq[proc.inqW_fd] = proc
+            self._fileno_to_outq[proc.outqR_fd] = proc
+
     def _finalize_args(self):
         orig = super(AsynPool, self)._finalize_args()
         return (self._fileno_to_inq, orig)
@@ -290,7 +298,7 @@ class AsynPool(_pool.Pool):
                 self._queuepairs[self.create_process_queuepair()] = None
 
     @classmethod
-    def _set_result_sentinel(cls, _outqueue, workers):
+    def _set_result_sentinel(cls, _outqueue, _pool):
         pass
 
     def _help_stuff_finish_args(self):