Browse Source

New 'celery graph' command replaces worker_graph|consumer_graph. E.g.: celery graph bootsteps worker consumer, connects both or you can use only one

Ask Solem 12 years ago
parent
commit
d7cf4cde99

+ 26 - 21
celery/bin/celery.py

@@ -18,8 +18,6 @@ from importlib import import_module
 from itertools import imap
 from pprint import pformat
 
-from kombu.utils.encoding import safe_str
-
 from celery.platforms import EX_OK, EX_FAILURE, EX_UNAVAILABLE, EX_USAGE
 from celery.utils import term
 from celery.utils import text
@@ -54,7 +52,7 @@ command_classes = [
 ]
 if DEBUG:
     command_classes.append(
-        ('Debug', ['worker_graph', 'consumer_graph'], 'red'),
+        ('Debug', ['graph'], 'red'),
     )
 
 
@@ -129,14 +127,14 @@ class Command(BaseCommand):
         try:
             ret = self.run(*args, **kwargs)
         except Error as exc:
-            self.error(self.colored.red('Error: {0!r}'.format(exc)))
+            self.error(self.colored.red('Error: {0}'.format(exc)))
             return exc.status
 
         return ret if ret is not None else EX_OK
 
-    def show_help(self, command):
-        self.run_from_argv(self.prog_name, [command, '--help'])
-        return EX_USAGE
+    def exit_help(self, command):
+        # this never exits due to OptionParser.parse_options
+        return self.run_from_argv(self.prog_name, [command, '--help'])
 
     def error(self, s):
         self.out(s, fh=self.stderr)
@@ -157,7 +155,7 @@ class Command(BaseCommand):
         return self(*args, **options)
 
     def usage(self, command):
-        return '%%prog {0} [options] {self.args}'.format(command, self=self)
+        return '%prog {0} [options] {self.args}'.format(command, self=self)
 
     def prettify_list(self, n):
         c = self.colored
@@ -468,19 +466,26 @@ class result(Command):
 
 
 @command
-class worker_graph(Command):
-
-    def run(self, **kwargs):
-        worker = self.app.WorkController()
-        worker.namespace.graph.to_dot(self.stdout)
+class graph(Command):
+    args = '<TYPE> [arguments]\n.....  bootsteps [worker] [consumer]'
 
+    def run(self, what=None, *args, **kwargs):
+        map = {'bootsteps': self.bootsteps}
+        not what and self.exit_help('graph')
+        if what not in map:
+            raise Error('no graph {0} in {1}'.format(what, '|'.join(map)))
+        return map[what](*args, **kwargs)
 
-@command
-class consumer_graph(Command):
-
-    def run(self, **kwargs):
+    def bootsteps(self, *args, **kwargs):
         worker = self.app.WorkController()
-        worker.consumer.namespace.graph.to_dot(self.stdout)
+        include = set(arg.lower() for arg in args or ['worker', 'consumer'])
+        if 'worker' in include:
+            graph = worker.namespace.graph
+            if 'consumer' in include:
+                worker.namespace.connect_with(worker.consumer.namespace)
+        else:
+            graph = worker.consumer.namespace.graph
+        graph.to_dot(self.stdout)
 
 
 class _RemoteControl(Command):
@@ -528,7 +533,7 @@ class _RemoteControl(Command):
         ])
 
     def usage(self, command):
-        return '%%prog {0} [options] {1} <command> [arg1 .. argN]'.format(
+        return '%prog {0} [options] {1} <command> [arg1 .. argN]'.format(
                 command, self.args)
 
     def call(self, *args, **kwargs):
@@ -726,7 +731,7 @@ class migrate(Command):
 
     def run(self, *args, **kwargs):
         if len(args) != 2:
-            return self.show_help('migrate')
+            return self.exit_help('migrate')
         from kombu import Connection
         from celery.contrib.migrate import migrate_tasks
 
@@ -854,7 +859,7 @@ class help(Command):
     """Show help screen and exit."""
 
     def usage(self, command):
-        return '%%prog <command> [options] {0.args}'.format(self)
+        return '%prog <command> [options] {0.args}'.format(self)
 
     def run(self, *args, **kwargs):
         self.parser.print_help()

+ 4 - 0
celery/bootsteps.py

@@ -187,6 +187,10 @@ class Namespace(object):
             step.include(parent)
         return self
 
+    def connect_with(self, other):
+        self.graph.adjacent.update(other.graph.adjacent)
+        self.graph.add_edge(type(other.order[0]), type(self.order[-1]))
+
     def import_module(self, module):
         return import_module(module)
 

+ 12 - 5
celery/datastructures.py

@@ -74,7 +74,9 @@ class GraphFormatter(object):
 
     def attrs(self, d, scheme=None):
         d = dict(self.scheme, **dict(scheme, **d or {}) if scheme else d)
-        return self._attrsep.join(safe_str(self.attr(k, v)) for k, v in d.iteritems())
+        return self._attrsep.join(
+            safe_str(self.attr(k, v)) for k, v in d.iteritems()
+        )
 
     def head(self, **attrs):
         return self.FMT(self._head, id=self.id, type=self.type,
@@ -115,9 +117,6 @@ class GraphFormatter(object):
         )
 
 
-
-
-
 class CycleError(Exception):
     """A cycle was detected in an acyclic graph."""
 
@@ -152,6 +151,15 @@ class DependencyGraph(object):
         (``A`` depends on ``B``)."""
         self[A].append(B)
 
+    def find_last(self, g):
+        for obj in g.adjacent.keys():
+            if obj.last:
+                return obj
+
+    def connect(self, graph):
+        """Add nodes from another graph."""
+        self.adjacent.update(graph.adjacent)
+
     def topsort(self):
         """Sort the graph topologically.
 
@@ -273,7 +281,6 @@ class DependencyGraph(object):
 
         P(draw.head())
         for obj, adjacent in self.iteritems():
-            objl = draw.label(obj)
             if not adjacent:
                 if_not_seen(draw.terminal_node, obj)
             for req in adjacent:

+ 0 - 2
celery/tests/worker/test_worker.py

@@ -819,7 +819,6 @@ class test_WorkController(AppCase):
     def setup(self):
         self.worker = self.create_worker()
         from celery import worker
-        from celery.worker import components
         self._logger = worker.logger
         self._comp_logger = components.logger
         self.logger = worker.logger = Mock()
@@ -827,7 +826,6 @@ class test_WorkController(AppCase):
 
     def teardown(self):
         from celery import worker
-        from celery.worker import components
         worker.logger = self._logger
         components.logger = self._comp_logger
 

BIN
docs/images/worker_graph_full.png


+ 2 - 0
docs/userguide/extending.rst

@@ -13,6 +13,8 @@
 Bootsteps
 =========
 
+.. figure:: ../images/worker_graph_full.png
+
 .. _extending-worker-bootsteps:
 
 Worker Bootsteps