Browse source

Fixes race conditions

Ask Solem, 12 years ago
Parent
commit
2b30a673f2
4 files changed, 41 additions and 20 deletions
  1. 26 11
      celery/concurrency/processes.py
  2. 2 2
      celery/worker/hub.py
  3. 9 6
      celery/worker/loops.py
  4. 4 1
      funtests/stress/stress.py

+ 26 - 11
celery/concurrency/processes.py

@@ -275,9 +275,11 @@ class AsynPool(_pool.Pool):
         self._fileno_to_synq[proc.synqW_fd] = proc
         self._all_inqueues.add(proc.inqW_fd)
 
-    def on_job_process_down(self, job):
+    def on_job_process_down(self, job, pid_gone):
         if job._write_to:
             self.on_partial_read(job, job._write_to)
+        elif job._scheduled_for:
+            self._put_back(job)
 
     def on_job_process_lost(self, job, pid, exitcode):
         self.mark_as_worker_lost(job, exitcode)
@@ -314,12 +316,13 @@ class AsynPool(_pool.Pool):
         self._inqueue = self._outqueue = \
             self._quick_put = self._quick_get = self._poll_result = None
 
-    def on_partial_read(self, job, proc):
+    def process_flush_queues(self, proc):
         resq = proc.outq._reader
-        # empty result queue buffer
-        while resq.poll(0):
-            self.handle_result_event(resq.fileno())
+        if not resq.closed:
+            while resq.poll(0):
+                self.handle_result_event(resq.fileno())
 
+    def on_partial_read(self, job, proc):
         # worker terminated by signal:
         # we cannot reuse the sockets again, because we don't know if
 # the process wrote/read anything from them, and if so we cannot
@@ -333,9 +336,11 @@ class AsynPool(_pool.Pool):
                 if conn:
                     for sock in (conn._reader, conn._writer):
                         if not sock.closed:
-                            os.close(sock.fileno())
-            self._queues[(proc.inq, proc.outq, proc.synq)] = \
-                self._queues[self.create_process_queues()] = None
+                            sock.close()
+                            #os.close(sock.fileno())
+            self._queues.pop((proc.inq, proc.outq, proc.synq))
+            self._queues[self.create_process_queues()] = None
+            self.on_inqueue_close(proc.inqW_fd)
 
     @classmethod
     def _set_result_sentinel(cls, _outqueue, _pool):
@@ -588,6 +593,11 @@ class TaskPool(BasePool):
                     callback()
                 active_writes.discard(fd)
 
+        def on_inqueue_close(fd):
+            active_writes.discard(fd)
+            all_inqueues.discard(fd)
+        self._pool.on_inqueue_close = on_inqueue_close
+
         def schedule_writes(ready_fd, events):
             if ready_fd in active_writes:
                 return
@@ -599,11 +609,16 @@ class TaskPool(BasePool):
             else:
                 if not job._accepted:
                     callback = promise(write_generator_gone)
+                    try:
+                        job._scheduled_for = fileno_to_inq[ready_fd]
+                    except KeyError:
+                        # process gone since scheduled, put back
+                        return put_message(job)
                     cor = _write_job(ready_fd, job, callback=callback)
                     mark_write_gen_as_active(cor)
                     mark_write_fd_as_active(ready_fd)
                     callback.args = (cor, )  # tricky as we need to pass ref
-                    hub_add((ready_fd, ), cor, WRITE)
+                    hub_add((ready_fd, ), cor, WRITE|ERR)
 
         def _create_payload(type_, args):
             body = dumps((type_, args), protocol=protocol)
@@ -621,12 +636,12 @@ class TaskPool(BasePool):
             mark_write_gen_as_active(cor)
             mark_write_fd_as_active(fd)
             callback.args = (cor, )
-            hub_add((fd, ), cor, WRITE)
+            hub_add((fd, ), cor, WRITE|ERR)
         self._pool.send_ack = send_ack
 
         def on_poll_start(hub):
             if outbound:
-                hub_add(diff(active_writes), schedule_writes, hub.WRITE)
+                hub_add(diff(active_writes), schedule_writes, WRITE|ERR)
         self.on_poll_start = on_poll_start
 
         def quick_put(tup):

+ 2 - 2
celery/worker/hub.py

@@ -242,11 +242,11 @@ class Hub(object):
     __exit__ = close
 
     def _repr_readers(self):
-        return ['{0}->{1}'.format(_rcb(cb), repr_flag(READ | ERR))
+        return ['({0}){1}->{2}'.format(fd, _rcb(cb), repr_flag(READ | ERR))
                 for fd, cb in items(self.readers)]
 
     def _repr_writers(self):
-        return ['{0}->{1}'.format(_rcb(cb), repr_flag(WRITE))
+        return ['({0}){1}->{2}'.format(fd, _rcb(cb), repr_flag(WRITE))
                 for fd, cb in items(self.writers)]
 
     def repr_active(self):

+ 9 - 6
celery/worker/loops.py

@@ -87,6 +87,8 @@ def asynloop(obj, connection, consumer, strategies, ns, hub, qos,
             if qos.prev != qos.value:
                 update_qos()
 
+            #print('REG: %s' % (hub.repr_active(), ))
+
             update_readers(conn_poll_start())
             pool_poll_start(hub)
             if readers or writers:
@@ -94,7 +96,7 @@ def asynloop(obj, connection, consumer, strategies, ns, hub, qos,
                 while connection.more_to_read:
                     try:
                         events = poll(poll_timeout)
-                        #print('EVENTS: %r' % (hub.repr_events(events), ))
+                        #print('EVENTS: %s' % (hub.repr_events(events), ))
                     except ValueError:  # Issue 882
                         return
                     if not events:
@@ -109,12 +111,15 @@ def asynloop(obj, connection, consumer, strategies, ns, hub, qos,
                             elif event & ERR:
                                 cb = (readers.get(fileno) or
                                       writers.get(fileno))
-                                if cb is None:
-                                    continue
+                        except (KeyError, Empty):
+                            continue
+                        if cb is None:
+                            continue
+                        try:
                             if isinstance(cb, generator):
                                 try:
                                     next(cb)
-                                    hub_add(fileno, cb, WRITE)
+                                    hub_add(fileno, cb, WRITE|ERR)
                                 except StopIteration:
                                     hub_remove(fileno)
                                 except Exception:
@@ -122,8 +127,6 @@ def asynloop(obj, connection, consumer, strategies, ns, hub, qos,
                                     raise
                             else:
                                 cb(fileno, event)
-                        except (KeyError, Empty):
-                            continue
                         except socket.error:
                             if ns.state != CLOSE:  # pragma: no cover
                                 raise

+ 4 - 1
funtests/stress/stress.py

@@ -33,6 +33,9 @@ celery = Celery(
     'stress', broker='pyamqp://', backend='redis://',
     set_as_current=False,
 )
+celery.conf.update(
+    CELERYD_PREFETCH_MULTIPLIER=1,
+)
 
 
 @celery.task
@@ -77,7 +80,7 @@ class Stresstests(object):
         self.block_timeout = block_timeout
 
     def run(self, n=50):
-        tests = [self.manyshort,
+        tests = [#self.manyshort,
                  self.termbysig,
                  self.bigtasks,
                  self.smalltasks,