graph.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. # -*- coding: utf-8 -*-
  2. """The :program:`celery graph` command.
  3. .. program:: celery graph
  4. """
  5. from __future__ import absolute_import, unicode_literals
  6. from operator import itemgetter
  7. from celery.utils.graph import DependencyGraph, GraphFormatter
  8. from .base import Command
  9. __all__ = ['graph']
  10. class graph(Command):
  11. args = """<TYPE> [arguments]
  12. ..... bootsteps [worker] [consumer]
  13. ..... workers [enumerate]
  14. """
  15. def run(self, what=None, *args, **kwargs):
  16. map = {'bootsteps': self.bootsteps, 'workers': self.workers}
  17. if not what:
  18. raise self.UsageError('missing type')
  19. elif what not in map:
  20. raise self.Error('no graph {0} in {1}'.format(what, '|'.join(map)))
  21. return map[what](*args, **kwargs)
  22. def bootsteps(self, *args, **kwargs):
  23. worker = self.app.WorkController()
  24. include = {arg.lower() for arg in args or ['worker', 'consumer']}
  25. if 'worker' in include:
  26. graph = worker.blueprint.graph
  27. if 'consumer' in include:
  28. worker.blueprint.connect_with(worker.consumer.blueprint)
  29. else:
  30. graph = worker.consumer.blueprint.graph
  31. graph.to_dot(self.stdout)
  32. def workers(self, *args, **kwargs):
  33. def simplearg(arg):
  34. return maybe_list(itemgetter(0, 2)(arg.partition(':')))
  35. def maybe_list(l, sep=','):
  36. return (l[0], l[1].split(sep) if sep in l[1] else l[1])
  37. args = dict(simplearg(arg) for arg in args)
  38. generic = 'generic' in args
  39. def generic_label(node):
  40. return '{0} ({1}://)'.format(type(node).__name__,
  41. node._label.split('://')[0])
  42. class Node:
  43. force_label = None
  44. scheme = {}
  45. def __init__(self, label, pos=None):
  46. self._label = label
  47. self.pos = pos
  48. def label(self):
  49. return self._label
  50. def __str__(self):
  51. return self.label()
  52. class Thread(Node):
  53. scheme = {'fillcolor': 'lightcyan4', 'fontcolor': 'yellow',
  54. 'shape': 'oval', 'fontsize': 10, 'width': 0.3,
  55. 'color': 'black'}
  56. def __init__(self, label, **kwargs):
  57. self._label = 'thr-{0}'.format(next(tids))
  58. self.real_label = label
  59. self.pos = 0
  60. class Formatter(GraphFormatter):
  61. def label(self, obj):
  62. return obj and obj.label()
  63. def node(self, obj):
  64. scheme = dict(obj.scheme) if obj.pos else obj.scheme
  65. if isinstance(obj, Thread):
  66. scheme['label'] = obj.real_label
  67. return self.draw_node(
  68. obj, dict(self.node_scheme, **scheme),
  69. )
  70. def terminal_node(self, obj):
  71. return self.draw_node(
  72. obj, dict(self.term_scheme, **obj.scheme),
  73. )
  74. def edge(self, a, b, **attrs):
  75. if isinstance(a, Thread):
  76. attrs.update(arrowhead='none', arrowtail='tee')
  77. return self.draw_edge(a, b, self.edge_scheme, attrs)
  78. def subscript(n):
  79. S = {'0': '₀', '1': '₁', '2': '₂', '3': '₃', '4': '₄',
  80. '5': '₅', '6': '₆', '7': '₇', '8': '₈', '9': '₉'}
  81. return ''.join([S[i] for i in str(n)])
  82. class Worker(Node):
  83. pass
  84. class Backend(Node):
  85. scheme = {'shape': 'folder', 'width': 2,
  86. 'height': 1, 'color': 'black',
  87. 'fillcolor': 'peachpuff3', 'color': 'peachpuff4'}
  88. def label(self):
  89. return generic_label(self) if generic else self._label
  90. class Broker(Node):
  91. scheme = {'shape': 'circle', 'fillcolor': 'cadetblue3',
  92. 'color': 'cadetblue4', 'height': 1}
  93. def label(self):
  94. return generic_label(self) if generic else self._label
  95. from itertools import count
  96. tids = count(1)
  97. Wmax = int(args.get('wmax', 4) or 0)
  98. Tmax = int(args.get('tmax', 3) or 0)
  99. def maybe_abbr(l, name, max=Wmax):
  100. size = len(l)
  101. abbr = max and size > max
  102. if 'enumerate' in args:
  103. l = ['{0}{1}'.format(name, subscript(i + 1))
  104. for i, obj in enumerate(l)]
  105. if abbr:
  106. l = l[0:max - 1] + [l[size - 1]]
  107. l[max - 2] = '{0}⎨…{1}⎬'.format(
  108. name[0], subscript(size - (max - 1)))
  109. return l
  110. try:
  111. workers = args['nodes']
  112. threads = args.get('threads') or []
  113. except KeyError:
  114. replies = self.app.control.inspect().stats()
  115. workers, threads = [], []
  116. for worker, reply in replies.items():
  117. workers.append(worker)
  118. threads.append(reply['pool']['max-concurrency'])
  119. wlen = len(workers)
  120. backend = args.get('backend', self.app.conf.result_backend)
  121. threads_for = {}
  122. workers = maybe_abbr(workers, 'Worker')
  123. if Wmax and wlen > Wmax:
  124. threads = threads[0:3] + [threads[-1]]
  125. for i, threads in enumerate(threads):
  126. threads_for[workers[i]] = maybe_abbr(
  127. list(range(int(threads))), 'P', Tmax,
  128. )
  129. broker = Broker(args.get(
  130. 'broker', self.app.connection_for_read().as_uri()))
  131. backend = Backend(backend) if backend else None
  132. graph = DependencyGraph(formatter=Formatter())
  133. graph.add_arc(broker)
  134. if backend:
  135. graph.add_arc(backend)
  136. curworker = [0]
  137. for i, worker in enumerate(workers):
  138. worker = Worker(worker, pos=i)
  139. graph.add_arc(worker)
  140. graph.add_edge(worker, broker)
  141. if backend:
  142. graph.add_edge(worker, backend)
  143. threads = threads_for.get(worker._label)
  144. if threads:
  145. for thread in threads:
  146. thread = Thread(thread)
  147. graph.add_arc(thread)
  148. graph.add_edge(thread, worker)
  149. curworker[0] += 1
  150. graph.to_dot(self.stdout)