Explorar el Código

LaxBoundedSemaphore: Does not care if released too many times. Issue #268

Ask Solem hace 14 años
padre
commit
f162dd2d63
Se han modificado 1 ficheros con 24 adiciones y 17 borrados
  1. 24 17
      celery/concurrency/processes/pool.py

+ 24 - 17
celery/concurrency/processes/pool.py

@@ -27,6 +27,8 @@ from multiprocessing.util import Finalize, debug
 from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
 from celery.exceptions import WorkerLostError
 
+_Semaphore = threading._Semaphore
+
 #
 # Constants representing the state of a pool
 #
@@ -55,6 +57,23 @@ job_counter = itertools.count()
 def mapstar(args):
     return map(*args)
 
+
+class LaxBoundedSemaphore(threading._Semaphore):
+    """Semaphore that checks that # release is <= # acquires,
+    but ignores if # releases >= value."""
+
+    def __init__(self, value=1, verbose=None):
+        _Semaphore.__init__(self, value, verbose)
+        self._initial_value = value
+
+    def release(self):
+        if self._Semaphore__value < self._initial_value:
+            return _Semaphore.release(self)
+        if __debug__:
+            self._note("%s.release: success, value=%s (unchanged)" % (
+                self, self._Semaphore__value))
+
+
 #
 # Code run by worker processes
 #
@@ -277,10 +296,7 @@ class TimeoutHandler(PoolThread):
             job._set(i, (False, TimeLimitExceeded()))
             # release sem
             if putlock is not None:
-                try:
-                    putlock.release()
-                except Exception:
-                    pass
+                putlock.release()
             # Remove from _pool
             process, _index = _process_by_pid(job._worker_pid)
             # Run timeout callback
@@ -344,10 +360,7 @@ class ResultHandler(PoolThread):
                 return
             if not item.ready():
                 if putlock is not None:
-                    try:
-                        putlock.release()
-                    except Exception:
-                        pass
+                    putlock.release()
             try:
                 item._set(i, obj)
             except KeyError:
@@ -384,10 +397,7 @@ class ResultHandler(PoolThread):
 
         # Notify waiting threads
         if putlock is not None:
-            try:
-                putlock.release()
-            except Exception:
-                pass
+            putlock.release()
 
         while cache and self._state != TERMINATE:
             try:
@@ -465,7 +475,7 @@ class Pool(object):
         self._worker_handler = self.Supervisor(self)
         self._worker_handler.start()
 
-        self._putlock = threading.BoundedSemaphore(self._processes)
+        self._putlock = LaxBoundedSemaphore(self._processes)
         self._task_handler = self.TaskHandler(self._taskqueue,
                                               self._quick_put,
                                               self._outqueue,
@@ -530,10 +540,7 @@ class Pool(object):
                 for worker_pid in job.worker_pids():
                     if worker_pid in cleaned and not job.ready():
                         if self._putlock is not None:
-                            try:
-                                self._putlock.release()
-                            except Exception:
-                                pass
+                            self._putlock.release()
                         err = WorkerLostError("Worker exited prematurely.")
                         job._set(None, (False, err))
                         continue