|
@@ -154,6 +154,10 @@ def debug(msg, *args, **kwargs):
|
|
|
logger.debug('Consumer: %s' % (msg, ), *args, **kwargs)
|
|
|
|
|
|
|
|
|
+def dump_body(m, body):
|
|
|
+ return "%s (%sb)" % (text.truncate(safe_repr(body), 1024), len(m.body))
|
|
|
+
|
|
|
+
|
|
|
class Component(StartStopComponent):
|
|
|
name = 'worker.consumer'
|
|
|
last = True
|
|
@@ -501,7 +505,7 @@ class Consumer(object):
|
|
|
self.qos.decrement_eventually()
|
|
|
|
|
|
def _message_report(self, body, message):
|
|
|
- return MESSAGE_REPORT_FMT % (text.truncate(safe_repr(body), 1024),
|
|
|
+ return MESSAGE_REPORT_FMT % (dump_body(message, body),
|
|
|
safe_repr(message.content_type),
|
|
|
safe_repr(message.content_encoding),
|
|
|
safe_repr(message.delivery_info))
|
|
@@ -511,11 +515,11 @@ class Consumer(object):
|
|
|
message.reject_log_error(logger, self.connection_errors)
|
|
|
|
|
|
def handle_unknown_task(self, body, message, exc):
|
|
|
- error(UNKNOWN_TASK_ERROR, exc, safe_repr(body), exc_info=True)
|
|
|
+ error(UNKNOWN_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
|
|
|
message.reject_log_error(logger, self.connection_errors)
|
|
|
|
|
|
def handle_invalid_task(self, body, message, exc):
|
|
|
- error(INVALID_TASK_ERROR, str(exc), safe_repr(body), exc_info=True)
|
|
|
+ error(INVALID_TASK_ERROR, exc, dump_body(message, body), exc_info=True)
|
|
|
message.reject_log_error(logger, self.connection_errors)
|
|
|
|
|
|
def receive_message(self, body, message):
|
|
@@ -610,7 +614,7 @@ class Consumer(object):
|
|
|
"""
|
|
|
crit("Can't decode message body: %r (type:%r encoding:%r raw:%r')",
|
|
|
exc, message.content_type, message.content_encoding,
|
|
|
- text.truncate(safe_repr(message.body), 1024))
|
|
|
+ dump_body(message, message.body))
|
|
|
message.ack()
|
|
|
|
|
|
def reset_pidbox_node(self):
|