Ask Solem 12 years ago
parent
commit
7a5e341c73
3 changed files with 7 additions and 18 deletions
  1. 5 16
      celery/concurrency/processes.py
  2. 0 1
      celery/loaders/base.py
  3. 2 1
      funtests/stress/stress.py

+ 5 - 16
celery/concurrency/processes.py

@@ -57,6 +57,7 @@ MAXTASKS_NO_BILLIARD = """\
     This may lead to a deadlock, please install the billiard C extension.
 """
 
+#: Constant sent by child process when started (ready to accept work)
 WORKER_UP = 15
 
 logger = get_logger(__name__)
@@ -64,7 +65,7 @@ warning, debug = logger.warning, logger.debug
 
 
 def process_initializer(app, hostname):
-    """Initializes the process so it can be used to process tasks."""
+    """Pool child process initializer."""
     platforms.signals.reset(*WORKER_SIGRESET)
     platforms.signals.ignore(*WORKER_SIGIGNORE)
     platforms.set_mp_process_title('celeryd', hostname=hostname)
@@ -198,12 +199,13 @@ class ResultHandler(_pool.ResultHandler):
         while cache and outqueues and self._state != TERMINATE:
             if check_timeouts is not None:
                 check_timeouts()
+            _dirty = set()
             for fd in outqueues:
                 try:
                     proc = fileno_to_outq[fd]
                 except KeyError:
                     outqueues.discard(fd)
-                    continue
+                    break
 
                 reader = proc.outq._reader
                 try:
@@ -214,7 +216,7 @@ class ResultHandler(_pool.ResultHandler):
                         sleep(0.5)
                 except (IOError, EOFError):
                     outqueues.discard(fd)
-                    continue
+                    break
                 else:
                     if task:
                         on_state_change(task)
@@ -278,22 +280,9 @@ class AsynPool(_pool.Pool):
 
     @staticmethod
     def _stop_task_handler(task_handler):
-        fileno_to_inq = dict(
-            (w.inqW_fd, w.inq) for w in task_handler.pool
-        )
         for proc in task_handler.pool:
             proc.inq._writer.setblocking(1)
             proc.inq.put(None)
-        #inqueues = set(fileno_to_inq)
-        #while inqueues:
-        #    _, writable, again = _select(inqueues, timeout=0.5)
-        #    print('INQUEUES: %r WRIT: %r' % (inqueues, writable))
-        #    if again:
-        #        continue
-        #    for fd in writable:
-        #        fileno_to_inq[fd].inq.put(None)
-        #        inqueues.discard(fd)
-        #    sleep(0)
 
     def create_result_handler(self):
         return super(AsynPool, self).create_result_handler(

+ 0 - 1
celery/loaders/base.py

@@ -21,7 +21,6 @@ from kombu.utils import cached_property
 from kombu.utils.encoding import safe_str
 
 from celery.datastructures import DictAttribute
-from celery.exceptions import ImproperlyConfigured
 from celery.five import reraise, string_t
 from celery.utils.functional import maybe_list
 from celery.utils.imports import (

+ 2 - 1
funtests/stress/stress.py

@@ -99,7 +99,8 @@ class Stresstests(object):
                 raise
             finally:
                 print('{0} {1} iterations in {2}s'.format(
-                    'failed after' if failed else 'completed', i, time() - t
+                    'failed after' if failed else 'completed',
+                    i + 1, time() - t,
                 ))
 
     def termbysig(self):