Browse Source

Refactor the display of the broker conneciton info string.

Ask Solem 15 years ago
parent
commit
21a5bc7661
3 changed files with 32 additions and 20 deletions
  1. 8 8
      celery/bin/celerybeat.py
  2. 4 12
      celery/bin/celeryd.py
  3. 20 0
      celery/messaging.py

+ 8 - 8
celery/bin/celerybeat.py

@@ -52,12 +52,13 @@ from celery import __version__
 from celery.log import emergency_error
 from celery.beat import ClockService
 from celery.loaders import current_loader, settings
+from celery.messaging import get_connection_info
 
 STARTUP_INFO_FMT = """
 Configuration ->
-    * Broker -> amqp://%(vhost)s@%(host)s:%(port)s
+    * Broker -> %(conninfo)s
     * Exchange -> %(exchange)s (%(exchange_type)s)
-    * Consumer -> Queue:%(consumer_queue)s Routing:%(consumer_rkey)s
+    * Consumer -> Queue:%(consumer_queue)s Binding:%(consumer_rkey)s
 """.strip()
 
 OPTION_LIST = (
@@ -99,7 +100,7 @@ def run_clockservice(detach=False, loglevel=conf.CELERYBEAT_LOG_LEVEL,
         **kwargs):
     """Starts the celerybeat clock server."""
 
-    print("Celery Beat %s is starting." % __version__)
+    print("celerybeat %s is starting." % __version__)
 
     # Setup logging
     if not isinstance(loglevel, int):
@@ -109,10 +110,9 @@ def run_clockservice(detach=False, loglevel=conf.CELERYBEAT_LOG_LEVEL,
 
     # Dump configuration to screen so we have some basic information
     # when users sends e-mails.
+
     print(STARTUP_INFO_FMT % {
-            "vhost": getattr(settings, "AMQP_VHOST", "(default)"),
-            "host": getattr(settings, "AMQP_SERVER", "(default)"),
-            "port": getattr(settings, "AMQP_PORT", "(default)"),
+            "conninfo": get_connection_info(),
             "exchange": conf.AMQP_EXCHANGE,
             "exchange_type": conf.AMQP_EXCHANGE_TYPE,
             "consumer_queue": conf.AMQP_CONSUMER_QUEUE,
@@ -122,9 +122,9 @@ def run_clockservice(detach=False, loglevel=conf.CELERYBEAT_LOG_LEVEL,
             "pidfile": pidfile,
     })
 
-    print("Celery Beat has started.")
+    print("celerybeat has started.")
+    from celery.log import setup_logger, redirect_stdouts_to_logger
     if detach:
-        from celery.log import setup_logger, redirect_stdouts_to_logger
         context = platform.create_daemon_context(logfile, pidfile,
                                         chroot_directory=chroot,
                                         working_directory=working_directory,

+ 4 - 12
celery/bin/celeryd.py

@@ -77,8 +77,8 @@ from celery.log import emergency_error
 from celery.task import discard_all
 from celery.worker import WorkController
 from celery.loaders import current_loader, settings
-from celery.loaders import current_loader
 from celery.loaders import settings
+from celery.messaging import get_connection_info
 
 USE_STATISTICS = getattr(settings, "CELERY_STATISTICS", False)
 # Make sure the setting exists.
@@ -86,9 +86,9 @@ settings.CELERY_STATISTICS = USE_STATISTICS
 
 STARTUP_INFO_FMT = """
 Configuration ->
-    * Broker -> %(carrot_backend)s://%(vhost)s@%(host)s:%(port)s
+    * Broker -> %(conninfo)s
     * Exchange -> %(exchange)s (%(exchange_type)s)
-    * Consumer -> Queue:%(consumer_queue)s Routing:%(consumer_rkey)s
+    * Consumer -> Queue:%(consumer_queue)s Binding:%(consumer_rkey)s
     * Concurrency -> %(concurrency)s
     * Statistics -> %(statistics)s
     * Celerybeat -> %(celerybeat)s
@@ -179,16 +179,9 @@ def run_worker(concurrency=conf.DAEMON_CONCURRENCY, detach=False,
 
     # Dump configuration to screen so we have some basic information
     # when users sends e-mails.
-    broker_connection = DjangoBrokerConnection()
-    carrot_backend = broker_connection.backend_cls
-    if carrot_backend and not isinstance(carrot_backend, str):
-        carrot_backend = carrot_backend.__name__
 
     print(STARTUP_INFO_FMT % {
-            "carrot_backend": carrot_backend or "amqp",
-            "vhost": broker_connection.virtual_host or "(default)",
-            "host": broker_connection.hostname or "(default)",
-            "port": broker_connection.port or "(port)",
+            "conninfo": get_connection_info(),
             "exchange": conf.AMQP_EXCHANGE,
             "exchange_type": conf.AMQP_EXCHANGE_TYPE,
             "consumer_queue": conf.AMQP_CONSUMER_QUEUE,
@@ -200,7 +193,6 @@ def run_worker(concurrency=conf.DAEMON_CONCURRENCY, detach=False,
             "statistics": settings.CELERY_STATISTICS and "ON" or "OFF",
             "celerybeat": run_clockservice and "ON" or "OFF",
     })
-    del(broker_connection)
 
     print("Celery has started.")
     if detach:

+ 20 - 0
celery/messaging.py

@@ -3,6 +3,7 @@
 Sending and Receiving Messages
 
 """
+from carrot.connection import DjangoBrokerConnection
 from carrot.messaging import Publisher, Consumer, ConsumerSet
 
 from celery import conf
@@ -102,3 +103,22 @@ class EventConsumer(Consumer):
     routing_key = "event"
     exchange_type = "direct"
     no_ack = True
+
+
+def get_connection_info():
+    broker_connection = DjangoBrokerConnection()
+    carrot_backend = broker_connection.backend_cls
+    if carrot_backend and not isinstance(carrot_backend, str):
+        carrot_backend = carrot_backend.__name__
+    port = broker_connection.port or \
+                broker_connection.get_backend_cls().default_port
+    port = port and ":%s" % port or ""
+    vhost = broker_connection.virtual_host
+    if not vhost.startswith("/"):
+        vhost = "/" + vhost
+    return "%(carrot_backend)s://%(userid)s@%(host)s%(port)s%(vhost)s" % {
+                "carrot_backend": carrot_backend,
+                "userid": broker_connection.userid,
+                "host": broker_connection.hostname,
+                "port": port,
+                "vhost": vhost}