Browse Source

New celeryd banner

Ask Solem 14 years ago
parent
commit
0008abd59c
3 changed files with 45 additions and 34 deletions
  1. 9 6
      celery/app/amqp.py
  2. 35 27
      celery/apps/worker.py
  3. 1 1
      celery/tests/test_worker_heartbeat.py

+ 9 - 6
celery/app/amqp.py

@@ -11,7 +11,7 @@ AMQ related functionality.
 """
 from datetime import datetime, timedelta
 
-from kombu import BrokerConnection
+from kombu import BrokerConnection, Exchange
 from kombu.connection import Resource
 from kombu import compat as messaging
 
@@ -28,7 +28,7 @@ MSG_OPTIONS = ("mandatory", "priority", "immediate", "routing_key",
 
 #: Human readable queue declaration.
 QUEUE_FORMAT = """
-. %(name)s -> exchange:%(exchange)s (%(exchange_type)s) \
+. %(name)s exchange:%(exchange)s (%(exchange_type)s) \
 binding:%(binding_key)s
 """
 
@@ -84,11 +84,14 @@ class Queues(UserDict):
                              exchange=exchange,
                              exchange_type=exchange_type)
 
-    def format(self, indent=0):
+    def format(self, indent=0, indent_first=True):
         """Format routing table into string for log dumps."""
-        info = "\n".join(QUEUE_FORMAT.strip() % dict(name=name, **config)
-                                for name, config in self.items())
-        return textindent(info, indent=indent)
+        info = [QUEUE_FORMAT.strip() % dict(
+                    name=(name + ":").ljust(12), **config)
+                            for name, config in self.items()]
+        if indent_first:
+            return textindent("\n".join(info), indent)
+        return info[0] + "\n" + textindent("\n".join(info[1:]), indent)
 
     def select_subset(self, wanted, create_missing=True):
         """Select subset of the currently defined queues.

+ 35 - 27
celery/apps/worker.py

@@ -16,22 +16,25 @@ from celery.exceptions import ImproperlyConfigured, SystemTerminate
 from celery.utils import get_full_cls_name, LOG_LEVELS, cry
 from celery.worker import WorkController
 
-
-STARTUP_INFO_FMT = """
-Configuration ->
-    . broker -> %(conninfo)s
-    . queues ->
-%(queues)s
-    . concurrency -> %(concurrency)s
-    . loader -> %(loader)s
-    . logfile -> %(logfile)s@%(loglevel)s
-    . events -> %(events)s
-    . beat -> %(celerybeat)s
+BANNER = """
+ -------------- celery@%(hostname)s v%(version)s
+---- **** -----
+--- * ***  * -- [Configuration]
+-- * - **** ---   . broker:      %(conninfo)s
+- ** ----------   . loader:      %(loader)s
+- ** ----------   . logfile:     %(logfile)s@%(loglevel)s
+- ** ----------   . concurrency: %(concurrency)s
+- ** ----------   . events:      %(events)s
+- *** --- * ---   . beat:        %(celerybeat)s
+-- ******* ----
+--- ***** ----- [Queues]
+ --------------   %(queues)s
+"""
+
+EXTRA_INFO_FMT = """
+[Tasks]
 %(tasks)s
-""".strip()
-
-TASK_LIST_FMT = """    . tasks ->\n%s"""
-
+"""
 
 class Worker(object):
     WorkController = WorkController
@@ -102,8 +105,6 @@ class Worker(object):
         self.init_queues()
         self.worker_init()
         self.redirect_stdouts_to_logger()
-        print(str(self.colored.cyan(
-                "celery@%s v%s is starting." % (self.hostname, __version__))))
 
         if getattr(os, "geteuid", None) and os.geteuid() == 0:
             warnings.warn(
@@ -114,8 +115,9 @@ class Worker(object):
 
         # Dump configuration to screen so we have some basic information
         # for when users sends bug reports.
-        print(str(self.colored.reset(" \n", self.startup_info())))
-        self.set_process_status("Running...")
+        print(str(self.colored.cyan(" \n", self.startup_info())) +
+              str(self.colored.reset(self.extra_info())))
+        self.set_process_status("-active-")
 
         self.run_worker()
 
@@ -169,25 +171,31 @@ class Worker(object):
         if not include_builtins:
             tasklist = filter(lambda s: not s.startswith("celery."),
                               tasklist)
-        return TASK_LIST_FMT % "\n".join("\t. %s" % task
-                                            for task in sorted(tasklist))
+        return "\n".join("  . %s" % task for task in sorted(tasklist))
 
-    def startup_info(self):
-        tasklist = ""
+    def extra_info(self):
         if self.loglevel <= logging.INFO:
             include_builtins = self.loglevel <= logging.DEBUG
             tasklist = self.tasklist(include_builtins=include_builtins)
+            return EXTRA_INFO_FMT % {"tasks": tasklist}
+        return ""
 
-        return STARTUP_INFO_FMT % {
+    def startup_info(self):
+        concurrency = self.concurrency
+        if self.autoscale:
+            cmax, cmin = self.autoscale
+            concurrency = "{min=%s, max=%s}" % (cmin, cmax)
+        return BANNER % {
+            "hostname": self.hostname,
+            "version": __version__,
             "conninfo": self.app.broker_connection().as_uri(),
-            "queues": self.queues.format(indent=8),
-            "concurrency": self.concurrency,
+            "concurrency": concurrency,
             "loglevel": LOG_LEVELS[self.loglevel],
             "logfile": self.logfile or "[stderr]",
             "celerybeat": self.run_clockservice and "ON" or "OFF",
             "events": self.events and "ON" or "OFF",
-            "tasks": tasklist,
             "loader": get_full_cls_name(self.loader.__class__),
+            "queues": self.queues.format(indent=18, indent_first=False),
         }
 
     def run_worker(self):

+ 1 - 1
celery/tests/test_worker_heartbeat.py

@@ -10,7 +10,7 @@ class MockDispatcher(object):
     def __init__(self):
         self.sent = []
 
-    def send(self, msg):
+    def send(self, msg, **_fields):
         self.sent.append(msg)
         if self.heart:
             if self.next_iter > 10: