Jelajahi Sumber

Handle message decode errors properly. Thanks to Jacob Burch.

Ask Solem 15 tahun lalu
induk
melakukan
1ca8fa2431
2 mengubah file dengan 44 tambahan dan 1 penghapusan
  1. 30 1
      celery/messaging.py
  2. 14 0
      celery/worker/listener.py

+ 30 - 1
celery/messaging.py

@@ -7,7 +7,7 @@ import socket
 from datetime import datetime, timedelta
 from datetime import datetime, timedelta
 
 
 from carrot.connection import DjangoBrokerConnection
 from carrot.connection import DjangoBrokerConnection
-from carrot.messaging import Publisher, Consumer, ConsumerSet
+from carrot.messaging import Publisher, Consumer, ConsumerSet as _ConsumerSet
 from billiard.utils.functional import wraps
 from billiard.utils.functional import wraps
 
 
 from celery import conf
 from celery import conf
@@ -75,6 +75,35 @@ class TaskPublisher(Publisher):
         return task_id
         return task_id
 
 
 
 
+class ConsumerSet(_ConsumerSet):
+    """ConsumerSet with an optional decode error callback.
+
+    For more information see :class:`carrot.messaging.ConsumerSet`.
+
+    .. attribute:: on_decode_error
+
+        Callback called if a message had decoding errors.
+        The callback is called with the signature::
+
+            callback(message, exception)
+
+    """
+    on_decode_error = None
+
+    def _receive_callback(self, raw_message):
+        message = self.backend.message_to_python(raw_message)
+        if self.auto_ack and not message.acknowledged:
+            message.ack()
+        try:
+            decoded = message.decode()
+        except Exception, exc:
+            if self.on_decode_error:
+                return self.on_decode_error(message, exc)
+            else:
+                raise
+        self.receive(decoded, message)
+
+
 class TaskConsumer(Consumer):
 class TaskConsumer(Consumer):
     """Consume tasks"""
     """Consume tasks"""
     queue = conf.DEFAULT_QUEUE
     queue = conf.DEFAULT_QUEUE

+ 14 - 0
celery/worker/listener.py

@@ -183,6 +183,19 @@ class CarrotListener(object):
         if close:
         if close:
             self.close_connection()
             self.close_connection()
 
 
+    def on_decode_error(self, message, exc):
+        """Callback called if the message had decoding errors.
+
+        :param message: The message with errors.
+        :param exc: The original exception instance.
+
+        """
+        self.logger.critical("Message decoding error: %s "
+                             "(type:%s encoding:%s raw:'%s')" % (
+                                exc, message.content_type,
+                                message.content_encoding, message.body))
+        message.ack()
+
     def reset_connection(self):
     def reset_connection(self):
         self.logger.debug(
         self.logger.debug(
                 "CarrotListener: Re-establishing connection to the broker...")
                 "CarrotListener: Re-establishing connection to the broker...")
@@ -199,6 +212,7 @@ class CarrotListener(object):
         self.connection = self._open_connection()
         self.connection = self._open_connection()
         self.logger.debug("CarrotListener: Connection Established.")
         self.logger.debug("CarrotListener: Connection Established.")
         self.task_consumer = get_consumer_set(connection=self.connection)
         self.task_consumer = get_consumer_set(connection=self.connection)
+        self.task_consumer.on_decode_error = self.on_decode_error
         self.broadcast_consumer = BroadcastConsumer(self.connection,
         self.broadcast_consumer = BroadcastConsumer(self.connection,
                                                     hostname=self.hostname)
                                                     hostname=self.hostname)
         self.task_consumer.register_callback(self.receive_message)
         self.task_consumer.register_callback(self.receive_message)