Prechádzať zdrojové kódy

Previous commit was totally borked (tested on wrong install :/)

Ask Solem 14 rokov pred
rodič
commit
29e5410ad5
1 zmenil súbory, kde vykonal 25 pridanie a 11 odobranie
  1. 25 11
      celery/concurrency/processes/pool.py

+ 25 - 11
celery/concurrency/processes/pool.py

@@ -222,16 +222,18 @@ class TaskHandler(PoolThread):
 
 class TimeoutHandler(PoolThread):
 
-    def __init__(self, processes, cache, t_soft, t_hard):
+    def __init__(self, processes, cache, t_soft, t_hard, putlock):
         self.processes = processes
         self.cache = cache
         self.t_soft = t_soft
         self.t_hard = t_hard
+        self.putlock = putlock
         super(TimeoutHandler, self).__init__()
 
     def run(self):
         processes = self.processes
         cache = self.cache
+        putlock = self.putlock
         t_hard, t_soft = self.t_hard, self.t_soft
         dirty = set()
 
@@ -268,11 +270,19 @@ class TimeoutHandler(PoolThread):
             dirty.add(i)
 
         def _on_hard_timeout(job, i):
+            if job.ready():
+                return
             debug('hard time limit exceeded for %i', i)
-            # Remove from _pool
-            process, _index = _process_by_pid(job._worker_pid)
             # Remove from cache and set return value to an exception
             job._set(i, (False, TimeLimitExceeded()))
+            # release sem
+            if putlock is not None:
+                try:
+                    putlock.release()
+                except Exception:
+                    pass
+            # Remove from _pool
+            process, _index = _process_by_pid(job._worker_pid)
             # Run timeout callback
             if job._timeout_callback is not None:
                 job._timeout_callback(soft=False)
@@ -328,14 +338,18 @@ class ResultHandler(PoolThread):
                 pass
 
         def on_ready(job, i, obj):
-            if not job.ready():
+            try:
+                item = cache[job]
+            except KeyError:
+                return
+            if not item.ready():
                 if putlock is not None:
                     try:
                         putlock.release()
                     except Exception:
-                        paSs
+                        pass
             try:
-                cache[job]._set(i, obj)
+                item._set(i, obj)
             except KeyError:
                 pass
 
@@ -462,7 +476,7 @@ class Pool(object):
         if self.timeout or self.soft_timeout:
             self._timeout_handler = self.TimeoutHandler(
                     self._pool, self._cache,
-                    self.soft_timeout, self.timeout)
+                    self.soft_timeout, self.timeout, self._putlock)
             self._timeout_handler.start()
         else:
             self._timeout_handler = None
@@ -516,10 +530,10 @@ class Pool(object):
                 for worker_pid in job.worker_pids():
                     if worker_pid in cleaned and not job.ready():
                         if self._putlock is not None:
-                        try:
-                            self._putlock.release()
-                        except Exception:
-                            pass
+                            try:
+                                self._putlock.release()
+                            except Exception:
+                                pass
                         err = WorkerLostError("Worker exited prematurely.")
                         job._set(None, (False, err))
                         continue