graph.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. # -*- coding: utf-8 -*-
  2. """
  3. The :program:`celery graph` command.
  4. .. program:: celery graph
  5. """
  6. from __future__ import absolute_import, unicode_literals
  7. from operator import itemgetter
  8. from celery.five import items, python_2_unicode_compatible
  9. from celery.utils.graph import DependencyGraph, GraphFormatter
  10. from .base import Command
  11. __all__ = ['graph']
  class graph(Command):
      """The ``celery graph`` command.

      Builds one of two graphs, selected by the first positional argument,
      and writes it to ``self.stdout`` through the graph's ``to_dot`` method:

      * ``bootsteps`` -- the worker and/or consumer blueprint graph.
      * ``workers`` -- workers, their pool threads, the broker and the
        result backend.
      """

      args = """<TYPE> [arguments]
              ..... bootsteps [worker] [consumer]
              ..... workers [enumerate]
      """

      def run(self, what=None, *args, **kwargs):
          """Dispatch to the graph builder named by ``what``.

          :param what: graph type, ``'bootsteps'`` or ``'workers'``.
          :raises self.UsageError: if no graph type was given.
          :raises self.Error: if the graph type is unknown.
          :returns: whatever the selected builder returns.
          """
          handlers = {'bootsteps': self.bootsteps, 'workers': self.workers}
          if not what:
              raise self.UsageError('missing type')
          if what not in handlers:
              # Sorted so the message is stable on every Python version.
              raise self.Error('no graph {0} in {1}'.format(
                  what, '|'.join(sorted(handlers))))
          return handlers[what](*args, **kwargs)

      def bootsteps(self, *args, **kwargs):
          """Write the bootstep graph of a fresh ``WorkController``.

          Positional arguments choose the blueprints to include
          (``worker``, ``consumer``; case-insensitive).  With no arguments
          both are included and connected.
          """
          worker = self.app.WorkController()
          include = {arg.lower() for arg in args or ['worker', 'consumer']}
          if 'worker' in include:
              blueprint_graph = worker.blueprint.graph
              if 'consumer' in include:
                  worker.blueprint.connect_with(worker.consumer.blueprint)
          else:
              blueprint_graph = worker.consumer.blueprint.graph
          blueprint_graph.to_dot(self.stdout)

      def workers(self, *args, **kwargs):
          """Write a graph of workers, pool threads, broker and backend.

          Positional arguments have the form ``key`` or ``key:value``,
          where a value containing commas is split into a list.  Keys read
          here: ``nodes``, ``threads``, ``wmax``, ``tmax``, ``broker``,
          ``backend``, ``generic`` and ``enumerate``.  When ``nodes`` is
          absent the worker names and pool sizes are taken from
          ``self.app.control.inspect().stats()``.
          """
          from itertools import count

          def as_list(value):
              # ``maybe_list`` only splits values that contain a comma, so
              # a single ``nodes:w1`` / ``threads:12`` arrives as a bare
              # string; wrap it instead of iterating its characters.
              if not value:
                  return []
              if isinstance(value, (list, tuple)):
                  return list(value)
              return [value]

          def maybe_list(pair, sep=','):
              key, value = pair
              return key, value.split(sep) if sep in value else value

          def simplearg(arg):
              # 'key:value' -> (key, value); a bare 'key' -> (key, '').
              return maybe_list(itemgetter(0, 2)(arg.partition(':')))

          args = dict(simplearg(arg) for arg in args)
          generic = 'generic' in args
          tids = count(1)  # source of unique ``thr-N`` labels

          def generic_label(node):
              return '{0} ({1}://)'.format(type(node).__name__,
                                           node._label.split('://')[0])

          @python_2_unicode_compatible
          class Node(object):
              force_label = None
              scheme = {}

              def __init__(self, label, pos=None):
                  self._label = label
                  self.pos = pos

              def label(self):
                  return self._label

              def __str__(self):
                  return self.label()

          class Thread(Node):
              scheme = {
                  'fillcolor': 'lightcyan4',
                  'fontcolor': 'yellow',
                  'shape': 'oval',
                  'fontsize': 10,
                  'width': 0.3,
                  'color': 'black',
              }

              def __init__(self, label, **kwargs):
                  # The node identity is a generated ``thr-N`` name; the
                  # caller's label is kept only for display.
                  self.real_label = label
                  super(Thread, self).__init__(
                      'thr-{0}'.format(next(tids)), pos=0)

          class Formatter(GraphFormatter):

              def label(self, obj):
                  return obj and obj.label()

              def node(self, obj):
                  # Always copy: the scheme dicts are class attributes and
                  # must not pick up one node's label.
                  scheme = dict(obj.scheme)
                  if isinstance(obj, Thread):
                      scheme['label'] = obj.real_label
                  return self.draw_node(
                      obj, dict(self.node_scheme, **scheme),
                  )

              def terminal_node(self, obj):
                  return self.draw_node(
                      obj, dict(self.term_scheme, **obj.scheme),
                  )

              def edge(self, a, b, **attrs):
                  if isinstance(a, Thread):
                      attrs.update(arrowhead='none', arrowtail='tee')
                  return self.draw_edge(a, b, self.edge_scheme, attrs)

          subscript_digits = {
              '0': '₀', '1': '₁', '2': '₂', '3': '₃', '4': '₄',
              '5': '₅', '6': '₆', '7': '₇', '8': '₈', '9': '₉',
          }

          def subscript(n):
              return ''.join([subscript_digits[digit] for digit in str(n)])

          class Worker(Node):
              pass

          class Backend(Node):
              # The original literal listed 'color' twice; the later value
              # ('peachpuff4') was the effective one and is kept.
              scheme = {
                  'shape': 'folder',
                  'width': 2,
                  'height': 1,
                  'fillcolor': 'peachpuff3',
                  'color': 'peachpuff4',
              }

              def label(self):
                  return generic_label(self) if generic else self._label

          class Broker(Node):
              scheme = {
                  'shape': 'circle',
                  'fillcolor': 'cadetblue3',
                  'color': 'cadetblue4',
                  'height': 1,
              }

              def label(self):
                  return generic_label(self) if generic else self._label

          # A limit of 0 (or an empty value) disables abbreviation.
          Wmax = int(args.get('wmax', 4) or 0)
          Tmax = int(args.get('tmax', 3) or 0)

          def maybe_abbr(entries, name, limit=Wmax):
              """Shorten ``entries`` to ``limit`` items.

              Keeps the first ``limit - 2`` items and the last one, with a
              placeholder in between.
              """
              size = len(entries)
              abbr = limit and size > limit
              if 'enumerate' in args:
                  entries = ['{0}{1}'.format(name, subscript(i + 1))
                             for i in range(size)]
              if abbr:
                  entries = entries[0:limit - 1] + [entries[size - 1]]
                  entries[limit - 2] = '{0}⎨…{1}⎬'.format(
                      name[0], subscript(size - (limit - 1)))
              return entries

          try:
              workers = as_list(args['nodes'])
          except KeyError:
              # stats() may yield nothing when no worker replies.
              replies = self.app.control.inspect().stats() or {}
              workers, threads = [], []
              for hostname, reply in items(replies):
                  workers.append(hostname)
                  threads.append(reply['pool']['max-concurrency'])
          else:
              threads = as_list(args.get('threads'))

          wlen = len(workers)
          backend_url = args.get('backend', self.app.conf.result_backend)
          workers = maybe_abbr(workers, 'Worker')

          # Original positions of the workers that survive abbreviation,
          # so each thread count stays with its own worker for any wmax.
          if Wmax and wlen > Wmax:
              kept = list(range(Wmax - 1)) + [wlen - 1]
          else:
              kept = list(range(wlen))

          threads_for = {}
          for name, index in zip(workers, kept):
              # Workers without a thread count get no thread nodes.
              if index < len(threads):
                  threads_for[name] = maybe_abbr(
                      list(range(int(threads[index]))), 'P', Tmax,
                  )

          broker = Broker(args.get(
              'broker', self.app.connection_for_read().as_uri()))
          backend = Backend(backend_url) if backend_url else None

          deps = DependencyGraph(formatter=Formatter())
          deps.add_arc(broker)
          if backend:
              deps.add_arc(backend)

          for position, name in enumerate(workers):
              worker_node = Worker(name, pos=position)
              deps.add_arc(worker_node)
              deps.add_edge(worker_node, broker)
              if backend:
                  deps.add_edge(worker_node, backend)
              for thread_label in threads_for.get(worker_node._label) or ():
                  thread_node = Thread(thread_label)
                  deps.add_arc(thread_node)
                  deps.add_edge(thread_node, worker_node)

          deps.to_dot(self.stdout)