Browse Source

Consumer: Fixes a bug where celeryd would not stop when a socket.error was raised at shutdown (it always happened after 6ae4ec75f3dfda73cc1e47e326c6fa727d1413a0)

Ask Solem 14 years ago
parent
commit
d70dae576c
1 changed files with 13 additions and 12 deletions
  1. 13 12
      celery/worker/consumer.py

+ 13 - 12
celery/worker/consumer.py

@@ -269,13 +269,14 @@ class Consumer(object):
 
         self.init_callback(self)
 
-        while 1:
+        while self._state != CLOSE:
             try:
                 self.reset_connection()
                 self.consume_messages()
             except self.connection_errors:
                 self.logger.error("Consumer: Connection to broker lost."
-                                + " Trying to re-establish connection...")
+                                + " Trying to re-establish connection...",
+                                exc_info=sys.exc_info())
 
     def consume_messages(self):
         """Consume messages forever (or until an exception is raised)."""
@@ -283,15 +284,16 @@ class Consumer(object):
         self.task_consumer.consume()
         self.logger.debug("Consumer: Ready to accept tasks!")
 
-        while 1:
-            if not self.connection:
-                break
+        while self._state != CLOSE and self.connection:
             if self.qos.prev != self.qos.value:
                 self.qos.update()
             try:
                 self.connection.drain_events(timeout=1)
             except socket.timeout:
                 pass
+            except socket.error:
+                if self._state != CLOSE:
+                    raise
 
     def on_task(self, task):
         """Handle received task.
@@ -390,27 +392,23 @@ class Consumer(object):
             pass
 
     def close_connection(self):
-        self.logger.debug("Consumer: "
-                          "Closing consumer channel...")
         if self.task_consumer:
+            self.logger.debug("Consumer: " "Closing consumer channel...")
             self.task_consumer = \
                     self.maybe_conn_error(self.task_consumer.close)
-        self.logger.debug("Consumer: "
-                          "Closing connection to broker...")
-
-        self.logger.debug("CarrotListener: Closing broadcast channel...")
         if self.broadcast_consumer:
+            self.logger.debug("CarrotListener: Closing broadcast channel...")
             self.broadcast_consumer = \
                 self.maybe_conn_error(self.broadcast_consumer.channel.close)
 
         if self.connection:
+            self.logger.debug("Consumer: " "Closing connection to broker...")
             self.connection = self.maybe_conn_error(self.connection.close)
 
     def stop_consumers(self, close=True):
         """Stop consuming."""
         if not self._state == RUN:
             return
-        self._state = CLOSE
 
         if self.heart:
             self.logger.debug("Heart: Going into cardiac arrest...")
@@ -475,6 +473,8 @@ class Consumer(object):
 
     def reset_connection(self):
         """Re-establish connection and set up consumers."""
+        import traceback
+        traceback.print_stack()
         self.logger.debug(
                 "Consumer: Re-establishing connection to the broker...")
         self.stop_consumers()
@@ -537,6 +537,7 @@ class Consumer(object):
         Does not close connection.
 
         """
+        self._state = CLOSE
         self.logger.debug("Consumer: Stopping consumers...")
         self.stop_consumers(close=False)