nodenames.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. # -*- coding: utf-8 -*-
  2. """Worker name utilities."""
  3. import os
  4. import socket
  5. from functools import partial
  6. from typing import Dict, Optional, Tuple
  7. from kombu.entity import Exchange, Queue
  8. from .functional import memoize
  9. from .text import simple_format
  10. #: Exchange for worker direct queues.
  11. WORKER_DIRECT_EXCHANGE = Exchange('C.dq2')
  12. #: Format for worker direct queue names.
  13. WORKER_DIRECT_QUEUE_FORMAT = '{hostname}.dq2'
  14. #: Separator for worker node name and hostname.
  15. NODENAME_SEP = '@'
  16. NODENAME_DEFAULT = 'celery'
  17. gethostname = memoize(1, Cache=dict)(socket.gethostname)
  18. __all__ = [
  19. 'worker_direct', 'gethostname', 'nodename',
  20. 'anon_nodename', 'nodesplit', 'default_nodename',
  21. 'node_format', 'host_format',
  22. ]
  23. def worker_direct(hostname: str) -> Queue:
  24. """Return the :class:`kombu.Queue` being a direct route to a worker.
  25. Arguments:
  26. hostname (str, ~kombu.Queue): The fully qualified node name of
  27. a worker (e.g., ``w1@example.com``). If passed a
  28. :class:`kombu.Queue` instance it will simply return
  29. that instead.
  30. """
  31. if isinstance(hostname, Queue):
  32. return hostname
  33. return Queue(
  34. WORKER_DIRECT_QUEUE_FORMAT.format(hostname=hostname),
  35. WORKER_DIRECT_EXCHANGE,
  36. hostname,
  37. )
  38. def nodename(name: str, hostname: str) -> str:
  39. """Create node name from name/hostname pair."""
  40. return NODENAME_SEP.join((name, hostname))
  41. def anon_nodename(hostname: Optional[str]=None, prefix: str='gen') -> str:
  42. """Return the nodename for this process (not a worker).
  43. This is used for e.g. the origin task message field.
  44. """
  45. return nodename(''.join([prefix, str(os.getpid())]),
  46. hostname or gethostname())
  47. def nodesplit(name: str) -> Tuple[str]:
  48. """Split node name into tuple of name/hostname."""
  49. parts = name.split(NODENAME_SEP, 1)
  50. if len(parts) == 1:
  51. return None, parts[0]
  52. return parts
  53. def default_nodename(hostname: str) -> str:
  54. """Return the default nodename for this process."""
  55. name, host = nodesplit(hostname or '')
  56. return nodename(name or NODENAME_DEFAULT, host or gethostname())
  57. def node_format(s: str, name: str, **extra: Dict[str, str]) -> str:
  58. """Format worker node name (name@host.com)."""
  59. shortname, host = nodesplit(name)
  60. return host_format(
  61. s, host, shortname or NODENAME_DEFAULT, p=name, **extra)
  62. def _fmt_process_index(prefix: str='', default: str='0') -> str:
  63. from .log import current_process_index
  64. index = current_process_index()
  65. return '{0}{1}'.format(prefix, index) if index else default
  66. _fmt_process_index_with_prefix = partial(_fmt_process_index, '-', '')
  67. def host_format(s: str,
  68. host: Optional[str]=None, name: Optional[str]=None,
  69. **extra: Dict[str, str]) -> str:
  70. """Format host %x abbreviations."""
  71. host = host or gethostname()
  72. hname, _, domain = host.partition('.')
  73. name = name or hname
  74. keys = dict({
  75. 'h': host, 'n': name, 'd': domain,
  76. 'i': _fmt_process_index, 'I': _fmt_process_index_with_prefix,
  77. }, **extra)
  78. return simple_format(s, keys)