Ver Fonte

Reset pidbox node on channel failure

Ask Solem há 14 anos atrás
pai
commit
4fc9e9c624
3 ficheiros alterados com 24 adições e 7 exclusões
  1. 3 0
      celery/events/__init__.py
  2. 1 1
      celery/tests/test_worker.py
  3. 20 6
      celery/worker/consumer.py

+ 3 - 0
celery/events/__init__.py

@@ -109,6 +109,9 @@ class EventDispatcher(object):
             type, fields, _ = self._outbound_buffer.popleft()
             type, fields, _ = self._outbound_buffer.popleft()
             self.send(type, **fields)
             self.send(type, **fields)
 
 
+    def copy_buffer(self, other):
+        self._outbound_buffer = other._outbound_buffer
+
     def close(self):
     def close(self):
         """Close the event dispatcher."""
         """Close the event dispatcher."""
         self._lock.locked() and self._lock.release()
         self._lock.locked() and self._lock.release()

+ 1 - 1
celery/tests/test_worker.py

@@ -67,6 +67,7 @@ class MockEventDispatcher(object):
     sent = []
     sent = []
     closed = False
     closed = False
     flushed = False
     flushed = False
+    _outbound_buffer = []
 
 
     def send(self, event, *args, **kwargs):
     def send(self, event, *args, **kwargs):
         self.sent.append(event)
         self.sent.append(event)
@@ -568,7 +569,6 @@ class test_Consumer(unittest.TestCase):
         finally:
         finally:
             l.app.conf.BROKER_CONNECTION_RETRY = p
             l.app.conf.BROKER_CONNECTION_RETRY = p
         l.stop_consumers()
         l.stop_consumers()
-        self.assertTrue(dispatcher.flushed)
         l.event_dispatcher = MockEventDispatcher()
         l.event_dispatcher = MockEventDispatcher()
         l.receive_message(m.decode(), m)
         l.receive_message(m.decode(), m)
         l.eta_schedule.stop()
         l.eta_schedule.stop()

+ 20 - 6
celery/worker/consumer.py

@@ -259,8 +259,8 @@ class Consumer(object):
         self.init_callback(self)
         self.init_callback(self)
 
 
         while 1:
         while 1:
-            self.reset_connection()
             try:
             try:
+                self.reset_connection()
                 self.consume_messages()
                 self.consume_messages()
             except self.connection_errors:
             except self.connection_errors:
                 self.logger.error("Consumer: Connection to broker lost."
                 self.logger.error("Consumer: Connection to broker lost."
@@ -325,6 +325,7 @@ class Consumer(object):
             self.logger.error(
             self.logger.error(
                 "Error occurred while handling control command: %r\n%r" % (
                 "Error occurred while handling control command: %r\n%r" % (
                     exc, traceback.format_exc()), exc_info=sys.exc_info())
                     exc, traceback.format_exc()), exc_info=sys.exc_info())
+            self.reset_pidbox_node()
 
 
     def apply_eta_task(self, task):
     def apply_eta_task(self, task):
         state.task_reserved(task)
         state.task_reserved(task)
@@ -431,6 +432,17 @@ class Consumer(object):
                                 message.content_encoding, message.body))
                                 message.content_encoding, message.body))
         message.ack()
         message.ack()
 
 
+    def reset_pidbox_node(self):
+        if self.pidbox_node.channel:
+            try:
+                self.pidbox_node.channel.close()
+            except self.connection_errors + self.channel_errors:
+                pass
+
+        self.pidbox_node.channel = self.connection.channel()
+        self.broadcast_consumer = self.pidbox_node.listen(
+                                        callback=self.on_control)
+
     def reset_connection(self):
     def reset_connection(self):
         """Re-establish connection and set up consumers."""
         """Re-establish connection and set up consumers."""
         self.logger.debug(
         self.logger.debug(
@@ -453,16 +465,18 @@ class Consumer(object):
 
 
         self.task_consumer.register_callback(self.receive_message)
         self.task_consumer.register_callback(self.receive_message)
 
 
-        self.pidbox_node.channel = self.connection.channel()
-        self.broadcast_consumer = self.pidbox_node.listen(
-                                        callback=self.on_control)
+        # Pidbox
+        self.reset_pidbox_node()
 
 
         # Flush events sent while connection was down.
         # Flush events sent while connection was down.
-        if self.event_dispatcher:
-            self.event_dispatcher.flush()
+        prev_event_dispatcher = self.event_dispatcher
         self.event_dispatcher = self.app.events.Dispatcher(self.connection,
         self.event_dispatcher = self.app.events.Dispatcher(self.connection,
                                                 hostname=self.hostname,
                                                 hostname=self.hostname,
                                                 enabled=self.send_events)
                                                 enabled=self.send_events)
+        if prev_event_dispatcher:
+            self.event_dispatcher.copy_buffer(prev_event_dispatcher)
+            self.event_dispatcher.flush()
+
         self.restart_heartbeat()
         self.restart_heartbeat()
 
 
         self._state = RUN
         self._state = RUN