Browse Source

Better log messages when trying failover (multiple hosts in kombu master)

Ask Solem 12 years ago
parent
commit
3990b4b88a
2 changed files with 39 additions and 17 deletions
  1. 9 4
      celery/utils/timeutils.py
  2. 30 13
      celery/worker/consumer.py

+ 9 - 4
celery/utils/timeutils.py

@@ -238,15 +238,20 @@ def weekday(name):
         raise KeyError(name)
 
 
-def humanize_seconds(secs, prefix=''):
+def humanize_seconds(secs, prefix='', sep=''):
     """Show seconds in human form, e.g. 60 is "1 minute", 7200 is "2
-    hours"."""
+    hours".
+
+    :keyword prefix: Can be used to add a preposition to the output,
+        e.g. 'in' will give 'in 1 second', but add nothing to 'now'.
+
+    """
     secs = float(secs)
     for unit, divider, formatter in TIME_UNITS:
         if secs >= divider:
             w = secs / divider
-            return '{0}{1} {2}'.format(prefix, formatter(w),
-                                       pluralize(w, unit))
+            return '{0}{1}{2} {3}'.format(prefix, sep, formatter(w),
+                                          pluralize(w, unit))
     return 'now'
 
 

+ 30 - 13
celery/worker/consumer.py

@@ -86,10 +86,11 @@ from celery.app import app_or_default
 from celery.datastructures import AttributeDict
 from celery.exceptions import InvalidTaskError, SystemTerminate
 from celery.task.trace import build_tracer
+from celery.utils import text
 from celery.utils import timer2
 from celery.utils.functional import noop
 from celery.utils.log import get_logger
-from celery.utils import text
+from celery.utils.timeutils import humanize_seconds
 
 from . import state
 from .bootsteps import StartStopComponent
@@ -135,16 +136,29 @@ The full contents of the message body was:
 %s
 """
 
-MESSAGE_REPORT_FMT = """\
+MESSAGE_REPORT = """\
 body: {0} {{content_type:{1} content_encoding:{2} delivery_info:{3}}}\
 """
 
 
 RETRY_CONNECTION = """\
-Consumer: Connection to broker lost. \
+consumer: Connection to broker lost. \
 Trying to re-establish the connection...\
 """
 
+CONNECTION_ERROR = """\
+consumer: Cannot connect to %s: %s.
+%s
+"""
+
+CONNECTION_RETRY = """\
+Trying again %(when)s...\
+"""
+
+CONNECTION_FAILOVER = """\
+Will retry using next failover.\
+"""
+
 task_reserved = state.task_reserved
 
 logger = get_logger(__name__)
@@ -153,7 +167,7 @@ info, warn, error, crit = (logger.info, logger.warn,
 
 
 def debug(msg, *args, **kwargs):
-    logger.debug('Consumer: {0}'.format(msg), *args, **kwargs)
+    logger.debug('consumer: {0}'.format(msg), *args, **kwargs)
 
 
 def dump_body(m, body):
@@ -527,10 +541,10 @@ class Consumer(object):
         self.qos.decrement_eventually()
 
     def _message_report(self, body, message):
-        return MESSAGE_REPORT_FMT.format(dump_body(message, body),
-                                         safe_repr(message.content_type),
-                                         safe_repr(message.content_encoding),
-                                         safe_repr(message.delivery_info))
+        return MESSAGE_REPORT.format(dump_body(message, body),
+                                     safe_repr(message.content_type),
+                                     safe_repr(message.content_encoding),
+                                     safe_repr(message.delivery_info))
 
     def handle_unknown_message(self, body, message):
         warn(UNKNOWN_FORMAT, self._message_report(body, message))
@@ -675,6 +689,7 @@ class Consumer(object):
         self._pidbox_node_stopped = threading.Event()
         try:
             with self._open_connection() as conn:
+                info('pidbox: Connected to %s.', conn.as_uri())
                 self.pidbox_node.channel = conn.default_channel
                 self.broadcast_consumer = self.pidbox_node.listen(
                                             callback=self.on_control)
@@ -701,7 +716,7 @@ class Consumer(object):
 
         # Re-establish the broker connection and setup the task consumer.
         self.connection = self._open_connection()
-        debug('Connection established.')
+        info('consumer: Connected to %s.', self.connection.as_uri())
         self.task_consumer = self.app.amqp.TaskConsumer(self.connection,
                                     on_decode_error=self.on_decode_error)
         # QoS: Reset prefetch window.
@@ -746,16 +761,18 @@ class Consumer(object):
         :setting:`BROKER_CONNECTION_RETRY` setting is enabled
 
         """
+        conn = self.app.connection(heartbeat=self.amqheartbeat)
 
         # Callback called for each retry while the connection
         # can't be established.
-        def _error_handler(exc, interval):
-            error('Consumer: Connection Error: %s. '
-                  'Trying again in %d seconds...', exc, interval)
+        def _error_handler(exc, interval, next_step=CONNECTION_RETRY):
+            if getattr(conn, 'alt', None) and interval == 0:
+                next_step = CONNECTION_FAILOVER
+            error(CONNECTION_ERROR, conn.as_uri(), exc,
+                  next_step % {'when': humanize_seconds(interval, 'in', ' ')})
 
         # remember that the connection is lazy, it won't establish
         # until it's needed.
-        conn = self.app.connection(heartbeat=self.amqheartbeat)
         if not self.app.conf.BROKER_CONNECTION_RETRY:
             # retry disabled, just call connect directly.
             conn.connect()