Browse Source

Worker nodenames now consists of name and hostname separated by @

Ask Solem 12 years ago
parent
commit
a9f9fc75a3
3 changed files with 23 additions and 4 deletions
  1. 2 2
      celery/apps/worker.py
  2. 14 0
      celery/utils/__init__.py
  3. 7 2
      celery/worker/__init__.py

+ 2 - 2
celery/apps/worker.py

@@ -62,7 +62,7 @@ ARTLINES = [
 ]
 
 BANNER = """\
-celery@{hostname} v{version}
+{hostname} v{version}
 
 [Configuration]
 . broker:      {conninfo}
@@ -133,7 +133,7 @@ class Worker(WorkController):
 
     def on_consumer_ready(self, consumer):
         signals.worker_ready.send(sender=consumer)
-        print('celery@{0.hostname} ready.'.format(self))
+        print('{0.hostname} ready.'.format(self))
 
     def setup_logging(self, colorize=None):
         if colorize is None and self.no_color is not None:

+ 14 - 0
celery/utils/__init__.py

@@ -9,6 +9,7 @@
 from __future__ import absolute_import, print_function
 
 import os
+import socket
 import sys
 import traceback
 import warnings
@@ -49,6 +50,8 @@ WORKER_DIRECT_EXCHANGE = Exchange('C.dq')
 #: Format for worker direct queue names.
 WORKER_DIRECT_QUEUE_FORMAT = '{hostname}.dq'
 
+NODENAME_SEP = '@'
+
 
 def worker_direct(hostname):
     if isinstance(hostname, Queue):
@@ -248,6 +251,17 @@ def gen_task_name(app, name, module_name):
     return '.'.join(filter(None, [module_name, name]))
 
 
+def nodename(name, hostname):
+    return NODENAME_SEP.join((name, hostname))
+
+
+def nodesplit(nodename):
+    parts = nodename.split(NODENAME_SEP, 1)
+    if len(parts) == 1:
+        return None, parts[0]
+    return parts
+
+
 # ------------------------------------------------------------------------ #
 # > XXX Compat
 from .log import LOG_LEVELS     # noqa

+ 7 - 2
celery/worker/__init__.py

@@ -28,7 +28,7 @@ from celery.app.abstract import configurated, from_config
 from celery.exceptions import (
     ImproperlyConfigured, SystemTerminate, TaskRevokedError,
 )
-from celery.utils import worker_direct
+from celery.utils import nodename, nodesplit, worker_direct
 from celery.utils.imports import reload_from_cwd
 from celery.utils.log import mlevel, worker_logger as logger
 
@@ -43,6 +43,11 @@ enable the CELERY_CREATE_MISSING_QUEUES setting.
 """
 
 
+def default_nodename(hostname):
+    name, host = nodesplit(hostname or '')
+    return nodename(name or 'celery', host or socket.gethostname())
+
+
 class WorkController(configurated):
     """Unmanaged worker instance."""
     app = None
@@ -97,7 +102,7 @@ class WorkController(configurated):
 
     def __init__(self, app=None, hostname=None, **kwargs):
         self.app = app_or_default(app or self.app)
-        self.hostname = hostname or socket.gethostname()
+        self.hostname = default_nodename(hostname)
         self.app.loader.init_worker()
         self.on_before_init(**kwargs)