Explorar el Código

Merge branch 'master' of http://github.com/ask/celery

Gunnlaugur Thor Briem hace 15 años
padre
commit
7336e05f8b
Se han modificado 2 ficheros con 15 adiciones y 4 borrados
  1. 14 3
      celery/concurrency/processes/pool.py
  2. 1 1
      setup.py

+ 14 - 3
celery/concurrency/processes/pool.py

@@ -371,7 +371,10 @@ class ResultHandler(PoolThread):
                 return
 
             if putlock is not None:
-                putlock.release()
+                try:
+                    putlock.release()
+                except ValueError:
+                    pass
 
             if self._state:
                 assert self._state == TERMINATE
@@ -389,7 +392,10 @@ class ResultHandler(PoolThread):
                 pass
 
         if putlock is not None:
-            putlock.release()
+            try:
+                putlock.release()
+            except ValueError:
+                pass
 
         while cache and self._state != TERMINATE:
             try:
@@ -470,7 +476,7 @@ class Pool(object):
         self._worker_handler = self.Supervisor(self)
         self._worker_handler.start()
 
-        self._putlock = threading.Semaphore(self._processes)
+        self._putlock = threading.BoundedSemaphore(self._processes)
 
         self._task_handler = self.TaskHandler(self._taskqueue, self._quick_put,
                                          self._outqueue, self._pool)
@@ -529,6 +535,11 @@ class Pool(object):
             if worker.exitcode is not None:
                 # worker exited
                 debug('cleaning up worker %d' % i)
+                if self._putlock is not None:
+                    try:
+                        self._putlock.release()
+                    except ValueError:
+                        pass
                 worker.join()
                 del self._pool[i]
         return len(self._pool) < self._processes

+ 1 - 1
setup.py

@@ -68,7 +68,7 @@ setup(
              "bin/celeryev"],
     zip_safe=False,
     install_requires=install_requires,
-    tests_require=['nose-cover3', 'unittest2', 'simplejson'],
+    tests_require=['nose', 'nose-cover3', 'unittest2', 'simplejson'],
     cmdclass = {"quicktest": QuickRunTests},
     test_suite="nose.collector",
     classifiers=[