Quellcode durchsuchen

Consumer: Close broadcast consumer when connection is reset. Thanks to David Wolever

Ask Solem vor 15 Jahren
Ursprung
Commit
491c36edbc
1 geänderte Dateien mit 12 neuen und 1 gelöschten Zeilen
  1. 12 1
      celery/worker/listener.py

+ 12 - 1
celery/worker/listener.py

@@ -208,6 +208,7 @@ class CarrotListener(object):
             initial_prefetch_count=2, pool=None):
         self.connection = None
         self.task_consumer = None
+        self.broadcast_consumer = None
         self.ready_queue = ready_queue
         self.eta_schedule = eta_schedule
         self.send_events = send_events
@@ -340,6 +341,12 @@ class CarrotListener(object):
                     self.maybe_conn_error(self.task_consumer.close)
         self.logger.debug("CarrotListener: "
                           "Closing connection to broker...")
+
+        self.logger.debug("CarrotListener: Closing broadcast channel...")
+        if self.broadcast_consumer:
+            self.broadcast_consumer = \
+                    self.maybe_conn_error(self.broadcast_consumer.close)
+
         if self.connection:
             self.connection = self.maybe_conn_error(self.connection.close)
 
@@ -362,6 +369,10 @@ class CarrotListener(object):
             self.event_dispatcher = \
                     self.maybe_conn_error(self.event_dispatcher.close)
 
+        self.logger.debug("BroadcastConsumer: Cancelling consumer...")
+        if self.broadcast_consumer:
+            self.maybe_conn_error(self.broadcast_consumer.cancel)
+
         if close:
             self.close_connection()
 
@@ -391,12 +402,12 @@ class CarrotListener(object):
         self.connection = self._open_connection()
         self.logger.debug("CarrotListener: Connection Established.")
         self.task_consumer = get_consumer_set(connection=self.connection)
+        self.task_consumer.on_decode_error = self.on_decode_error
         # QoS: Reset prefetch window.
         self.qos = QoS(self.task_consumer,
                        self.initial_prefetch_count, self.logger)
         self.qos.update()                       # enable prefetch_count QoS.
 
-        self.task_consumer.on_decode_error = self.on_decode_error
         self.broadcast_consumer = BroadcastConsumer(self.connection,
                                                     hostname=self.hostname)
         self.task_consumer.register_callback(self.receive_message)