Browse Source

Revert pool changes as they may not be fully stable yet.

Ask Solem 15 years ago
parent
commit
36205359a8
1 changed files with 15 additions and 61 deletions
  1. 15 61
      celery/concurrency/processes/pool.py

+ 15 - 61
celery/concurrency/processes/pool.py

@@ -25,7 +25,6 @@ from multiprocessing import Process, cpu_count, TimeoutError
 from multiprocessing.util import Finalize, debug
 from multiprocessing.util import Finalize, debug
 
 
 from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
 from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
-from celery.exceptions import WorkerLostError
 
 
 #
 #
 # Constants representing the state of a pool
 # Constants representing the state of a pool
@@ -107,13 +106,6 @@ def worker(inqueue, outqueue, ackqueue, initializer=None, initargs=(),
             result = (True, func(*args, **kwds))
             result = (True, func(*args, **kwds))
         except Exception, e:
         except Exception, e:
             result = (False, e)
             result = (False, e)
-        except BaseException, e:
-            # Job raised SystemExit or equivalent, so tell the result
-            # handler and exit the process so it can be replaced.
-            err = WorkerLostError(
-                    "Worker has terminated by user request: %r" % (e, ))
-            put((job, i, (False, err)))
-            raise
         try:
         try:
             put((job, i, result))
             put((job, i, result))
         except Exception, exc:
         except Exception, exc:
@@ -227,7 +219,6 @@ class AckHandler(PoolThread):
             except (IOError, EOFError), exc:
             except (IOError, EOFError), exc:
                 debug('ack handler got %s -- exiting',
                 debug('ack handler got %s -- exiting',
                         exc.__class__.__name__)
                         exc.__class__.__name__)
-                return
 
 
             if self._state:
             if self._state:
                 assert self._state == TERMINATE
                 assert self._state == TERMINATE
@@ -241,7 +232,7 @@ class AckHandler(PoolThread):
             job, i, time_accepted, pid = task
             job, i, time_accepted, pid = task
             try:
             try:
                 cache[job]._ack(i, time_accepted, pid)
                 cache[job]._ack(i, time_accepted, pid)
-            except (KeyError, AttributeError):
+            except (KeyError, AttributeError), exc:
                 # Object gone, or doesn't support _ack (e.g. IMapIterator)
                 # Object gone, or doesn't support _ack (e.g. IMapIterator)
                 pass
                 pass
 
 
@@ -254,13 +245,13 @@ class AckHandler(PoolThread):
                 return
                 return
 
 
             if task is None:
             if task is None:
-                debug('ack handler ignoring extra sentinel')
+                debug('result handler ignoring extra sentinel')
                 continue
                 continue
 
 
             job, i, time_accepted, pid = task
             job, i, time_accepted, pid = task
             try:
             try:
                 cache[job]._ack(i, time_accepted, pid)
                 cache[job]._ack(i, time_accepted, pid)
-            except (KeyError, AttributeError):
+            except KeyError:
                 pass
                 pass
 
 
         debug('ack handler exiting: len(cache)=%s, thread._state=%s',
         debug('ack handler exiting: len(cache)=%s, thread._state=%s',
@@ -357,13 +348,11 @@ class TimeoutHandler(PoolThread):
 
 
 class ResultHandler(PoolThread):
 class ResultHandler(PoolThread):
 
 
-    def __init__(self, outqueue, get, cache, putlock, poll, workers_gone):
+    def __init__(self, outqueue, get, cache, putlock):
         self.outqueue = outqueue
         self.outqueue = outqueue
         self.get = get
         self.get = get
         self.cache = cache
         self.cache = cache
         self.putlock = putlock
         self.putlock = putlock
-        self.poll = poll
-        self.workers_gone = workers_gone
         super(ResultHandler, self).__init__()
         super(ResultHandler, self).__init__()
 
 
     def run(self):
     def run(self):
@@ -371,8 +360,6 @@ class ResultHandler(PoolThread):
         outqueue = self.outqueue
         outqueue = self.outqueue
         cache = self.cache
         cache = self.cache
         putlock = self.putlock
         putlock = self.putlock
-        poll = self.poll
-        workers_gone = self.workers_gone
 
 
         debug('result handler starting')
         debug('result handler starting')
         while 1:
         while 1:
@@ -412,30 +399,20 @@ class ResultHandler(PoolThread):
 
 
         while cache and self._state != TERMINATE:
         while cache and self._state != TERMINATE:
             try:
             try:
-                ready, task = poll(0.2)
+                task = get()
             except (IOError, EOFError), exc:
             except (IOError, EOFError), exc:
                 debug('result handler got %s -- exiting',
                 debug('result handler got %s -- exiting',
                         exc.__class__.__name__)
                         exc.__class__.__name__)
                 return
                 return
 
 
-            if ready:
-                if task is None:
-                    debug('result handler ignoring extra sentinel')
-                    continue
-
-                job, i, obj = task
-                try:
-                    cache[job]._set(i, obj)
-                except KeyError:
-                    pass
-            else:
-                if workers_gone():
-                    debug("%s active job(s), but no active workers! "
-                          "Terminating..." % (len(cache), ))
-                    err = WorkerLostError(
-                            "The worker processing this job has terminated.")
-                    for job in cache.values():
-                        job._set(None, (False, err))
+            if task is None:
+                debug('result handler ignoring extra sentinel')
+                continue
+            job, i, obj = task
+            try:
+                cache[job]._set(i, obj)
+            except KeyError:
+                pass
 
 
         if hasattr(outqueue, '_reader'):
         if hasattr(outqueue, '_reader'):
             debug('ensuring that outqueue is not full')
             debug('ensuring that outqueue is not full')
@@ -522,8 +499,7 @@ class Pool(object):
         # Thread processing results in the outqueue.
         # Thread processing results in the outqueue.
         self._result_handler = self.ResultHandler(self._outqueue,
         self._result_handler = self.ResultHandler(self._outqueue,
                                         self._quick_get, self._cache,
                                         self._quick_get, self._cache,
-                                        self._putlock, self._poll_result,
-                                        self._workers_gone)
+                                        self._putlock)
         self._result_handler.start()
         self._result_handler.start()
 
 
         self._terminate = Finalize(
         self._terminate = Finalize(
@@ -549,10 +525,6 @@ class Pool(object):
         w.start()
         w.start()
         return w
         return w
 
 
-    def _workers_gone(self):
-        self._join_exited_workers()
-        return not self._pool
-
     def _join_exited_workers(self):
     def _join_exited_workers(self):
         """Cleanup after any worker processes which have exited due to
         """Cleanup after any worker processes which have exited due to
         reaching their specified lifetime. Returns True if any workers were
         reaching their specified lifetime. Returns True if any workers were
@@ -596,13 +568,6 @@ class Pool(object):
         self._quick_get = self._outqueue._reader.recv
         self._quick_get = self._outqueue._reader.recv
         self._quick_get_ack = self._ackqueue._reader.recv
         self._quick_get_ack = self._ackqueue._reader.recv
 
 
-        def _poll_result(timeout):
-            if self._outqueue._reader.poll(timeout):
-                return True, self._quick_get()
-            return False, None
-
-        self._poll_result = _poll_result
-
     def apply(self, func, args=(), kwds={}):
     def apply(self, func, args=(), kwds={}):
         '''
         '''
         Equivalent of `apply()` builtin
         Equivalent of `apply()` builtin
@@ -719,11 +684,8 @@ class Pool(object):
     def close(self):
     def close(self):
         debug('closing pool')
         debug('closing pool')
         if self._state == RUN:
         if self._state == RUN:
-            # Worker handler can't run while the result
-            # handler does its second pass, so wait for it to finish.
-            self._worker_handler.close()
-            self._worker_handler.join()
             self._state = CLOSE
             self._state = CLOSE
+            self._worker_handler.close()
             self._taskqueue.put(None)
             self._taskqueue.put(None)
 
 
     def terminate(self):
     def terminate(self):
@@ -1032,14 +994,6 @@ class ThreadPool(Pool):
         self._quick_get = self._outqueue.get
         self._quick_get = self._outqueue.get
         self._quick_get_ack = self._ackqueue.get
         self._quick_get_ack = self._ackqueue.get
 
 
-        def _poll_result(timeout):
-            try:
-                return True, self._quick_get(timeout=timeout)
-            except Queue.Empty:
-                return False, None
-
-        self._poll_result = _poll_result
-
     @staticmethod
     @staticmethod
     def _help_stuff_finish(inqueue, task_handler, size):
     def _help_stuff_finish(inqueue, task_handler, size):
         # put sentinels at head of inqueue to make workers finish
         # put sentinels at head of inqueue to make workers finish