Parcourir la source

AsynPool: Make sure late terminated task is acked

Ask Solem il y a 11 ans
Parent
commit
df04a39e6a
1 fichiers modifiés avec 15 ajouts et 5 suppressions
  1. 15 5
      celery/concurrency/asynpool.py

+ 15 - 5
celery/concurrency/asynpool.py

@@ -250,10 +250,14 @@ class ResultHandler(_pool.ResultHandler):
                     # which means its outqueue has already been processed
                     # by the worker lost handler.
                     outqueues.discard(fd)
-                    break
+                    continue
 
                 reader = proc.outq._reader
-                setblocking(reader, 1)
+                try:
+                    setblocking(reader, 1)
+                except (OSError, IOError):
+                    outqueues.discard(fd)
+                    continue
                 try:
                     if reader.poll(0):
                         task = reader.recv()
@@ -262,12 +266,15 @@ class ResultHandler(_pool.ResultHandler):
                         sleep(0.5)
                 except (IOError, EOFError):
                     outqueues.discard(fd)
-                    break
+                    continue
                 else:
                     if task:
                         on_state_change(task)
                 finally:
-                    setblocking(reader, 0)
+                    try:
+                        setblocking(reader, 0)
+                    except (OSError, IOError):
+                        outqueues.discard(fd)
 
                 try:
                     join_exited_workers(shutdown=True)
@@ -538,14 +545,17 @@ class AsynPool(_pool.Pool):
         write_stats = self.write_stats
         is_fair_strategy = self.sched_strategy == SCHED_STRATEGY_FAIR
         revoked_tasks = worker_state.revoked
+        getpid = os.getpid
 
         precalc = {ACK: self._create_payload(ACK, (0, )),
                    NACK: self._create_payload(NACK, (0, ))}
 
-        def _put_back(job):
+        def _put_back(job, _time=time.time):
             # puts back at the end of the queue
             if job._terminated is not None or \
                     job.correlation_id in revoked_tasks:
+                if not job._accepted:
+                    job._ack(None, _time(), getpid(), None)
                 job._set_terminated(job._terminated)
             else:
                 # XXX linear lookup, should find a better way,