|
@@ -118,6 +118,10 @@ The full contents of the message body was:
|
|
|
%s
|
|
|
"""
|
|
|
|
|
|
+MESSAGE_REPORT_FMT = """\
|
|
|
+body: %s {content_type:%s content_encoding:%s delivery_info:%s}\
|
|
|
+"""
|
|
|
+
|
|
|
|
|
|
class QoS(object):
|
|
|
"""Quality of Service for Channel.
|
|
@@ -375,6 +379,12 @@ class Consumer(object):
|
|
|
self.ready_queue.put(task)
|
|
|
self.qos.decrement_eventually()
|
|
|
|
|
|
+ def _message_report(self, body, message):
|
|
|
+ return MESSAGE_REPORT_FMT % (safe_repr(body),
|
|
|
+ safe_repr(message.content_type),
|
|
|
+ safe_repr(message.content_encoding),
|
|
|
+ safe_repr(message.delivery_info))
|
|
|
+
|
|
|
def receive_message(self, body, message):
|
|
|
"""Handles incoming messages.
|
|
|
|
|
@@ -388,14 +398,17 @@ class Consumer(object):
|
|
|
message.ack()
|
|
|
except self.connection_errors + (AttributeError, ), exc:
|
|
|
self.logger.critical(
|
|
|
- "Couldn't ack %r: body:%r reason:%r" % (
|
|
|
- message.delivery_tag, safe_repr(body), exc))
|
|
|
+ "Couldn't ack %r: %s reason:%r" % (
|
|
|
+ message.delivery_tag,
|
|
|
+ self._message_report(body, message), exc))
|
|
|
|
|
|
- if not isinstance(body,dict) or not body.get("task"):
|
|
|
+ try:
|
|
|
+ body["task"]
|
|
|
+ except (KeyError, TypeError):
|
|
|
warnings.warn(RuntimeWarning(
|
|
|
"Received and deleted unknown message. Wrong destination?!? \
|
|
|
the full contents of the message body was: %s" % (
|
|
|
- safe_repr(body), )))
|
|
|
+ self._message_report(body, message), )))
|
|
|
ack()
|
|
|
return
|
|
|
|