|
@@ -18,6 +18,7 @@ from importlib import import_module
|
|
|
from itertools import imap
|
|
|
from pprint import pformat
|
|
|
|
|
|
+from celery.datastructures import DependencyGraph, GraphFormatter
|
|
|
from celery.platforms import EX_OK, EX_FAILURE, EX_UNAVAILABLE, EX_USAGE
|
|
|
from celery.utils import term
|
|
|
from celery.utils import text
|
|
@@ -467,10 +468,13 @@ class result(Command):
|
|
|
|
|
|
@command
|
|
|
class graph(Command):
|
|
|
- args = '<TYPE> [arguments]\n..... bootsteps [worker] [consumer]'
|
|
|
+ args = """<TYPE> [arguments]
|
|
|
+ ..... bootsteps [worker] [consumer]
|
|
|
+ ..... workers [enumerate]
|
|
|
+ """
|
|
|
|
|
|
def run(self, what=None, *args, **kwargs):
|
|
|
- map = {'bootsteps': self.bootsteps}
|
|
|
+ map = {'bootsteps': self.bootsteps, 'workers': self.workers}
|
|
|
not what and self.exit_help('graph')
|
|
|
if what not in map:
|
|
|
raise Error('no graph {0} in {1}'.format(what, '|'.join(map)))
|
|
@@ -487,6 +491,79 @@ class graph(Command):
|
|
|
graph = worker.consumer.namespace.graph
|
|
|
graph.to_dot(self.stdout)
|
|
|
|
|
|
+ def workers(self, *args, **kwargs):
|
|
|
+ args = set(arg.lower() for arg in args)
|
|
|
+ generic = 'generic' in args
|
|
|
+
|
|
|
+ def generic_label(node):
|
|
|
+ return '{0} ({1}://)'.format(type(node).__name__,
|
|
|
+ node._label.split('://')[0])
|
|
|
+
|
|
|
+ class Formatter(GraphFormatter):
|
|
|
+
|
|
|
+ def label(self, obj):
|
|
|
+ return obj and obj.label()
|
|
|
+
|
|
|
+ def node(self, obj):
|
|
|
+ return self.draw_node(
|
|
|
+ obj, dict(self.node_scheme, **obj.scheme),
|
|
|
+ )
|
|
|
+
|
|
|
+ def terminal_node(self, obj):
|
|
|
+ return self.draw_node(
|
|
|
+ obj, dict(self.term_scheme, **obj.scheme),
|
|
|
+ )
|
|
|
+
|
|
|
+ class Node(object):
|
|
|
+ force_label = None
|
|
|
+ scheme = {}
|
|
|
+
|
|
|
+ def __init__(self, label):
|
|
|
+ self._label = label
|
|
|
+
|
|
|
+ def label(self):
|
|
|
+ return self._label
|
|
|
+
|
|
|
+ class Worker(Node):
|
|
|
+ pass
|
|
|
+
|
|
|
+ class Backend(Node):
|
|
|
+ scheme = {'shape': 'folder', 'width': 2,
|
|
|
+ 'height': 2, 'color': 'black'}
|
|
|
+
|
|
|
+ def label(self):
|
|
|
+ return generic_label(self) if generic else self._label
|
|
|
+
|
|
|
+ class Broker(Node):
|
|
|
+ scheme = {'shape': 'circle', 'fillcolor': 'cadetblue3',
|
|
|
+ 'color': 'cadetblue4', 'height': 2}
|
|
|
+
|
|
|
+ def label(self):
|
|
|
+ return generic_label(self) if generic else self._label
|
|
|
+
|
|
|
+ backend = self.app.conf.CELERY_RESULT_BACKEND
|
|
|
+ pongs = self.app.control.ping()
|
|
|
+ workers = [pong.keys()[0] for pong in pongs]
|
|
|
+ wfmt = 'Worker{0}' if len(workers) < 6 else 'W{0}'
|
|
|
+ if 'enumerate' in args:
|
|
|
+ workers = [wfmt.format(i + 1) for i, _ in enumerate(workers)]
|
|
|
+
|
|
|
+ workers = [Worker(worker) for worker in workers]
|
|
|
+ broker = Broker(self.app.connection().as_uri())
|
|
|
+ backend = Backend(backend) if backend else None
|
|
|
+
|
|
|
+ graph = DependencyGraph(formatter=Formatter())
|
|
|
+ graph.add_arc(broker)
|
|
|
+ if backend:
|
|
|
+ graph.add_arc(backend)
|
|
|
+ for worker in workers:
|
|
|
+ graph.add_arc(worker)
|
|
|
+ graph.add_edge(worker, broker)
|
|
|
+ if backend:
|
|
|
+ graph.add_edge(worker, backend)
|
|
|
+
|
|
|
+ graph.to_dot(self.stdout)
|
|
|
+
|
|
|
|
|
|
class _RemoteControl(Command):
|
|
|
name = None
|