Jelajahi Sumber

multi-sock: fixes race condition

Ask Solem 12 tahun lalu
induk
melakukan
e2a3219c56
3 mengubah file dengan 49 tambahan dan 46 penghapusan
  1. 10 9
      celery/bootsteps.py
  2. 34 35
      celery/concurrency/processes.py
  3. 5 2
      celery/worker/consumer.py

+ 10 - 9
celery/bootsteps.py

@@ -128,17 +128,19 @@ class Namespace(object):
     def close(self, parent):
         if self.on_close:
             self.on_close()
-        for step in parent.steps:
-            close = getattr(step, 'close', None)
-            if close:
-                close(parent)
+        self.send_all(parent, 'close', 'Closing', reverse=False)
+
+    def restart(self, parent, method='stop', description='Restarting'):
+        self.send_all(parent, method, description)
 
-    def restart(self, parent, description='Restarting', attr='stop'):
+    def send_all(self, parent, method, description=None, reverse=True):
+        description = description or method.capitalize()
+        steps = reversed(parent.steps) if reverse else parent.steps
         with default_socket_timeout(SHUTDOWN_SOCKET_TIMEOUT):  # Issue 975
-            for step in reversed(parent.steps):
+            for step in steps:
                 if step:
                     self._debug('%s %s...', description, step.alias)
-                    fun = getattr(step, attr, None)
+                    fun = getattr(step, method, None)
                     if fun:
                         fun(parent)
 
@@ -147,13 +149,12 @@ class Namespace(object):
         if self.state in (CLOSE, TERMINATE):
             return
 
-        self.close(parent)
-
         if self.state != RUN or self.started != len(parent.steps):
             # Not fully started, can safely exit.
             self.state = TERMINATE
             self.shutdown_complete.set()
             return
+        self.close(parent)
         self.state = CLOSE
         self.restart(parent, what, 'terminate' if terminate else 'stop')
 

+ 34 - 35
celery/concurrency/processes.py

@@ -157,10 +157,8 @@ class ResultHandler(_pool.ResultHandler):
                     outqueues.discard(fd)
                     continue
                 else:
-                    if task is None:
-                        debug('result handler ignoring extra sentinel')
-                        continue
-                    on_state_change(task)
+                    if task:
+                        on_state_change(task)
                 try:
                     join_exited_workers(shutdown=True)
                 except WorkersJoined:
@@ -215,15 +213,15 @@ class AsynPool(_pool.Pool):
         while resq.poll(0):
             self.handle_result_event(resq.fileno())
 
-        # job was not acked, so find another worker to send it to.
-        if not job._accepted:
-            self._put_back(job)
-
         # worker terminated by signal:
         # we cannot reuse the sockets again, because we don't know if
         # the process wrote/read anything from them, and if so we cannot
         # restore the message boundaries.
         if proc.exitcode < 0:
+            # job was not acked, so find another worker to send it to.
+            if not job._accepted:
+                self._put_back(job)
+
             for conn in (proc.inq, proc.outq):
                 for sock in (conn._reader, conn._writer):
                     if not sock.closed:
@@ -233,8 +231,7 @@ class AsynPool(_pool.Pool):
 
     @classmethod
     def _set_result_sentinel(cls, _outqueue, workers):
-        for worker in workers:
-            worker.outq.put(None)
+        pass
 
     @classmethod
     def _help_stuff_finish(cls, _inqueue, _taskhandler, _size, fileno_to_inq):
@@ -374,7 +371,7 @@ class TaskPool(BasePool):
             elif hard:
                 R._tref = apply_after(hard * 1000.0,
                                       on_hard_timeout, (R, ))
-        self.on_timeout_set = on_timeout_set
+        self._pool.on_timeout_set = on_timeout_set
 
         def on_timeout_cancel(result):
             try:
@@ -382,19 +379,19 @@ class TaskPool(BasePool):
                 delattr(result, '_tref')
             except AttributeError:
                 pass
-        self.on_timeout_cancel = on_timeout_cancel
+        self._pool.on_timeout_cancel = on_timeout_cancel
 
         def on_process_up(proc):
-            pool._all_inqueues.add(proc.inqW_fd)
+            all_inqueues.add(proc.inqW_fd)
             hub_add(proc.sentinel, maintain_pool, READ | ERR)
             hub_add(proc.outqR_fd, pool.handle_result_event, READ | ERR)
-        self.on_process_up = on_process_up
+        self._pool.on_process_up = on_process_up
 
         def on_process_down(proc):
-            pool._all_inqueues.discard(proc.inqW_fd)
+            all_inqueues.discard(proc.inqW_fd)
             hub_remove(proc.sentinel)
             hub_remove(proc.outqR_fd)
-        self.on_process_down = on_process_down
+        self._pool.on_process_down = on_process_down
 
         def _write_to(fd, job, callback=None):
             header, body, body_size = job._payload
@@ -437,12 +434,13 @@ class TaskPool(BasePool):
                 for inqfd in diff(active_writes):
                     hub_remove(inqfd)
             else:
-                callback = promise(write_generator_gone)
-                cor = _write_to(ready_fd, job, callback=callback)
-                mark_write_gen_as_active(cor)
-                mark_write_fd_as_active(ready_fd)
-                callback.args = (cor, )  # tricky as we need to pass ref
-                add_coro((ready_fd, ), cor, WRITE)
+                if not job._accepted:
+                    callback = promise(write_generator_gone)
+                    cor = _write_to(ready_fd, job, callback=callback)
+                    mark_write_gen_as_active(cor)
+                    mark_write_fd_as_active(ready_fd)
+                    callback.args = (cor, )  # tricky as we need to pass ref
+                    add_coro((ready_fd, ), cor, WRITE)
 
         def on_poll_start(hub):
             if outbound:
@@ -467,19 +465,20 @@ class TaskPool(BasePool):
         if self.outbound_buffer:
             self.outbound_buffer.clear()
         try:
-            # flush outgoing buffers
-            intervals = fxrange(0.01, 0.1, 0.01, repeatlast=True)
-            while self._active_writers:
-                writers = list(self._active_writers)
-                for gen in writers:
-                    if gen.gi_frame.f_lasti != -1:  # generator started?
-                        try:
-                            next(gen)
-                        except StopIteration:
-                            self._active_writers.discard(gen)
-                # workers may have exited in the meantime.
-                self.maintain_pool()
-                sleep(next(intervals))  # don't busyloop
+            if self._pool._state == RUN:
+                # flush outgoing buffers
+                intervals = fxrange(0.01, 0.1, 0.01, repeatlast=True)
+                while self._active_writers:
+                    writers = list(self._active_writers)
+                    for gen in writers:
+                        if gen.gi_frame.f_lasti != -1:  # generator started?
+                            try:
+                                next(gen)
+                            except StopIteration:
+                                self._active_writers.discard(gen)
+                    # workers may have exited in the meantime.
+                    self.maintain_pool()
+                    sleep(next(intervals))  # don't busyloop
         finally:
             self.outbound_buffer.clear()
             self._active_writers.clear()

+ 5 - 2
celery/worker/consumer.py

@@ -108,6 +108,8 @@ def dump_body(m, body):
 
 
 class Consumer(object):
+    #: set when consumer is shutting down.
+    in_shutdown = False
 
     #: Optional callback called the first time the worker
     #: is ready to receive tasks.
@@ -137,7 +139,7 @@ class Consumer(object):
         ]
 
         def shutdown(self, parent):
-            self.restart(parent, 'Shutdown', 'shutdown')
+            self.send_all(parent, 'shutdown')
 
     def __init__(self, handle_task,
                  init_callback=noop, hostname=None,
@@ -227,12 +229,13 @@ class Consumer(object):
                 if ns.state != CLOSE and self.connection:
                     warn(CONNECTION_RETRY, exc_info=True)
                     try:
-                        self.connection.close()
+                        self.connection.collect()
                     except Exception:
                         pass
                     ns.restart(self)
 
     def shutdown(self):
+        self.in_shutdown = True
         self.namespace.shutdown(self)
 
     def stop(self):