Browse Source

multisock: work with refactored billiard master

Ask Solem 12 years ago
parent
commit
43fda6fa13
1 changed files with 70 additions and 3 deletions
  1. 70 3
      celery/concurrency/processes.py

+ 70 - 3
celery/concurrency/processes.py

@@ -24,7 +24,7 @@ from time import sleep, time
 from billiard import forking_enable
 from billiard import pool as _pool
 from billiard.exceptions import WorkerLostError
-from billiard.pool import RUN, CLOSE, TERMINATE, WorkersJoined
+from billiard.pool import RUN, CLOSE, TERMINATE, WorkersJoined, CoroStop
 from billiard.queues import _SimpleQueue
 from kombu.serialization import pickle as _pickle
 from kombu.utils import fxrange
@@ -133,6 +133,46 @@ class promise(object):
 
 class ResultHandler(_pool.ResultHandler):
 
+    def _process_result(self):
+        fileno_to_proc = self.fileno_to_proc
+        on_state_change = self.on_state_change
+
+        while 1:
+            fileno = (yield)
+            proc = fileno_to_proc[fileno]
+            reader = proc.outq._reader
+
+            try:
+                if reader.poll(0):
+                    ready, task = True, reader.recv()
+                else:
+                    ready, task = False, None
+            except (IOError, EOFError) as exc:
+                debug('result handler got %r -- exiting' % (exc, ))
+                raise CoroStop()
+
+            if self._state:
+                assert self._state == TERMINATE
+                debug('result handler found thread._state==TERMINATE')
+                raise CoroStop()
+
+            if ready:
+                if task is None:
+                    debug('result handler got sentinel -- exiting')
+                    raise CoroStop()
+                on_state_change(task)
+
+    def handle_event(self, fileno=None, event=None):
+        if self._state == RUN:
+            it = self._it
+            if it is None:
+                it = self._it = self._process_result()
+                next(it)
+            try:
+                it.send(fileno)
+            except (StopIteration, CoroStop):
+                self._it = None
+
     def on_stop_not_started(self):
         cache = self.cache
         check_timeouts = self.check_timeouts
@@ -173,14 +213,29 @@ class AsynPool(_pool.Pool):
         processes = self.cpu_count() if processes is None else processes
         self._queuepairs = dict((self.create_process_queuepair(), None)
                                 for _ in range(processes))
+        self._fileno_to_inq = {}
+        self._fileno_to_outq = {}
         super(AsynPool, self).__init__(processes, *args, **kwargs)
 
+    def _finalize_args(self):
+        orig = super(AsynPool, self)._finalize_args()
+        return (self._fileno_to_inq, orig)
+
     def get_process_queuepair(self):
         return next(pair for pair, owner in items(self._queuepairs)
                     if owner is None)
 
     def create_process_queuepair(self):
-        return _SimpleQueue(), _SimpleQueue()
+        inq, outq = _SimpleQueue(), _SimpleQueue()
+        inq._writer.setblocking(0)
+        return inq, outq
+
+    def on_job_process_down(self, job):
+        if not job._accepted and job._write_to:
+            self.on_partial_reaad(job, job._write_to)
+
+    def on_job_process_lost(self, job, pid, exitcode):
+        self.mark_as_worker_lost(job, exitcode)
 
     def _process_cleanup_queuepair(self, proc):
         try:
@@ -194,6 +249,11 @@ class AsynPool(_pool.Pool):
             # send sentinels
             worker.inq.put(None)
 
+    def create_result_handler(self):
+        return super(AsynPool, self).create_result_handler(
+            fileno_to_outq=self._fileno_to_outq,
+        )
+
     def _process_register_queuepair(self, proc, pair):
         self._queuepairs[pair] = proc
 
@@ -233,8 +293,11 @@ class AsynPool(_pool.Pool):
     def _set_result_sentinel(cls, _outqueue, workers):
         pass
 
+    def _help_stuff_finish_args(self):
+        return (self._fileno_to_inq, )
+
     @classmethod
-    def _help_stuff_finish(cls, _inqueue, _taskhandler, _size, fileno_to_inq):
+    def _help_stuff_finish(cls, fileno_to_inq):
         # task_handler may be blocked trying to put items on inqueue
         debug(
             'removing tasks from inqueue until task handler finished',
@@ -382,12 +445,16 @@ class TaskPool(BasePool):
         self._pool.on_timeout_cancel = on_timeout_cancel
 
         def on_process_up(proc):
+            fileno_to_inq[proc.inqW_fd] = proc
+            fileno_to_outq[proc.outqR_fd] = proc
             all_inqueues.add(proc.inqW_fd)
             hub_add(proc.sentinel, maintain_pool, READ | ERR)
             hub_add(proc.outqR_fd, pool.handle_result_event, READ | ERR)
         self._pool.on_process_up = on_process_up
 
         def on_process_down(proc):
+            fileno_to_outq.pop(proc.outqR_fd, None)
+            fileno_to_inq.pop(proc.inqW_fd, None)
             all_inqueues.discard(proc.inqW_fd)
             hub_remove(proc.sentinel)
             hub_remove(proc.outqR_fd)