Browse Source

Nicer layout for 'celery graph workers'

Ask Solem 12 years ago
parent
commit
dad185cb3c
1 changed files with 98 additions and 22 deletions
  1. 98 22
      celery/bin/celery.py

+ 98 - 22
celery/bin/celery.py

@@ -6,7 +6,7 @@ The :program:`celery` umbrella command.
 .. program:: celery
 
 """
-from __future__ import absolute_import, print_function
+from __future__ import absolute_import, print_function, unicode_literals
 
 import anyjson
 import heapq
@@ -16,6 +16,7 @@ import warnings
 
 from importlib import import_module
 from itertools import imap
+from operator import itemgetter
 from pprint import pformat
 
 from celery.datastructures import DependencyGraph, GraphFormatter
@@ -492,21 +493,55 @@ class graph(Command):
         graph.to_dot(self.stdout)
 
     def workers(self, *args, **kwargs):
-        args = set(arg.lower() for arg in args)
+
+        def simplearg(arg):
+            return maybe_list(itemgetter(0, 2)(arg.partition(':')))
+
+        def maybe_list(l, sep=','):
+            return (l[0], l[1].split(sep) if sep in l[1] else l[1])
+
+        args = dict(map(simplearg, args))
         generic = 'generic' in args
 
         def generic_label(node):
             return '{0} ({1}://)'.format(type(node).__name__,
                                          node._label.split('://')[0])
 
+        class Node(object):
+            force_label = None
+            scheme = {}
+
+            def __init__(self, label, pos=None):
+                self._label = label
+                self.pos = pos
+
+            def label(self):
+                return self._label
+
+            def __str__(self):
+                return self.label()
+
+        class Thread(Node):
+            scheme = {'fillcolor': 'lightcyan4', 'fontcolor': 'yellow',
+                      'shape':'oval', 'fontsize': 10, 'width': 0.3,
+                      'color': 'black'}
+
+            def __init__(self, label, **kwargs):
+                self._label = 'thr-{0}'.format(next(tids))
+                self.real_label = label
+                self.pos = 0
+
         class Formatter(GraphFormatter):
 
             def label(self, obj):
                 return obj and obj.label()
 
             def node(self, obj):
+                scheme = dict(obj.scheme, sortv=obj.pos) if obj.pos else obj.scheme
+                if isinstance(obj, Thread):
+                    scheme['label'] = obj.real_label
                 return self.draw_node(
-                    obj, dict(self.node_scheme, **obj.scheme),
+                    obj, dict(self.node_scheme, **scheme),
                 )
 
             def terminal_node(self, obj):
@@ -514,53 +549,94 @@ class graph(Command):
                     obj, dict(self.term_scheme, **obj.scheme),
                 )
 
-        class Node(object):
-            force_label = None
-            scheme = {}
+            def edge(self, a, b, **attrs):
+                if isinstance(a, Thread):
+                    attrs.update(arrowhead='none', arrowtail='tee')
+                return self.draw_edge(a, b, self.edge_scheme, attrs)
 
-            def __init__(self, label):
-                self._label = label
+        def subscript(n):
+            S = {'0': '₀', '1': '₁', '2': '₂', '3': '₃', '4': '₄',
+                 '5': '₅', '6': '₆', '7': '₇', '8': '₈', '9': '₉'}
+            return ''.join([S[i] for i in str(n)])
 
-            def label(self):
-                return self._label
 
         class Worker(Node):
             pass
 
         class Backend(Node):
             scheme = {'shape': 'folder', 'width': 2,
-                      'height': 2, 'color': 'black'}
+                      'height': 1, 'color': 'black',
+                      'fillcolor': 'peachpuff3', 'color': 'peachpuff4'}
 
             def label(self):
                 return generic_label(self) if generic else self._label
 
         class Broker(Node):
             scheme = {'shape': 'circle', 'fillcolor': 'cadetblue3',
-                      'color': 'cadetblue4', 'height': 2}
+                      'color': 'cadetblue4', 'height': 1}
 
             def label(self):
                 return generic_label(self) if generic else self._label
 
-        backend = self.app.conf.CELERY_RESULT_BACKEND
-        pongs = self.app.control.ping()
-        workers = [pong.keys()[0] for pong in pongs]
-        wfmt = 'Worker{0}' if len(workers) < 6 else 'W{0}'
-        if 'enumerate' in args:
-            workers = [wfmt.format(i + 1) for i, _ in enumerate(workers)]
+        from itertools import count
+        tids = count(1)
+        Wmax = int(args.get('wmax', 4) or 0)
+        Tmax = int(args.get('tmax', 3) or 0)
+
+        def maybe_abbr(l, name, max=Wmax):
+            size = len(l)
+            abbr = max and size > max
+            if 'enumerate' in args:
+                l = ['{0}{1}'.format(name, subscript(i + 1))
+                        for i, obj in enumerate(l)]
+            if abbr:
+                l = l[0:max -1] + [l[size - 1]]
+                l[max - 2] = '{0}⎨…{1}⎬'.format(
+                    name[0], subscript(size - (max - 1)))
+            return l
 
-        workers = [Worker(worker) for worker in workers]
-        broker = Broker(self.app.connection().as_uri())
+        try:
+            workers = args['nodes']
+            threads = args.get('threads') or []
+        except KeyError:
+            replies = self.app.control.inspect().stats()
+            workers, threads = [], []
+            for worker, reply in replies.iteritems():
+                workers.append(worker)
+                threads.append(reply['pool']['max-concurrency'])
+
+        wlen = len(workers)
+        backend = args.get('backend', self.app.conf.CELERY_RESULT_BACKEND)
+        threads_for = {}
+        workers = maybe_abbr(workers, 'Worker')
+        if Wmax and wlen > Wmax:
+            threads = threads[0:3] + [threads[-1]]
+        for i, threads in enumerate(threads):
+            threads_for[workers[i]] = maybe_abbr(
+                range(int(threads)), 'P', Tmax,
+            )
+
+        broker = Broker(args.get('broker', self.app.connection().as_uri()))
         backend = Backend(backend) if backend else None
-
         graph = DependencyGraph(formatter=Formatter())
         graph.add_arc(broker)
         if backend:
             graph.add_arc(backend)
-        for worker in workers:
+        curworker = [0]
+        for i, worker in enumerate(workers):
+            worker = Worker(worker, pos=i)
             graph.add_arc(worker)
             graph.add_edge(worker, broker)
             if backend:
                 graph.add_edge(worker, backend)
+            threads = threads_for.get(worker._label)
+            if threads:
+                for thread in threads:
+                    thread = Thread(thread)
+                    graph.add_arc(thread)
+                    graph.add_edge(thread, worker)
+
+            curworker[0] += 1
 
         graph.to_dot(self.stdout)