Ask Solem 11 years ago
parent
commit
06f3edadb5
4 changed files with 28 additions and 32 deletions
  1. celery/concurrency/asynpool.py  + 28 - 24
  2. celery/concurrency/base.py  + 0 - 3
  3. celery/concurrency/prefork.py  + 0 - 2
  4. celery/tests/concurrency/test_concurrency.py  + 0 - 3

+ 28 - 24
celery/concurrency/asynpool.py

@@ -37,6 +37,7 @@ from billiard.pool import (
     WorkersJoined, CoroStop,
 )
 from billiard import pool as _pool
+from billiard.einfo import ExceptionInfo
 from billiard.queues import _SimpleQueue
 from kombu.async import READ, WRITE, ERR
 from kombu.serialization import pickle as _pickle
@@ -45,6 +46,7 @@ from kombu.utils.compat import get_errno
 from kombu.utils.eventio import SELECT_BAD_FD
 from celery.five import Counter, items, values
 from celery.utils.log import get_logger
+from celery.utils.text import truncate
 
 logger = get_logger(__name__)
 error, debug = logger.error, logger.debug
@@ -134,6 +136,10 @@ class Worker(_pool.Worker):
         # is writable.
         self.outq.put((WORKER_UP, (pid, )))
 
+    def prepare_result(self, result):
+        if not isinstance(result, ExceptionInfo):
+            return truncate(repr(result), 64)
+
 
 class ResultHandler(_pool.ResultHandler):
     """Handles messages from the pool processes."""
@@ -145,51 +151,45 @@ class ResultHandler(_pool.ResultHandler):
         # add our custom message handler
         self.state_handlers[WORKER_UP] = self.on_process_alive
 
-    def _process_result(self):
+    def _make_process_result(self, hub):
         """Coroutine that reads messages from the pool processes
         and calls the appropriate handler."""
         fileno_to_outq = self.fileno_to_outq
         on_state_change = self.on_state_change
+        hub_remove = hub.remove
 
-        while 1:
-            fileno = (yield)
+        def on_readable(fileno):
             try:
                 proc = fileno_to_outq[fileno]
             except KeyError:
+                hub_remove(fileno)
                 # process gone
-                continue
+                return
             reader = proc.outq._reader
 
             try:
-                if reader.poll(0):
-                    ready, message = True, reader.recv()
-                else:
-                    ready, message = False, None
+                message = reader.recv()
             except (IOError, EOFError) as exc:
                 debug('result handler got %r -- exiting', exc)
-                raise CoroStop()
+                hub_remove(fileno)
+                return
 
             if self._state:
                 assert self._state == TERMINATE
                 debug('result handler found thread._state==TERMINATE')
-                raise CoroStop()
+                return
 
-            if ready:
-                if message is None:
-                    debug('result handler got sentinel -- exiting')
-                    raise CoroStop()
-                on_state_change(message)
+            if message is None:
+                debug('result handler got sentinel -- exiting')
+                return
+            on_state_change(message)
+        return on_readable
+
+    def register_with_event_loop(self, hub):
+        self.handle_event = self._make_process_result(hub)
 
     def handle_event(self, fileno):
-        if self._state == RUN:
-            it = self._it
-            if it is None:
-                it = self._it = self._process_result()
-                next(it)
-            try:
-                it.send(fileno)
-            except (StopIteration, CoroStop):
-                self._it = None
+        raise RuntimeError('Not registered with event loop')
 
     def on_stop_not_started(self):
         """This method is always used to stop when the helper thread is not
@@ -298,6 +298,8 @@ class AsynPool(_pool.Pool):
 
     def register_with_event_loop(self, hub):
         """Registers the async pool with the current event loop."""
+        self._result_handler.register_with_event_loop(hub)
+        self.handle_result_event = self._result_handler.handle_event
         self._create_timelimit_handlers(hub)
         self._create_process_handlers(hub)
         self._create_write_handlers(hub)
@@ -510,6 +512,8 @@ class AsynPool(_pool.Pool):
             # the same fd receive every task if the pipe read buffer is not
             # full.
             if outbound:
+                #print('ALL: %r ACTIVE: %r' % (len(all_inqueues),
+                #                              len(active_writes)))
                 inactive = diff(active_writes)
                 [hub_add(fd, None, WRITE | ERR, consolidate=True)
                  for fd in inactive]
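
The ResultHandler hunks above replace the pull-style coroutine, which had to poll(0) the result pipe every time the pool pumped it through handle_event, with a per-fd callback that the event loop invokes only when a worker's outqueue is actually readable, so reader.recv() is only called when data is waiting. Below is a minimal, self-contained sketch of that pattern; ToyHub and ToyResultHandler are invented stand-ins (the real code registers with kombu's async hub and reads from billiard pipes), not Celery's classes.

class ToyHub(object):
    """Maps file descriptors to read callbacks, like kombu's async hub."""

    def __init__(self):
        self.readers = {}

    def add_reader(self, fd, callback):
        self.readers[fd] = callback

    def remove(self, fd):
        self.readers.pop(fd, None)


class ToyResultHandler(object):

    def __init__(self, fileno_to_outq, on_state_change):
        self.fileno_to_outq = fileno_to_outq    # fd -> list of pending messages
        self.on_state_change = on_state_change

    def _make_process_result(self, hub):
        """Build the callback the event loop will call for readable fds."""
        fileno_to_outq = self.fileno_to_outq
        on_state_change = self.on_state_change
        hub_remove = hub.remove

        def on_readable(fileno):
            try:
                messages = fileno_to_outq[fileno]
            except KeyError:                    # process already gone
                hub_remove(fileno)
                return
            message = messages.pop(0)           # toy stand-in for reader.recv()
            if message is None:                 # sentinel: stop watching this fd
                hub_remove(fileno)
                return
            on_state_change(message)

        return on_readable

    def register_with_event_loop(self, hub):
        # After this, the hub owns the read loop; there is no polling left.
        self.handle_event = self._make_process_result(hub)


if __name__ == '__main__':
    seen, hub = [], ToyHub()
    handler = ToyResultHandler({42: [('TASK_DONE', 'ok'), None]}, seen.append)
    handler.register_with_event_loop(hub)
    hub.add_reader(42, handler.handle_event)
    hub.readers[42](42)     # fd 42 readable -> state change delivered
    hub.readers[42](42)     # sentinel -> fd unregistered from the hub
    assert seen == [('TASK_DONE', 'ok')] and 42 not in hub.readers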

+ 0 - 3
celery/concurrency/base.py

@@ -102,9 +102,6 @@ class BasePool(object):
     def on_hard_timeout(self, job):
         pass
 
-    def maybe_handle_result(self, *args):
-        pass
-
     def maintain_pool(self, *args, **kwargs):
         pass
 

+ 0 - 2
celery/concurrency/prefork.py

@@ -130,8 +130,6 @@ class TaskPool(BasePool):
         self.shrink = P.shrink
         self.flush = P.flush
         self.restart = P.restart
-        self.maybe_handle_result = P._result_handler.handle_event
-        self.handle_result_event = P.handle_result_event
 
     def did_start_ok(self):
         return self._pool.did_start_ok()
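
The two deleted assignments follow from the asynpool change above: AsynPool.register_with_event_loop() now installs handle_result_event on the pool itself when the hub is attached, and the maybe_handle_result polling hook is gone, so the prefork TaskPool facade has nothing left to copy at start-up. A toy sketch of the resulting flow (invented Toy* names, not Celery's real API):

class ToyAsynPool(object):
    def __init__(self, result_callback):
        self._result_callback = result_callback   # e.g. the result handler's callback

    def register_with_event_loop(self, hub):
        # Mirrors the asynpool hunk: only now does the pool gain the callback,
        # and the hub (a plain fd -> callback dict here) starts driving it.
        self.handle_result_event = self._result_callback
        hub[7] = self.handle_result_event          # 7: pretend worker outq fd


class ToyTaskPool(object):
    """The prefork facade simply forwards registration to the inner pool."""

    def __init__(self, pool):
        self._pool = pool

    def register_with_event_loop(self, hub):
        return self._pool.register_with_event_loop(hub)


events = []
hub = {}                                           # fd -> callback
taskpool = ToyTaskPool(ToyAsynPool(events.append))
taskpool.register_with_event_loop(hub)
hub[7](7)                  # the event loop, not TaskPool, handles results now
assert events == [7]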

+ 0 - 3
celery/tests/concurrency/test_concurrency.py

@@ -101,9 +101,6 @@ class test_BasePool(AppCase):
     def test_interface_on_hard_timeout(self):
         self.assertIsNone(BasePool(10).on_hard_timeout(Mock()))
 
-    def test_interface_maybe_handle_result(self):
-        self.assertIsNone(BasePool(10).maybe_handle_result(1, 2))
-
     def test_interface_close(self):
         p = BasePool(10)
         p.on_close = Mock()