nodenames.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. # -*- coding: utf-8 -*-
  2. """Worker name utilities."""
  3. from __future__ import absolute_import, unicode_literals
  4. import os
  5. import socket
  6. from functools import partial
  7. from kombu.entity import Exchange, Queue
  8. from .functional import memoize
  9. from .text import simple_format
#: Exchange for worker direct queues.
WORKER_DIRECT_EXCHANGE = Exchange('C.dq2')

#: Format for worker direct queue names.
WORKER_DIRECT_QUEUE_FORMAT = '{hostname}.dq2'

#: Separator for worker node name and hostname.
NODENAME_SEP = '@'

#: Name part used when a node name has no (or an empty) name component;
#: see ``default_nodename`` and ``node_format``.
NODENAME_DEFAULT = 'celery'

#: ``socket.gethostname`` wrapped by ``memoize(1, Cache=dict)``.
#: NOTE(review): presumably this caches the hostname after the first
#: lookup -- confirm against ``.functional.memoize``.
gethostname = memoize(1, Cache=dict)(socket.gethostname)

# Public names exported by this module.
__all__ = [
    'worker_direct', 'gethostname', 'nodename',
    'anon_nodename', 'nodesplit', 'default_nodename',
    'node_format', 'host_format',
]
  23. def worker_direct(hostname):
  24. """Return the :class:`kombu.Queue` being a direct route to a worker.
  25. Arguments:
  26. hostname (str, ~kombu.Queue): The fully qualified node name of
  27. a worker (e.g., ``w1@example.com``). If passed a
  28. :class:`kombu.Queue` instance it will simply return
  29. that instead.
  30. """
  31. if isinstance(hostname, Queue):
  32. return hostname
  33. return Queue(
  34. WORKER_DIRECT_QUEUE_FORMAT.format(hostname=hostname),
  35. WORKER_DIRECT_EXCHANGE,
  36. hostname,
  37. )
  38. def nodename(name, hostname):
  39. """Create node name from name/hostname pair."""
  40. return NODENAME_SEP.join((name, hostname))
  41. def anon_nodename(hostname=None, prefix='gen'):
  42. """Return the nodename for this process (not a worker).
  43. This is used for e.g. the origin task message field.
  44. """
  45. return nodename(''.join([prefix, str(os.getpid())]),
  46. hostname or gethostname())
  47. def nodesplit(name):
  48. """Split node name into tuple of name/hostname."""
  49. parts = name.split(NODENAME_SEP, 1)
  50. if len(parts) == 1:
  51. return None, parts[0]
  52. return parts
  53. def default_nodename(hostname):
  54. """Return the default nodename for this process."""
  55. name, host = nodesplit(hostname or '')
  56. return nodename(name or NODENAME_DEFAULT, host or gethostname())
  57. def node_format(s, name, **extra):
  58. """Format worker node name (name@host.com)."""
  59. shortname, host = nodesplit(name)
  60. return host_format(
  61. s, host, shortname or NODENAME_DEFAULT, p=name, **extra)
  62. def _fmt_process_index(prefix='', default='0'):
  63. from .log import current_process_index
  64. index = current_process_index()
  65. return '{0}{1}'.format(prefix, index) if index else default
  66. _fmt_process_index_with_prefix = partial(_fmt_process_index, '-', '')
  67. def host_format(s, host=None, name=None, **extra):
  68. """Format host %x abbreviations."""
  69. host = host or gethostname()
  70. hname, _, domain = host.partition('.')
  71. name = name or hname
  72. keys = dict({
  73. 'h': host, 'n': name, 'd': domain,
  74. 'i': _fmt_process_index, 'I': _fmt_process_index_with_prefix,
  75. }, **extra)
  76. return simple_format(s, keys)