Browse Source

Release put semaphore if supervisor finds dead process.

Ask Solem 14 years ago
parent
commit
a735fc022a
1 changed files with 14 additions and 3 deletions
  1. 14 3
      celery/concurrency/processes/pool.py

+ 14 - 3
celery/concurrency/processes/pool.py

@@ -371,7 +371,10 @@ class ResultHandler(PoolThread):
                 return
 
             if putlock is not None:
-                putlock.release()
+                try:
+                    putlock.release()
+                except ValueError:
+                    pass
 
             if self._state:
                 assert self._state == TERMINATE
@@ -389,7 +392,10 @@ class ResultHandler(PoolThread):
                 pass
 
         if putlock is not None:
-            putlock.release()
+            try:
+                putlock.release()
+            except ValueError:
+                pass
 
         while cache and self._state != TERMINATE:
             try:
@@ -470,7 +476,7 @@ class Pool(object):
         self._worker_handler = self.Supervisor(self)
         self._worker_handler.start()
 
-        self._putlock = threading.Semaphore(self._processes)
+        self._putlock = threading.BoundedSemaphore(self._processes)
 
         self._task_handler = self.TaskHandler(self._taskqueue, self._quick_put,
                                          self._outqueue, self._pool)
@@ -529,6 +535,11 @@ class Pool(object):
             if worker.exitcode is not None:
                 # worker exited
                 debug('cleaning up worker %d' % i)
+                if self._putlock is not None:
+                    try:
+                        self._putlock.release()
+                    except ValueError:
+                        pass
                 worker.join()
                 del self._pool[i]
         return len(self._pool) < self._processes