Преглед на файлове

Merge branch 'rogerhu/fix_channel_errors'

Ask Solem преди 13 години
родител
ревизия
8a46eb778b
променени са 2 файла, в които са добавени 21 реда и са изтрити 1 реда
  1. 20 0
      celery/tests/test_worker/__init__.py
  2. 1 1
      celery/worker/consumer.py

+ 20 - 0
celery/tests/test_worker/__init__.py

@@ -368,6 +368,26 @@ class test_Consumer(unittest.TestCase):
         l.heart.stop()
         l.priority_timer.stop()
 
+    def test_start_channel_error(self):
+        # Regression test for AMQPChannelExceptions that can occur within the consumer. (i.e. 404 errors)
+
+        class MockConsumer(MainConsumer):
+            iterations = 0
+
+            def consume_messages(self):
+                if not self.iterations:
+                    self.iterations = 1
+                    raise KeyError("foo")
+                raise SyntaxError("bar")
+
+        l = MockConsumer(self.ready_queue, self.eta_schedule, self.logger,
+                             send_events=False, pool=BasePool())
+
+        l.channel_errors = (KeyError, )
+        self.assertRaises(SyntaxError, l.start)
+        l.heart.stop()
+        l.priority_timer.stop()
+
     def test_consume_messages_ignores_socket_timeout(self):
 
         class Connection(current_app.broker_connection().__class__):

+ 1 - 1
celery/worker/consumer.py

@@ -337,7 +337,7 @@ class Consumer(object):
             try:
                 self.reset_connection()
                 self.consume_messages()
-            except self.connection_errors:
+            except self.connection_errors + self.channel_errors:
                 self.logger.error("Consumer: Connection to broker lost."
                                 + " Trying to re-establish the connection...",
                                 exc_info=sys.exc_info())