فهرست منبع

[multisock] Child now sends ALIVE message back to worker, only then do we mark sockets as writable

Ask Solem 12 سال پیش
والد
کامیت
b9489fc1b1
1 فایل تغییر یافته به همراه 51 افزوده شده و 14 حذف شده
  1. +51 -14
      celery/concurrency/processes.py

+ 51 - 14
celery/concurrency/processes.py

@@ -57,6 +57,8 @@ MAXTASKS_NO_BILLIARD = """\
     This may lead to a deadlock, please install the billiard C extension.
     This may lead to a deadlock, please install the billiard C extension.
 """
 """
 
 
+WORKER_UP = 15
+
 logger = get_logger(__name__)
 logger = get_logger(__name__)
 warning, debug = logger.warning, logger.debug
 warning, debug = logger.warning, logger.debug
 
 
@@ -131,11 +133,19 @@ class promise(object):
             self.ready = True
             self.ready = True
 
 
 
 
+class Worker(_pool.Worker):
+
+    def on_loop_start(self, pid):
+        self.outq.put((WORKER_UP, (pid, )))
+
+
 class ResultHandler(_pool.ResultHandler):
 class ResultHandler(_pool.ResultHandler):
 
 
     def __init__(self, *args, **kwargs):
     def __init__(self, *args, **kwargs):
         self.fileno_to_outq = kwargs.pop('fileno_to_outq')
         self.fileno_to_outq = kwargs.pop('fileno_to_outq')
+        self.on_worker_alive = kwargs.pop('on_worker_alive')
         super(ResultHandler, self).__init__(*args, **kwargs)
         super(ResultHandler, self).__init__(*args, **kwargs)
+        self.state_handlers[WORKER_UP] = self.on_worker_alive
 
 
     def _process_result(self):
     def _process_result(self):
         fileno_to_outq = self.fileno_to_outq
         fileno_to_outq = self.fileno_to_outq
@@ -189,7 +199,12 @@ class ResultHandler(_pool.ResultHandler):
             if check_timeouts is not None:
             if check_timeouts is not None:
                 check_timeouts()
                 check_timeouts()
             for fd in outqueues:
             for fd in outqueues:
-                proc = fileno_to_outq[fd]
+                try:
+                    proc = fileno_to_outq[fd]
+                except KeyError:
+                    outqueues.discard(fd)
+                    continue
+
                 reader = proc.outq._reader
                 reader = proc.outq._reader
                 try:
                 try:
                     if reader.poll(0):
                     if reader.poll(0):
@@ -212,6 +227,7 @@ class ResultHandler(_pool.ResultHandler):
 
 
 class AsynPool(_pool.Pool):
 class AsynPool(_pool.Pool):
     ResultHandler = ResultHandler
     ResultHandler = ResultHandler
+    Worker = Worker
 
 
     def __init__(self, processes=None, *args, **kwargs):
     def __init__(self, processes=None, *args, **kwargs):
         processes = self.cpu_count() if processes is None else processes
         processes = self.cpu_count() if processes is None else processes
@@ -219,6 +235,7 @@ class AsynPool(_pool.Pool):
                                 for _ in range(processes))
                                 for _ in range(processes))
         self._fileno_to_inq = {}
         self._fileno_to_inq = {}
         self._fileno_to_outq = {}
         self._fileno_to_outq = {}
+        self._all_inqueues = set()
         super(AsynPool, self).__init__(processes, *args, **kwargs)
         super(AsynPool, self).__init__(processes, *args, **kwargs)
 
 
         for proc in self._pool:
         for proc in self._pool:
@@ -238,6 +255,14 @@ class AsynPool(_pool.Pool):
         inq._writer.setblocking(0)
         inq._writer.setblocking(0)
         return inq, outq
         return inq, outq
 
 
+    def on_worker_alive(self, pid):
+        try:
+            proc = next(w for w in self._pool if w.pid == pid)
+        except StopIteration:
+            return
+        self._fileno_to_inq[proc.inqW_fd] = proc
+        self._all_inqueues.add(proc.inqW_fd)
+
     def on_job_process_down(self, job):
     def on_job_process_down(self, job):
         if not job._accepted and job._write_to:
         if not job._accepted and job._write_to:
             self.on_partial_read(job, job._write_to)
             self.on_partial_read(job, job._write_to)
@@ -253,13 +278,27 @@ class AsynPool(_pool.Pool):
 
 
     @staticmethod
     @staticmethod
     def _stop_task_handler(task_handler):
     def _stop_task_handler(task_handler):
-        for worker in task_handler.pool:
-            # send sentinels
-            worker.inq.put(None)
+        fileno_to_inq = dict(
+            (w.inqW_fd, w.inq) for w in task_handler.pool
+        )
+        for proc in task_handler.pool:
+            proc.inq._writer.setblocking(1)
+            proc.inq.put(None)
+        #inqueues = set(fileno_to_inq)
+        #while inqueues:
+        #    _, writable, again = _select(inqueues, timeout=0.5)
+        #    print('INQUEUES: %r WRIT: %r' % (inqueues, writable))
+        #    if again:
+        #        continue
+        #    for fd in writable:
+        #        fileno_to_inq[fd].inq.put(None)
+        #        inqueues.discard(fd)
+        #    sleep(0)
 
 
     def create_result_handler(self):
     def create_result_handler(self):
         return super(AsynPool, self).create_result_handler(
         return super(AsynPool, self).create_result_handler(
             fileno_to_outq=self._fileno_to_outq,
             fileno_to_outq=self._fileno_to_outq,
+            on_worker_alive=self.on_worker_alive,
         )
         )
 
 
     def _process_register_queuepair(self, proc, pair):
     def _process_register_queuepair(self, proc, pair):
@@ -302,23 +341,24 @@ class AsynPool(_pool.Pool):
         pass
         pass
 
 
     def _help_stuff_finish_args(self):
     def _help_stuff_finish_args(self):
-        return (self._fileno_to_inq, )
+        return (self._pool, )
 
 
     @classmethod
     @classmethod
-    def _help_stuff_finish(cls, fileno_to_inq):
+    def _help_stuff_finish(cls, pool):
         # task_handler may be blocked trying to put items on inqueue
         # task_handler may be blocked trying to put items on inqueue
         debug(
         debug(
             'removing tasks from inqueue until task handler finished',
             'removing tasks from inqueue until task handler finished',
         )
         )
-        inqueues = set(fileno_to_inq)
-        while inqueues:
-            readable, _, again = _select(inqueues, timeout=0.5)
+        fileno_to_proc = dict((w.inq._reader.fileno(), w) for w in pool)
+        inqR = set(fileno_to_proc)
+        while inqR:
+            readable, _, again = _select(inqR, timeout=0.5)
             if again:
             if again:
                 continue
                 continue
             if not readable:
             if not readable:
                 break
                 break
             for fd in readable:
             for fd in readable:
-                fileno_to_inq[fd]._reader.recv()
+                fileno_to_proc[fd]._reader.recv()
             sleep(0)
             sleep(0)
 
 
 
 
@@ -360,7 +400,6 @@ class TaskPool(BasePool):
         self.maybe_handle_result = P._result_handler.handle_event
         self.maybe_handle_result = P._result_handler.handle_event
         self.outbound_buffer = deque()
         self.outbound_buffer = deque()
         self.handle_result_event = P.handle_result_event
         self.handle_result_event = P.handle_result_event
-        self._all_inqueues = set(p.inqW_fd for p in P._pool)
         self._active_writes = set()
         self._active_writes = set()
         self._active_writers = set()
         self._active_writers = set()
 
 
@@ -405,7 +444,7 @@ class TaskPool(BasePool):
         put_message = outbound.append
         put_message = outbound.append
         fileno_to_inq = pool._fileno_to_inq
         fileno_to_inq = pool._fileno_to_inq
         fileno_to_outq = pool._fileno_to_outq
         fileno_to_outq = pool._fileno_to_outq
-        all_inqueues = self._all_inqueues
+        all_inqueues = pool._all_inqueues
         active_writes = self._active_writes
         active_writes = self._active_writes
         add_coro = hub.add_coro
         add_coro = hub.add_coro
         diff = all_inqueues.difference
         diff = all_inqueues.difference
@@ -455,9 +494,7 @@ class TaskPool(BasePool):
         self._pool.on_timeout_cancel = on_timeout_cancel
         self._pool.on_timeout_cancel = on_timeout_cancel
 
 
         def on_process_up(proc):
         def on_process_up(proc):
-            fileno_to_inq[proc.inqW_fd] = proc
             fileno_to_outq[proc.outqR_fd] = proc
             fileno_to_outq[proc.outqR_fd] = proc
-            all_inqueues.add(proc.inqW_fd)
             hub_add(proc.sentinel, maintain_pool, READ | ERR)
             hub_add(proc.sentinel, maintain_pool, READ | ERR)
             hub_add(proc.outqR_fd, pool.handle_result_event, READ | ERR)
             hub_add(proc.outqR_fd, pool.handle_result_event, READ | ERR)
         self._pool.on_process_up = on_process_up
         self._pool.on_process_up = on_process_up