Kaynağa Gözat

listener: Handle connection errors while closing consumers.

Ask Solem 14 yıl önce
ebeveyn
işleme
3dc17b2ae2
1 değiştirilmiş dosya ile 20 ekleme ve 5 silme
  1. 20 5
      celery/worker/listener.py

+ 20 - 5
celery/worker/listener.py

@@ -312,13 +312,22 @@ class CarrotListener(object):
              the message was: %s" % message_data))
         message.ack()
 
+    def maybe_conn_error(self, predicate, fun):
+        if predicate:
+            try:
+                fun()
+            except Exception: # TODO kombu.connection_errors
+                pass
+
     def close_connection(self):
         self.logger.debug("CarrotListener: "
                           "Closing consumer channel...")
-        self.task_consumer = self.task_consumer and self.task_consumer.close()
+        self.task_consumer = self.maybe_conn_error(self.task_consumer,
+                                                   self.task_consumer.close)
         self.logger.debug("CarrotListener: "
                           "Closing connection to broker...")
-        self.connection = self.connection and self.connection.close()
+        self.connection = self.maybe_conn_error(self.connection,
+                                                self.connection.close)
 
     def stop_consumers(self, close=True):
         """Stop consuming."""
@@ -331,12 +340,14 @@ class CarrotListener(object):
             self.heart = self.heart.stop()
 
         self.logger.debug("TaskConsumer: Cancelling consumers...")
-        if self.task_consumer:
-            self.task_consumer.cancel()
+        self.maybe_conn_error(self.task_consumer,
+                              self.task_consumer.cancel)
 
         if self.event_dispatcher:
             self.logger.debug("EventDispatcher: Shutting down...")
-            self.event_dispatcher = self.event_dispatcher.close()
+            self.event_dispatcher = self.maybe_conn_error(
+                                            self.event_dispatcher,
+                                            self.event_dispatcher.close)
 
         if close:
             self.close_connection()
@@ -376,6 +387,10 @@ class CarrotListener(object):
         self.broadcast_consumer = BroadcastConsumer(self.connection,
                                                     hostname=self.hostname)
         self.task_consumer.register_callback(self.receive_message)
+
+        # Flush events sent while connection was down.
+        if self.event_dispatcher:
+            self.event_dispatcher.flush()
         self.event_dispatcher = EventDispatcher(self.connection,
                                                 hostname=self.hostname,
                                                 enabled=self.send_events)