Browse Source

Moves hostname related utils to .utils.nodenames

Ask Solem 8 years ago
parent
commit
f9380fd8a5

+ 1 - 1
celery/app/amqp.py

@@ -24,7 +24,7 @@ from kombu.utils.functional import maybe_list
 from celery import signals
 from celery.five import items, string_t
 from celery.local import try_import
-from celery.utils import anon_nodename
+from celery.utils.nodenames import anon_nodename
 from celery.utils.saferepr import saferepr
 from celery.utils.text import indent as textindent
 from celery.utils.timeutils import maybe_make_aware, to_utc

+ 2 - 1
celery/app/log.py

@@ -23,12 +23,13 @@ from kombu.utils.encoding import set_default_encoding_file
 from celery import signals
 from celery._state import get_current_task
 from celery.five import class_property, string_t
-from celery.utils import isatty, node_format
+from celery.utils import isatty
 from celery.utils.log import (
     get_logger, mlevel,
     ColorFormatter, LoggingProxy, get_multiprocessing_logger,
     reset_multiprocessing_logger,
 )
+from celery.utils.nodenames import node_format
 from celery.utils.term import colored
 
 __all__ = ['TaskFormatter', 'Logging']

+ 1 - 1
celery/app/trace.py

@@ -34,8 +34,8 @@ from celery.app import set_default_app
 from celery.app.task import Task as BaseTask, Context
 from celery.exceptions import Ignore, Reject, Retry, InvalidTaskError
 from celery.five import monotonic
-from celery.utils import gethostname
 from celery.utils.log import get_logger
+from celery.utils.nodenames import gethostname
 from celery.utils.objects import mro_lookup
 from celery.utils.saferepr import saferepr
 from celery.utils.serialization import (

+ 1 - 1
celery/bin/base.py

@@ -24,7 +24,7 @@ from celery.five import (
 from celery.platforms import EX_FAILURE, EX_OK, EX_USAGE
 from celery.utils import term
 from celery.utils import text
-from celery.utils import node_format, host_format
+from celery.utils.nodenames import node_format, host_format
 from celery.utils.imports import symbol_by_name, import_from_cwd
 
 try:

+ 1 - 1
celery/bin/celeryd_detach.py

@@ -19,8 +19,8 @@ import sys
 from optparse import OptionParser, BadOptionError
 
 from celery.platforms import EX_FAILURE, detached
-from celery.utils import default_nodename, node_format
 from celery.utils.log import get_logger
+from celery.utils.nodenames import default_nodename, node_format
 
 from celery.bin.base import daemon_options
 

+ 3 - 1
celery/bin/multi.py

@@ -114,7 +114,9 @@ from celery import VERSION_BANNER
 from celery.five import items
 from celery.platforms import Pidfile, IS_WINDOWS
 from celery.utils import term
-from celery.utils import gethostname, host_format, node_format, nodesplit
+from celery.utils.nodenames import (
+    gethostname, host_format, node_format, nodesplit,
+)
 from celery.utils.text import pluralize
 
 __all__ = ['MultiTool']

+ 1 - 1
celery/bin/worker.py

@@ -173,8 +173,8 @@ from celery.bin.base import Command, daemon_options
 from celery.bin.celeryd_detach import detached_celeryd
 from celery.five import string_t
 from celery.platforms import maybe_drop_privileges
-from celery.utils import default_nodename
 from celery.utils.log import LOG_LEVELS, mlevel
+from celery.utils.nodenames import default_nodename
 
 __all__ = ['worker', 'main']
 

+ 1 - 1
celery/contrib/migrate.py

@@ -19,7 +19,7 @@ from kombu.utils.encoding import ensure_bytes
 
 from celery.app import app_or_default
 from celery.five import python_2_unicode_compatible, string, string_t
-from celery.utils import worker_direct
+from celery.utils.nodenames import worker_direct
 
 __all__ = [
     'StopFiltering', 'State', 'republish', 'migrate_task',

+ 2 - 2
celery/events/__init__.py

@@ -22,12 +22,12 @@ from operator import itemgetter
 from kombu import Exchange, Queue, Producer
 from kombu.connection import maybe_channel
 from kombu.mixins import ConsumerMixin
-from kombu.utils import cached_property
+from kombu.utils import cached_property, uuid
 
 from celery.app import app_or_default
 from celery.five import items
-from celery.utils import anon_nodename, uuid
 from celery.utils.functional import dictfilter
+from celery.utils.nodenames import anon_nodename
 from celery.utils.timeutils import adjust_timestamp, utcoffset, maybe_s_to_ms
 
 __all__ = ['Events', 'Event', 'EventDispatcher', 'EventReceiver']

+ 1 - 1
celery/tests/bin/test_base.py

@@ -255,7 +255,7 @@ class test_Command(AppCase):
 
     def test_host_format(self):
         cmd = MockCommand(app=self.app)
-        with patch('celery.utils.gethostname') as hn:
+        with patch('celery.utils.nodenames.gethostname') as hn:
             hn.return_value = 'blacktron.example.com'
             self.assertEqual(cmd.host_format(''), '')
             self.assertEqual(

+ 18 - 0
celery/tests/utils/test_nodenames.py

@@ -0,0 +1,18 @@
+from __future__ import absolute_import, unicode_literals
+
+from kombu import Queue
+
+from celery.utils import (
+    worker_direct,
+)
+
+from celery.tests.case import Case, Mock, patch
+
+
+class test_worker_direct(Case):
+
+    def test_returns_if_queue(self):
+        q = Queue('foo')
+        self.assertIs(worker_direct(q), q)
+
+

+ 1 - 1
celery/tests/worker/test_worker.py

@@ -31,7 +31,7 @@ from celery.worker import state
 from celery.worker.consumer import Consumer
 from celery.worker.pidbox import gPidbox
 from celery.worker.request import Request
-from celery.utils import worker_direct
+from celery.utils.nodenames import worker_direct
 from celery.utils.serialization import pickle
 from celery.utils.timer2 import Timer
 

+ 3 - 83
celery/utils/__init__.py

@@ -10,7 +10,6 @@ from __future__ import absolute_import, print_function, unicode_literals
 
 import numbers
 import os
-import socket
 import sys
 import traceback
 import datetime
@@ -18,12 +17,11 @@ import datetime
 from functools import partial
 from pprint import pprint
 
-from kombu.entity import Exchange, Queue
-
 from celery.five import WhateverIO, items, reraise, string_t
 
-from .functional import memoize
-from .text import simple_format
+from .functional import memoize  # noqa
+
+from .nodenames import worker_direct, nodename, nodesplit
 
 __all__ = ['worker_direct', 'lpmerge',
            'is_iterable', 'isatty', 'cry', 'maybe_reraise', 'strtobool',
@@ -38,37 +36,6 @@ PY3 = sys.version_info[0] == 3
 #: task to be that of ``App.main``.
 MP_MAIN_FILE = os.environ.get('MP_MAIN_FILE')
 
-#: Exchange for worker direct queues.
-WORKER_DIRECT_EXCHANGE = Exchange('C.dq2')
-
-#: Format for worker direct queue names.
-WORKER_DIRECT_QUEUE_FORMAT = '{hostname}.dq2'
-
-#: Separator for worker node name and hostname.
-NODENAME_SEP = '@'
-
-NODENAME_DEFAULT = 'celery'
-
-gethostname = memoize(1, Cache=dict)(socket.gethostname)
-
-
-def worker_direct(hostname):
-    """Return :class:`kombu.Queue` that is a direct route to
-    a worker by hostname.
-
-    :param hostname: The fully qualified node name of a worker
-                     (e.g. ``w1@example.com``).  If passed a
-                     :class:`kombu.Queue` instance it will simply return
-                     that instead.
-    """
-    if isinstance(hostname, Queue):
-        return hostname
-    return Queue(
-        WORKER_DIRECT_QUEUE_FORMAT.format(hostname=hostname),
-        WORKER_DIRECT_EXCHANGE,
-        hostname,
-    )
-
 
 def lpmerge(L, R):
     """In place left precedent dictionary merge.
@@ -217,53 +184,6 @@ def gen_task_name(app, name, module_name):
     return '.'.join(p for p in (module_name, name) if p)
 
 
-def nodename(name, hostname):
-    """Create node name from name/hostname pair."""
-    return NODENAME_SEP.join((name, hostname))
-
-
-def anon_nodename(hostname=None, prefix='gen'):
-    return nodename(''.join([prefix, str(os.getpid())]),
-                    hostname or gethostname())
-
-
-def nodesplit(nodename):
-    """Split node name into tuple of name/hostname."""
-    parts = nodename.split(NODENAME_SEP, 1)
-    if len(parts) == 1:
-        return None, parts[0]
-    return parts
-
-
-def default_nodename(hostname):
-    name, host = nodesplit(hostname or '')
-    return nodename(name or NODENAME_DEFAULT, host or gethostname())
-
-
-def node_format(s, nodename, **extra):
-    name, host = nodesplit(nodename)
-    return host_format(
-        s, host, name or NODENAME_DEFAULT, p=nodename, **extra)
-
-
-def _fmt_process_index(prefix='', default='0'):
-    from .log import current_process_index
-    index = current_process_index()
-    return '{0}{1}'.format(prefix, index) if index else default
-_fmt_process_index_with_prefix = partial(_fmt_process_index, '-', '')
-
-
-def host_format(s, host=None, name=None, **extra):
-    host = host or gethostname()
-    hname, _, domain = host.partition('.')
-    name = name or hname
-    keys = dict({
-        'h': host, 'n': name, 'd': domain,
-        'i': _fmt_process_index, 'I': _fmt_process_index_with_prefix,
-    }, **extra)
-    return simple_format(s, keys)
-
-
 # ------------------------------------------------------------------------ #
 # > XXX Compat
 from .log import LOG_LEVELS     # noqa

+ 103 - 0
celery/utils/nodenames.py

@@ -0,0 +1,103 @@
+# -*- coding: utf-8 -*-
+"""
+    celery.utils.nodenames
+    ~~~~~~~~~~~~~~~~~~~~~~
+
+    Worker name utilities.
+
+"""
+from __future__ import absolute_import, unicode_literals
+
+import os
+import socket
+
+from functools import partial
+
+from kombu.entity import Exchange, Queue
+
+from .functional import memoize
+from .text import simple_format
+
+#: Exchange for worker direct queues.
+WORKER_DIRECT_EXCHANGE = Exchange('C.dq2')
+
+#: Format for worker direct queue names.
+WORKER_DIRECT_QUEUE_FORMAT = '{hostname}.dq2'
+
+#: Separator for worker node name and hostname.
+NODENAME_SEP = '@'
+
+NODENAME_DEFAULT = 'celery'
+
+gethostname = memoize(1, Cache=dict)(socket.gethostname)
+
+__all__ = [
+    'worker_direct', 'gethostname', 'nodename',
+    'anon_nodename', 'nodesplit', 'default_nodename',
+    'node_format', 'host_format',
+]
+
+
+def worker_direct(hostname):
+    """Return :class:`kombu.Queue` that is a direct route to
+    a worker by hostname.
+
+    :param hostname: The fully qualified node name of a worker
+                     (e.g. ``w1@example.com``).  If passed a
+                     :class:`kombu.Queue` instance it will simply return
+                     that instead.
+    """
+    if isinstance(hostname, Queue):
+        return hostname
+    return Queue(
+        WORKER_DIRECT_QUEUE_FORMAT.format(hostname=hostname),
+        WORKER_DIRECT_EXCHANGE,
+        hostname,
+    )
+
+
+def nodename(name, hostname):
+    """Create node name from name/hostname pair."""
+    return NODENAME_SEP.join((name, hostname))
+
+
+def anon_nodename(hostname=None, prefix='gen'):
+    return nodename(''.join([prefix, str(os.getpid())]),
+                    hostname or gethostname())
+
+
+def nodesplit(nodename):
+    """Split node name into tuple of name/hostname."""
+    parts = nodename.split(NODENAME_SEP, 1)
+    if len(parts) == 1:
+        return None, parts[0]
+    return parts
+
+
+def default_nodename(hostname):
+    name, host = nodesplit(hostname or '')
+    return nodename(name or NODENAME_DEFAULT, host or gethostname())
+
+
+def node_format(s, nodename, **extra):
+    name, host = nodesplit(nodename)
+    return host_format(
+        s, host, name or NODENAME_DEFAULT, p=nodename, **extra)
+
+
+def _fmt_process_index(prefix='', default='0'):
+    from .log import current_process_index
+    index = current_process_index()
+    return '{0}{1}'.format(prefix, index) if index else default
+_fmt_process_index_with_prefix = partial(_fmt_process_index, '-', '')
+
+
+def host_format(s, host=None, name=None, **extra):
+    host = host or gethostname()
+    hname, _, domain = host.partition('.')
+    name = name or hname
+    keys = dict({
+        'h': host, 'n': name, 'd': domain,
+        'i': _fmt_process_index, 'I': _fmt_process_index_with_prefix,
+    }, **extra)
+    return simple_format(s, keys)

+ 1 - 1
celery/worker/__init__.py

@@ -32,9 +32,9 @@ from celery.exceptions import (
 )
 from celery.five import python_2_unicode_compatible, values
 from celery.platforms import EX_FAILURE, create_pidlock
-from celery.utils import default_nodename, worker_direct
 from celery.utils.imports import reload_from_cwd
 from celery.utils.log import mlevel, worker_logger as logger
+from celery.utils.nodenames import default_nodename, worker_direct
 from celery.utils.text import str_to_list
 from celery.utils.threads import default_socket_timeout
 

+ 1 - 1
celery/worker/consumer/consumer.py

@@ -30,9 +30,9 @@ from celery import signals
 from celery.app.trace import build_tracer
 from celery.exceptions import InvalidTaskError, NotRegistered
 from celery.five import buffer_t, items, python_2_unicode_compatible, values
-from celery.utils import gethostname
 from celery.utils.functional import noop
 from celery.utils.log import get_logger
+from celery.utils.nodenames import gethostname
 from celery.utils.objects import Bunch
 from celery.utils.text import truncate
 from celery.utils.timeutils import humanize_seconds, rate

+ 2 - 1
celery/worker/request.py

@@ -16,6 +16,7 @@ from datetime import datetime
 from weakref import ref
 
 from billiard.common import TERM_SIGNAME
+from kombu.utils import cached_property
 from kombu.utils.encoding import safe_repr, safe_str
 
 from celery import signals
@@ -27,9 +28,9 @@ from celery.exceptions import (
 )
 from celery.five import python_2_unicode_compatible, string
 from celery.platforms import signals as _signals
-from celery.utils import cached_property, gethostname
 from celery.utils.functional import noop
 from celery.utils.log import get_logger
+from celery.utils.nodenames import gethostname
 from celery.utils.timeutils import maybe_iso8601, timezone, maybe_make_aware
 from celery.utils.serialization import get_pickled_exception
 

+ 11 - 0
docs/internals/reference/celery.utils.nodenames.rst

@@ -0,0 +1,11 @@
+==========================================
+ ``celery.utils.nodenames``
+==========================================
+
+.. contents::
+    :local:
+.. currentmodule:: celery.utils.nodenames
+
+.. automodule:: celery.utils.nodenames
+    :members:
+    :undoc-members:

+ 1 - 0
docs/internals/reference/index.rst

@@ -51,6 +51,7 @@
     celery.utils
     celery.utils.abstract
     celery.utils.collections
+    celery.utils.nodenames
     celery.utils.deprecated
     celery.utils.functional
     celery.utils.graph