瀏覽代碼

pool: Only release semaphore if result not ready

Ask Solem 14 年之前
父節點
當前提交
50f27af8f8
共有 1 個文件被更改,包括 13 次插入13 次删除
  1. 13 13
      celery/concurrency/processes/pool.py

+ 13 - 13
celery/concurrency/processes/pool.py

@@ -328,11 +328,12 @@ class ResultHandler(PoolThread):
                 pass
 
         def on_ready(job, i, obj):
-            if putlock is not None:
-                try:
-                    putlock.release()
-                except Exception:
-                    pass
+            if not job.ready():
+                if putlock is not None:
+                    try:
+                        putlock.release()
+                    except Exception:
+                        paSs
             try:
                 cache[job]._set(i, obj)
             except KeyError:
@@ -450,8 +451,7 @@ class Pool(object):
         self._worker_handler = self.Supervisor(self)
         self._worker_handler.start()
 
-        self._putlock = threading.BoundedSemaphore(self._processes)
-
+        self._putlock = threading.BoundedSemaphore(self._processes, verbose=True)
         self._task_handler = self.TaskHandler(self._taskqueue,
                                               self._quick_put,
                                               self._outqueue,
@@ -508,18 +508,18 @@ class Pool(object):
             if worker.exitcode is not None:
                 # worker exited
                 debug('cleaning up worker %d' % i)
-                if self._putlock is not None:
-                    try:
-                        self._putlock.release()
-                    except ValueError:
-                        pass
                 worker.join()
                 cleaned.append(worker.pid)
                 del self._pool[i]
         if cleaned:
             for job in self._cache.values():
                 for worker_pid in job.worker_pids():
-                    if worker_pid in cleaned:
+                    if worker_pid in cleaned and not job.ready():
+                        if self._putlock is not None:
+                        try:
+                            self._putlock.release()
+                        except Exception:
+                            pass
                         err = WorkerLostError("Worker exited prematurely.")
                         job._set(None, (False, err))
                         continue