Browse Source

Consumer: Close broadcast consumer when connection is reset. Thanks to David Wolever

Ask Solem 14 years ago
parent
commit
de279344fa
2 changed files with 13 additions and 3 deletions
  1. 2 0
      celery/tests/test_worker.py
  2. 11 3
      celery/worker/consumer.py

+ 2 - 0
celery/tests/test_worker.py

@@ -433,6 +433,7 @@ class test_Consumer(unittest.TestCase):
         l = _Consumer(self.ready_queue, self.eta_schedule, self.logger,
                       send_events=False, init_callback=init_callback)
         l.task_consumer = MockConsumer()
+        l.broadcast_consumer = MockConsumer()
         l.qos = _QoS()
         l.connection = BrokerConnection()
 
@@ -451,6 +452,7 @@ class test_Consumer(unittest.TestCase):
                       send_events=False, init_callback=init_callback)
         l.qos = _QoS()
         l.task_consumer = MockConsumer()
+        l.broadcast_consumer = MockConsumer()
         l.connection = BrokerConnection()
 
         def raises_socket_error(limit=None):

+ 11 - 3
celery/worker/consumer.py

@@ -200,6 +200,7 @@ class Consumer(object):
         self.app = app_or_default(app)
         self.connection = None
         self.task_consumer = None
+        self.broadcast_consumer = None
         self.ready_queue = ready_queue
         self.eta_schedule = eta_schedule
         self.send_events = send_events
@@ -342,6 +343,12 @@ class Consumer(object):
                     self.maybe_conn_error(self.task_consumer.close)
         self.logger.debug("Consumer: "
                           "Closing connection to broker...")
+
+        self.logger.debug("CarrotListener: Closing broadcast channel...")
+        if self.broadcast_consumer:
+            self.broadcast_consumer = \
+                self.maybe_conn_error(self.broadcast_consumer.channel.close)
+
         if self.connection:
             self.connection = self.maybe_conn_error(self.connection.close)
 
@@ -364,8 +371,9 @@ class Consumer(object):
             self.event_dispatcher = \
                     self.maybe_conn_error(self.event_dispatcher.close)
 
+        self.logger.debug("BroadcastConsumer: Cancelling consumer...")
         if self.broadcast_consumer:
-            self.broadcast_consumer.channel.close()
+            self.maybe_conn_error(self.broadcast_consumer.cancel)
 
         if close:
             self.close_connection()
@@ -396,13 +404,13 @@ class Consumer(object):
         self.connection = self._open_connection()
         self.logger.debug("Consumer: Connection Established.")
         self.task_consumer = self.app.amqp.get_task_consumer(self.connection,
-                                                          queues=self.queues)
+                                    queues=self.queues,
+                                    on_decode_error=self.on_decode_error)
         # QoS: Reset prefetch window.
         self.qos = QoS(self.task_consumer,
                        self.initial_prefetch_count, self.logger)
         self.qos.update()                   # enable prefetch_count
 
-        self.task_consumer.on_decode_error = self.on_decode_error
         self.task_consumer.register_callback(self.receive_message)
 
         self.pidbox_node.channel = self.connection.channel()