Explorar el Código

Fixes serious bug with eventlet/gevent SIGINT shutdown. Closes #457

Ask Solem hace 13 años
padre
commit
fa3f128611

+ 0 - 1
celery/app/base.py

@@ -13,7 +13,6 @@ from __future__ import with_statement
 
 import os
 import platform as _platform
-import sys
 
 from contextlib import contextmanager
 from copy import deepcopy

+ 11 - 1
celery/apps/worker.py

@@ -17,6 +17,13 @@ from ..exceptions import ImproperlyConfigured, SystemTerminate
 from ..utils import get_full_cls_name, isatty, LOG_LEVELS, cry
 from ..worker import WorkController
 
+try:
+    from greenlet import GreenletExit
+    IGNORE_ERRORS = (GreenletExit, )
+except ImportError:
+    IGNORE_ERRORS = ()
+
+
 BANNER = """
  -------------- celery@%(hostname)s v%(version)s
 ---- **** -----
@@ -135,7 +142,10 @@ class Worker(object):
               str(self.colored.reset(self.extra_info())))
         self.set_process_status("-active-")
 
-        self.run_worker()
+        try:
+            self.run_worker()
+        except IGNORE_ERRORS:
+            pass
 
     def on_consumer_ready(self, consumer):
         signals.worker_ready.send(sender=consumer)

+ 1 - 0
celery/concurrency/base.py

@@ -29,6 +29,7 @@ class BasePool(object):
     Timer = timer2.Timer
 
     signal_safe = True
+    rlimit_safe = True
     is_green = False
 
     _state = None

+ 1 - 0
celery/concurrency/gevent.py

@@ -87,6 +87,7 @@ class TaskPool(BasePool):
     Timer = Timer
 
     signal_safe = False
+    rlimit_safe = False
     is_green = True
 
     def __init__(self, *args, **kwargs):

+ 15 - 13
celery/worker/__init__.py

@@ -12,6 +12,7 @@ import atexit
 import logging
 import socket
 import sys
+import threading
 import traceback
 
 from kombu.syn import blocking
@@ -123,6 +124,7 @@ class WorkController(object):
 
         self.app = app_or_default(app)
         conf = self.app.conf
+        self._shutdown_complete = threading.Event()
 
         # Options
         self.loglevel = loglevel or self.loglevel
@@ -172,6 +174,8 @@ class WorkController(object):
             atexit.register(self._persistence.save)
 
         # Queues
+        if not self.pool_cls.rlimit_safe:
+            self.disable_rate_limits = True
         if self.disable_rate_limits:
             self.ready_queue = FastQueue()
             self.ready_queue.put = self.process_task
@@ -264,20 +268,12 @@ class WorkController(object):
                 blocking(component.start)
         except SystemTerminate:
             self.terminate()
-            raise
-        except SystemExit:
-            self.stop()
-            raise
-        except Exception:
-            self.stop()
-            raise
         except:
             self.stop()
-            try:
-                raise
-            except TypeError:
-                # eventlet borks here saying that the exception is None(?)
-                sys.exit()
+
+        # Will only get here if running green,
+        # makes sure all greenthreads have exited.
+        self._shutdown_complete.wait()
 
     def process_task(self, request):
         """Process task by sending it to the pool of workers."""
@@ -290,7 +286,7 @@ class WorkController(object):
                                  exc_info=True)
         except SystemTerminate:
             self.terminate()
-            raise SystemExit()
+            sys.exit()
         except BaseException, exc:
             self.stop()
             raise exc
@@ -308,9 +304,13 @@ class WorkController(object):
     def _shutdown(self, warm=True):
         what = (warm and "stopping" or "terminating").capitalize()
 
+        if self._state in (self.CLOSE, self.TERMINATE):
+            return
+
         if self._state != self.RUN or self._running != len(self.components):
             # Not fully started, can safely exit.
             self._state = self.TERMINATE
+            self._shutdown_complete.set()
             return
 
         self._state = self.CLOSE
@@ -326,7 +326,9 @@ class WorkController(object):
 
         self.priority_timer.stop()
         self.consumer.close_connection()
+
         self._state = self.TERMINATE
+        self._shutdown_complete.set()
 
     def on_timer_error(self, exc_info):
         _, exc, _ = exc_info

+ 37 - 15
celery/worker/consumer.py

@@ -247,6 +247,8 @@ class Consumer(object):
 
     #: The process mailbox (kombu pidbox node).
     pidbox_node = None
+    _pidbox_node_shutdown = None   # used for greenlets
+    _pidbox_node_stopped = None    # used for greenlets
 
     #: The current worker pool instance.
     pool = None
@@ -446,19 +448,21 @@ class Consumer(object):
 
     def close_connection(self):
         """Closes the current broker connection and all open channels."""
+
+        # We must set self.connection to None here, so
+        # that the green pidbox thread exits.
+        connection, self.connection = self.connection, None
+
         if self.task_consumer:
             self._debug("Closing consumer channel...")
             self.task_consumer = \
                     self.maybe_conn_error(self.task_consumer.close)
 
-        if self.broadcast_consumer:
-            self._debug("Closing broadcast channel...")
-            self.broadcast_consumer = \
-                self.maybe_conn_error(self.broadcast_consumer.channel.close)
+        self.stop_pidbox_node()
 
-        if self.connection:
+        if connection:
             self._debug("Closing broker connection...")
-            self.connection = self.maybe_conn_error(self.connection.close)
+            self.maybe_conn_error(connection.close)
 
     def stop_consumers(self, close_connection=True):
         """Stop consuming tasks and broadcast commands, also stops
@@ -511,6 +515,7 @@ class Consumer(object):
 
     def reset_pidbox_node(self):
         """Sets up the process mailbox."""
+        self.stop_pidbox_node()
         # close previously opened channel if any.
         if self.pidbox_node.channel:
             try:
@@ -525,20 +530,37 @@ class Consumer(object):
                                         callback=self.on_control)
         self.broadcast_consumer.consume()
 
+    def stop_pidbox_node(self):
+        if self._pidbox_node_stopped:
+            self._pidbox_node_shutdown.set()
+            self._debug("Waiting for broadcast thread to shutdown...")
+            self._pidbox_node_stopped.wait()
+            self._pidbox_node_stopped = self._pidbox_node_shutdown = None
+        elif self.broadcast_consumer:
+            self._debug("Closing broadcast channel...")
+            self.broadcast_consumer = \
+                self.maybe_conn_error(self.broadcast_consumer.channel.close)
+
     def _green_pidbox_node(self):
         """Sets up the process mailbox when running in a greenlet
         environment."""
-        conn = self._open_connection()
-        self.pidbox_node.channel = conn.channel()
-        self.broadcast_consumer = self.pidbox_node.listen(
-                                        callback=self.on_control)
-        self.broadcast_consumer.consume()
-
+        # THIS CODE IS TERRIBLE
+        # Luckily work has already started rewriting the Consumer for 3.0.
+        self._pidbox_node_shutdown = threading.Event()
+        self._pidbox_node_stopped = threading.Event()
         try:
-            while self.connection:  # main connection still open?
-                conn.drain_events()
+            with self._open_connection() as conn:
+                self.pidbox_node.channel = conn.default_channel
+                self.broadcast_consumer = self.pidbox_node.listen(
+                                            callback=self.on_control)
+                with self.broadcast_consumer:
+                    while not self._pidbox_node_shutdown.isSet():
+                        try:
+                            conn.drain_events(timeout=1.0)
+                        except socket.timeout:
+                            pass
         finally:
-            conn.close()
+            self._pidbox_node_stopped.set()
 
     def reset_connection(self):
         """Re-establish the broker connection and set up consumers,