Просмотр исходного кода

multiprocessing.Pool: Fixes race condition with WorkerLostError.

The process may have published a result before it was terminated,
but we have no reliable way to detect that this is the case.

So we have to wait for 10 seconds before marking the result with
WorkerLostError.  This gives the result handler a chance to retrieve the
result.  Downside is that it takes ~10 seconds for a job to be marked
with WorkerLostError, but I think it's a compromise we can live with;
at least it decreases the chance of having false negatives.

Closes #268.  Thanks to dialtone.
Ask Solem 14 лет назад
Родитель
Commit
71c2f5499a
1 изменённый файл: 44 добавлено и 22 удалено
  1. 44 22
      celery/concurrency/processes/pool.py

+ 44 - 22
celery/concurrency/processes/pool.py

@@ -200,7 +200,7 @@ class Supervisor(PoolThread):
         debug('worker handler starting')
         debug('worker handler starting')
         while self._state == RUN and self.pool._state == RUN:
         while self._state == RUN and self.pool._state == RUN:
             self.pool._maintain_pool()
             self.pool._maintain_pool()
-            time.sleep(0.1)
+            time.sleep(0.8)
         debug('worker handler exiting')
         debug('worker handler exiting')
 
 
 
 
@@ -536,11 +536,22 @@ class Pool(object):
         w.start()
         w.start()
         return w
         return w
 
 
-    def _join_exited_workers(self):
+    def _join_exited_workers(self, lost_worker_timeout=10.0):
         """Cleanup after any worker processes which have exited due to
         """Cleanup after any worker processes which have exited due to
         reaching their specified lifetime. Returns True if any workers were
         reaching their specified lifetime. Returns True if any workers were
         cleaned up.
         cleaned up.
         """
         """
+        now = None
+        # The worker may have published a result before being terminated,
+        # but we have no way to accurately tell if it did.  So we wait for
+        # 10 seconds before we mark the job with WorkerLostError.
+        for job in [job for job in self._cache.values()
+                if not job.ready() and job._worker_lost]:
+            now = now or time.time()
+            if now - job._worker_lost > lost_worker_timeout:
+                err = WorkerLostError("Worker exited prematurely.")
+                job._set(None, (False, err))
+
         cleaned = []
         cleaned = []
         for i in reversed(range(len(self._pool))):
         for i in reversed(range(len(self._pool))):
             worker = self._pool[i]
             worker = self._pool[i]
@@ -556,8 +567,7 @@ class Pool(object):
                     if worker_pid in cleaned and not job.ready():
                     if worker_pid in cleaned and not job.ready():
                         if self._putlock is not None:
                         if self._putlock is not None:
                             self._putlock.release()
                             self._putlock.release()
-                        err = WorkerLostError("Worker exited prematurely.")
-                        job._set(None, (False, err))
+                        job._worker_lost = time.time()
                         continue
                         continue
             return True
             return True
         return False
         return False
@@ -832,9 +842,11 @@ DynamicPool = Pool
 
 
 
 
 class ApplyResult(object):
 class ApplyResult(object):
+    _worker_lost = None
 
 
     def __init__(self, cache, callback, accept_callback=None,
     def __init__(self, cache, callback, accept_callback=None,
             timeout_callback=None, error_callback=None):
             timeout_callback=None, error_callback=None):
+        self._mutex = threading.Lock()
         self._cond = threading.Condition(threading.Lock())
         self._cond = threading.Condition(threading.Lock())
         self._job = job_counter.next()
         self._job = job_counter.next()
         self._cache = cache
         self._cache = cache
@@ -880,28 +892,38 @@ class ApplyResult(object):
             raise self._value
             raise self._value
 
 
     def _set(self, i, obj):
     def _set(self, i, obj):
-        self._success, self._value = obj
-        if self._callback and self._success:
-            self._callback(self._value)
-        if self._errback and not self._success:
-            self._errback(self._value)
-        self._cond.acquire()
+        self._mutex.acquire()
         try:
         try:
-            self._ready = True
-            self._cond.notify()
+            self._success, self._value = obj
+            self._cond.acquire()
+            try:
+                self._ready = True
+                self._cond.notify()
+            finally:
+                self._cond.release()
+            if self._accepted:
+                self._cache.pop(self._job, None)
+
+            # apply callbacks last
+            if self._callback and self._success:
+                self._callback(self._value)
+            if self._errback and not self._success:
+                self._errback(self._value)
         finally:
         finally:
-            self._cond.release()
-        if self._accepted:
-            self._cache.pop(self._job, None)
+            self._mutex.release()
 
 
     def _ack(self, i, time_accepted, pid):
     def _ack(self, i, time_accepted, pid):
-        self._accepted = True
-        self._time_accepted = time_accepted
-        self._worker_pid = pid
-        if self._accept_callback:
-            self._accept_callback(pid, time_accepted)
-        if self._ready:
-            self._cache.pop(self._job, None)
+        self._mutex.acquire()
+        try:
+            self._accepted = True
+            self._time_accepted = time_accepted
+            self._worker_pid = pid
+            if self._ready:
+                self._cache.pop(self._job, None)
+            if self._accept_callback:
+                self._accept_callback(pid, time_accepted)
+        finally:
+            self._mutex.release()
 
 
 #
 #
 # Class whose instances are returned by `Pool.map_async()`
 # Class whose instances are returned by `Pool.map_async()`