Browse Source

Utils: Moves DependencyGraph to new celery.utils.graph

Ask Solem 8 years ago
parent
commit
0e91fcde48

+ 1 - 1
celery/bin/graph.py

@@ -10,8 +10,8 @@ from __future__ import absolute_import, unicode_literals
 
 from operator import itemgetter
 
-from celery.datastructures import DependencyGraph, GraphFormatter
 from celery.five import items, python_2_unicode_compatible
+from celery.utils.graph import DependencyGraph, GraphFormatter
 
 from .base import Command
 

+ 1 - 1
celery/bootsteps.py

@@ -15,8 +15,8 @@ from kombu.common import ignore_errors
 from kombu.utils import symbol_by_name
 from kombu.utils.encoding import bytes_to_str
 
-from .datastructures import DependencyGraph, GraphFormatter
 from .five import bytes_if_py2, values, with_metaclass
+from .utils.graph import DependencyGraph, GraphFormatter
 from .utils.imports import instantiate, qualname
 from .utils.log import get_logger
 

+ 2 - 302
celery/datastructures.py

@@ -6,20 +6,19 @@
     Custom types and data-structures.
 
 """
-from __future__ import absolute_import, print_function, unicode_literals
+from __future__ import absolute_import, unicode_literals
 
 import sys
 import time
 
 from collections import (
     Callable, Mapping, MutableMapping, MutableSet, Sequence,
-    OrderedDict as _OrderedDict, defaultdict, deque,
+    OrderedDict as _OrderedDict, deque,
 )
 from heapq import heapify, heappush, heappop
 from itertools import chain, count
 
 from billiard.einfo import ExceptionInfo  # noqa
-from kombu.utils.encoding import safe_str, bytes_to_str
 from kombu.utils.limits import TokenBucket  # noqa
 
 from celery.five import Empty, items, keys, python_2_unicode_compatible, values
@@ -40,24 +39,12 @@ except ImportError:
     LazySettings = LazyObject  # noqa
 
 __all__ = [
-    'GraphFormatter', 'CycleError', 'DependencyGraph',
     'AttributeDictMixin', 'AttributeDict', 'DictAttribute',
     'ConfigurationView', 'LimitedSet',
 ]
 
 PY3 = sys.version_info[0] >= 3
 
-DOT_HEAD = """
-{IN}{type} {id} {{
-{INp}graph [{attrs}]
-"""
-DOT_ATTR = '{name}={value}'
-DOT_NODE = '{INp}"{0}" [{attrs}]'
-DOT_EDGE = '{INp}"{0}" {dir} "{1}" [{attrs}]'
-DOT_ATTRSEP = ', '
-DOT_DIRS = {'graph': '--', 'digraph': '->'}
-DOT_TAIL = '{IN}}}'
-
 REPR_LIMITED_SET = """\
 <{name}({size}): maxlen={0.maxlen}, expires={0.expires}, minlen={0.minlen}>\
 """
@@ -69,293 +56,6 @@ def force_mapping(m):
     return DictAttribute(m) if not isinstance(m, Mapping) else m
 
 
-class GraphFormatter(object):
-    _attr = DOT_ATTR.strip()
-    _node = DOT_NODE.strip()
-    _edge = DOT_EDGE.strip()
-    _head = DOT_HEAD.strip()
-    _tail = DOT_TAIL.strip()
-    _attrsep = DOT_ATTRSEP
-    _dirs = dict(DOT_DIRS)
-
-    scheme = {
-        'shape': 'box',
-        'arrowhead': 'vee',
-        'style': 'filled',
-        'fontname': 'HelveticaNeue',
-    }
-    edge_scheme = {
-        'color': 'darkseagreen4',
-        'arrowcolor': 'black',
-        'arrowsize': 0.7,
-    }
-    node_scheme = {'fillcolor': 'palegreen3', 'color': 'palegreen4'}
-    term_scheme = {'fillcolor': 'palegreen1', 'color': 'palegreen2'}
-    graph_scheme = {'bgcolor': 'mintcream'}
-
-    def __init__(self, root=None, type=None, id=None,
-                 indent=0, inw=' ' * 4, **scheme):
-        self.id = id or 'dependencies'
-        self.root = root
-        self.type = type or 'digraph'
-        self.direction = self._dirs[self.type]
-        self.IN = inw * (indent or 0)
-        self.INp = self.IN + inw
-        self.scheme = dict(self.scheme, **scheme)
-        self.graph_scheme = dict(self.graph_scheme, root=self.label(self.root))
-
-    def attr(self, name, value):
-        value = '"{0}"'.format(value)
-        return self.FMT(self._attr, name=name, value=value)
-
-    def attrs(self, d, scheme=None):
-        d = dict(self.scheme, **dict(scheme, **d or {}) if scheme else d)
-        return self._attrsep.join(
-            safe_str(self.attr(k, v)) for k, v in items(d)
-        )
-
-    def head(self, **attrs):
-        return self.FMT(
-            self._head, id=self.id, type=self.type,
-            attrs=self.attrs(attrs, self.graph_scheme),
-        )
-
-    def tail(self):
-        return self.FMT(self._tail)
-
-    def label(self, obj):
-        return obj
-
-    def node(self, obj, **attrs):
-        return self.draw_node(obj, self.node_scheme, attrs)
-
-    def terminal_node(self, obj, **attrs):
-        return self.draw_node(obj, self.term_scheme, attrs)
-
-    def edge(self, a, b, **attrs):
-        return self.draw_edge(a, b, **attrs)
-
-    def _enc(self, s):
-        return s.encode('utf-8', 'ignore')
-
-    def FMT(self, fmt, *args, **kwargs):
-        return self._enc(fmt.format(
-            *args, **dict(kwargs, IN=self.IN, INp=self.INp)
-        ))
-
-    def draw_edge(self, a, b, scheme=None, attrs=None):
-        return self.FMT(
-            self._edge, self.label(a), self.label(b),
-            dir=self.direction, attrs=self.attrs(attrs, self.edge_scheme),
-        )
-
-    def draw_node(self, obj, scheme=None, attrs=None):
-        return self.FMT(
-            self._node, self.label(obj), attrs=self.attrs(attrs, scheme),
-        )
-
-
-class CycleError(Exception):
-    """A cycle was detected in an acyclic graph."""
-
-
-@python_2_unicode_compatible
-class DependencyGraph(object):
-    """A directed acyclic graph of objects and their dependencies.
-
-    Supports a robust topological sort
-    to detect the order in which they must be handled.
-
-    Takes an optional iterator of ``(obj, dependencies)``
-    tuples to build the graph from.
-
-    .. warning::
-
-        Does not support cycle detection.
-
-    """
-
-    def __init__(self, it=None, formatter=None):
-        self.formatter = formatter or GraphFormatter()
-        self.adjacent = {}
-        if it is not None:
-            self.update(it)
-
-    def add_arc(self, obj):
-        """Add an object to the graph."""
-        self.adjacent.setdefault(obj, [])
-
-    def add_edge(self, A, B):
-        """Add an edge from object ``A`` to object ``B``
-        (``A`` depends on ``B``)."""
-        self[A].append(B)
-
-    def connect(self, graph):
-        """Add nodes from another graph."""
-        self.adjacent.update(graph.adjacent)
-
-    def topsort(self):
-        """Sort the graph topologically.
-
-        :returns: a list of objects in the order
-            in which they must be handled.
-
-        """
-        graph = DependencyGraph()
-        components = self._tarjan72()
-
-        NC = {
-            node: component for component in components for node in component
-        }
-        for component in components:
-            graph.add_arc(component)
-        for node in self:
-            node_c = NC[node]
-            for successor in self[node]:
-                successor_c = NC[successor]
-                if node_c != successor_c:
-                    graph.add_edge(node_c, successor_c)
-        return [t[0] for t in graph._khan62()]
-
-    def valency_of(self, obj):
-        """Return the valency (degree) of a vertex in the graph."""
-        try:
-            l = [len(self[obj])]
-        except KeyError:
-            return 0
-        for node in self[obj]:
-            l.append(self.valency_of(node))
-        return sum(l)
-
-    def update(self, it):
-        """Update the graph with data from a list
-        of ``(obj, dependencies)`` tuples."""
-        tups = list(it)
-        for obj, _ in tups:
-            self.add_arc(obj)
-        for obj, deps in tups:
-            for dep in deps:
-                self.add_edge(obj, dep)
-
-    def edges(self):
-        """Return generator that yields for all edges in the graph."""
-        return (obj for obj, adj in items(self) if adj)
-
-    def _khan62(self):
-        """Khans simple topological sort algorithm from '62
-
-        See https://en.wikipedia.org/wiki/Topological_sorting
-
-        """
-        count = defaultdict(lambda: 0)
-        result = []
-
-        for node in self:
-            for successor in self[node]:
-                count[successor] += 1
-        ready = [node for node in self if not count[node]]
-
-        while ready:
-            node = ready.pop()
-            result.append(node)
-
-            for successor in self[node]:
-                count[successor] -= 1
-                if count[successor] == 0:
-                    ready.append(successor)
-        result.reverse()
-        return result
-
-    def _tarjan72(self):
-        """Tarjan's algorithm to find strongly connected components.
-
-        See http://bit.ly/vIMv3h.
-
-        """
-        result, stack, low = [], [], {}
-
-        def visit(node):
-            if node in low:
-                return
-            num = len(low)
-            low[node] = num
-            stack_pos = len(stack)
-            stack.append(node)
-
-            for successor in self[node]:
-                visit(successor)
-                low[node] = min(low[node], low[successor])
-
-            if num == low[node]:
-                component = tuple(stack[stack_pos:])
-                stack[stack_pos:] = []
-                result.append(component)
-                for item in component:
-                    low[item] = len(self)
-
-        for node in self:
-            visit(node)
-
-        return result
-
-    def to_dot(self, fh, formatter=None):
-        """Convert the graph to DOT format.
-
-        :param fh: A file, or a file-like object to write the graph to.
-
-        """
-        seen = set()
-        draw = formatter or self.formatter
-
-        def P(s):
-            print(bytes_to_str(s), file=fh)
-
-        def if_not_seen(fun, obj):
-            if draw.label(obj) not in seen:
-                P(fun(obj))
-                seen.add(draw.label(obj))
-
-        P(draw.head())
-        for obj, adjacent in items(self):
-            if not adjacent:
-                if_not_seen(draw.terminal_node, obj)
-            for req in adjacent:
-                if_not_seen(draw.node, obj)
-                P(draw.edge(obj, req))
-        P(draw.tail())
-
-    def format(self, obj):
-        return self.formatter(obj) if self.formatter else obj
-
-    def __iter__(self):
-        return iter(self.adjacent)
-
-    def __getitem__(self, node):
-        return self.adjacent[node]
-
-    def __len__(self):
-        return len(self.adjacent)
-
-    def __contains__(self, obj):
-        return obj in self.adjacent
-
-    def _iterate_items(self):
-        return items(self.adjacent)
-    items = iteritems = _iterate_items
-
-    def __repr__(self):
-        return '\n'.join(self.repr_node(N) for N in self)
-
-    def repr_node(self, obj, level=1, fmt='{0}({1})'):
-        output = [fmt.format(obj, self.valency_of(obj))]
-        if obj in self:
-            for other in self[obj]:
-                d = fmt.format(other, self.valency_of(other))
-                output.append('     ' * level + d)
-                output.extend(self.repr_node(other, level + 1).split('\n')[1:])
-        return '\n'.join(output)
-
-
 class AttributeDictMixin(object):
     """Augment classes with a Mapping interface by adding attribute access.
 

+ 1 - 1
celery/result.py

@@ -21,12 +21,12 @@ from . import current_app
 from . import states
 from ._state import _set_task_join_will_block, task_join_will_block
 from .app import app_or_default
-from .datastructures import DependencyGraph, GraphFormatter
 from .exceptions import ImproperlyConfigured, IncompleteStream, TimeoutError
 from .five import (
     items, python_2_unicode_compatible, range, string_t, monotonic,
 )
 from .utils import deprecated
+from .utils.graph import DependencyGraph, GraphFormatter
 
 try:
     import tblib

+ 2 - 64
celery/tests/utils/test_datastructures.py

@@ -12,15 +12,14 @@ from celery.datastructures import (
     AttributeDict,
     BufferMap,
     ConfigurationView,
-    DependencyGraph,
     DictAttribute,
     LimitedSet,
     Messagebuffer,
 )
-from celery.five import WhateverIO, items
+from celery.five import items
 from celery.utils.objects import Bunch
 
-from celery.tests.case import Case, Mock, skip
+from celery.tests.case import Case, skip
 
 
 class test_DictAttribute(Case):
@@ -362,67 +361,6 @@ class test_AttributeDict(Case):
         self.assertEqual(x['bar'], 'foo')
 
 
-class test_DependencyGraph(Case):
-
-    def graph1(self):
-        return DependencyGraph([
-            ('A', []),
-            ('B', []),
-            ('C', ['A']),
-            ('D', ['C', 'B']),
-        ])
-
-    def test_repr(self):
-        self.assertTrue(repr(self.graph1()))
-
-    def test_topsort(self):
-        order = self.graph1().topsort()
-        # C must start before D
-        self.assertLess(order.index('C'), order.index('D'))
-        # and B must start before D
-        self.assertLess(order.index('B'), order.index('D'))
-        # and A must start before C
-        self.assertLess(order.index('A'), order.index('C'))
-
-    def test_edges(self):
-        self.assertItemsEqual(
-            list(self.graph1().edges()),
-            ['C', 'D'],
-        )
-
-    def test_connect(self):
-        x, y = self.graph1(), self.graph1()
-        x.connect(y)
-
-    def test_valency_of_when_missing(self):
-        x = self.graph1()
-        self.assertEqual(x.valency_of('foobarbaz'), 0)
-
-    def test_format(self):
-        x = self.graph1()
-        x.formatter = Mock()
-        obj = Mock()
-        self.assertTrue(x.format(obj))
-        x.formatter.assert_called_with(obj)
-        x.formatter = None
-        self.assertIs(x.format(obj), obj)
-
-    def test_items(self):
-        self.assertDictEqual(
-            dict(items(self.graph1())),
-            {'A': [], 'B': [], 'C': ['A'], 'D': ['C', 'B']},
-        )
-
-    def test_repr_node(self):
-        x = self.graph1()
-        self.assertTrue(x.repr_node('fasdswewqewq'))
-
-    def test_to_dot(self):
-        s = WhateverIO()
-        self.graph1().to_dot(s)
-        self.assertTrue(s.getvalue())
-
-
 class test_Messagebuffer(Case):
 
     def assert_size_and_first(self, buf, size, expected_first_item):

+ 67 - 0
celery/tests/utils/test_graph.py

@@ -0,0 +1,67 @@
+from __future__ import absolute_import, unicode_literals
+
+from celery.five import WhateverIO, items
+from celery.utils.graph import DependencyGraph
+
+from celery.tests.case import Case, Mock
+
+
+class test_DependencyGraph(Case):
+
+    def graph1(self):
+        return DependencyGraph([
+            ('A', []),
+            ('B', []),
+            ('C', ['A']),
+            ('D', ['C', 'B']),
+        ])
+
+    def test_repr(self):
+        self.assertTrue(repr(self.graph1()))
+
+    def test_topsort(self):
+        order = self.graph1().topsort()
+        # C must start before D
+        self.assertLess(order.index('C'), order.index('D'))
+        # and B must start before D
+        self.assertLess(order.index('B'), order.index('D'))
+        # and A must start before C
+        self.assertLess(order.index('A'), order.index('C'))
+
+    def test_edges(self):
+        self.assertItemsEqual(
+            list(self.graph1().edges()),
+            ['C', 'D'],
+        )
+
+    def test_connect(self):
+        x, y = self.graph1(), self.graph1()
+        x.connect(y)
+
+    def test_valency_of_when_missing(self):
+        x = self.graph1()
+        self.assertEqual(x.valency_of('foobarbaz'), 0)
+
+    def test_format(self):
+        x = self.graph1()
+        x.formatter = Mock()
+        obj = Mock()
+        self.assertTrue(x.format(obj))
+        x.formatter.assert_called_with(obj)
+        x.formatter = None
+        self.assertIs(x.format(obj), obj)
+
+    def test_items(self):
+        self.assertDictEqual(
+            dict(items(self.graph1())),
+            {'A': [], 'B': [], 'C': ['A'], 'D': ['C', 'B']},
+        )
+
+    def test_repr_node(self):
+        x = self.graph1()
+        self.assertTrue(x.repr_node('fasdswewqewq'))
+
+    def test_to_dot(self):
+        s = WhateverIO()
+        self.graph1().to_dot(s)
+        self.assertTrue(s.getvalue())

+ 318 - 0
celery/utils/graph.py

@@ -0,0 +1,318 @@
+# -*- coding: utf-8 -*-
+"""
+    ``celery.utils.graph``
+    ~~~~~~~~~~~~~~~~~~~~~~
+
+    Dependency graph implementation.
+
+"""
+from __future__ import absolute_import, print_function, unicode_literals
+
+from collections import defaultdict
+from textwrap import dedent
+
+from kombu.utils.encoding import safe_str, bytes_to_str
+
+from celery.five import items, python_2_unicode_compatible
+
+__all__ = ['DOT', 'CycleError', 'DependencyGraph', 'GraphFormatter']
+
+
+class DOT:
+    HEAD = dedent("""
+        {IN}{type} {id} {{
+        {INp}graph [{attrs}]
+    """)
+    ATTR = '{name}={value}'
+    NODE = '{INp}"{0}" [{attrs}]'
+    EDGE = '{INp}"{0}" {dir} "{1}" [{attrs}]'
+    ATTRSEP = ', '
+    DIRS = {'graph': '--', 'digraph': '->'}
+    TAIL = '{IN}}}'
+
+
+class CycleError(Exception):
+    """A cycle was detected in an acyclic graph."""
+
+
+@python_2_unicode_compatible
+class DependencyGraph(object):
+    """A directed acyclic graph of objects and their dependencies.
+
+    Supports a robust topological sort
+    to detect the order in which they must be handled.
+
+    Takes an optional iterator of ``(obj, dependencies)``
+    tuples to build the graph from.
+
+    .. warning::
+
+        Does not support cycle detection.
+
+    """
+
+    def __init__(self, it=None, formatter=None):
+        self.formatter = formatter or GraphFormatter()
+        self.adjacent = {}
+        if it is not None:
+            self.update(it)
+
+    def add_arc(self, obj):
+        """Add an object to the graph."""
+        self.adjacent.setdefault(obj, [])
+
+    def add_edge(self, A, B):
+        """Add an edge from object ``A`` to object ``B``
+        (``A`` depends on ``B``)."""
+        self[A].append(B)
+
+    def connect(self, graph):
+        """Add nodes from another graph."""
+        self.adjacent.update(graph.adjacent)
+
+    def topsort(self):
+        """Sort the graph topologically.
+
+        :returns: a list of objects in the order
+            in which they must be handled.
+
+        """
+        graph = DependencyGraph()
+        components = self._tarjan72()
+
+        NC = {
+            node: component for component in components for node in component
+        }
+        for component in components:
+            graph.add_arc(component)
+        for node in self:
+            node_c = NC[node]
+            for successor in self[node]:
+                successor_c = NC[successor]
+                if node_c != successor_c:
+                    graph.add_edge(node_c, successor_c)
+        return [t[0] for t in graph._khan62()]
+
+    def valency_of(self, obj):
+        """Return the valency (degree) of a vertex in the graph."""
+        try:
+            l = [len(self[obj])]
+        except KeyError:
+            return 0
+        for node in self[obj]:
+            l.append(self.valency_of(node))
+        return sum(l)
+
+    def update(self, it):
+        """Update the graph with data from a list
+        of ``(obj, dependencies)`` tuples."""
+        tups = list(it)
+        for obj, _ in tups:
+            self.add_arc(obj)
+        for obj, deps in tups:
+            for dep in deps:
+                self.add_edge(obj, dep)
+
+    def edges(self):
+        """Return generator that yields for all edges in the graph."""
+        return (obj for obj, adj in items(self) if adj)
+
+    def _khan62(self):
+        """Khans simple topological sort algorithm from '62
+
+        See https://en.wikipedia.org/wiki/Topological_sorting
+
+        """
+        count = defaultdict(lambda: 0)
+        result = []
+
+        for node in self:
+            for successor in self[node]:
+                count[successor] += 1
+        ready = [node for node in self if not count[node]]
+
+        while ready:
+            node = ready.pop()
+            result.append(node)
+
+            for successor in self[node]:
+                count[successor] -= 1
+                if count[successor] == 0:
+                    ready.append(successor)
+        result.reverse()
+        return result
+
+    def _tarjan72(self):
+        """Tarjan's algorithm to find strongly connected components.
+
+        See http://bit.ly/vIMv3h.
+
+        """
+        result, stack, low = [], [], {}
+
+        def visit(node):
+            if node in low:
+                return
+            num = len(low)
+            low[node] = num
+            stack_pos = len(stack)
+            stack.append(node)
+
+            for successor in self[node]:
+                visit(successor)
+                low[node] = min(low[node], low[successor])
+
+            if num == low[node]:
+                component = tuple(stack[stack_pos:])
+                stack[stack_pos:] = []
+                result.append(component)
+                for item in component:
+                    low[item] = len(self)
+
+        for node in self:
+            visit(node)
+
+        return result
+
+    def to_dot(self, fh, formatter=None):
+        """Convert the graph to DOT format.
+
+        :param fh: A file, or a file-like object to write the graph to.
+
+        """
+        seen = set()
+        draw = formatter or self.formatter
+
+        def P(s):
+            print(bytes_to_str(s), file=fh)
+
+        def if_not_seen(fun, obj):
+            if draw.label(obj) not in seen:
+                P(fun(obj))
+                seen.add(draw.label(obj))
+
+        P(draw.head())
+        for obj, adjacent in items(self):
+            if not adjacent:
+                if_not_seen(draw.terminal_node, obj)
+            for req in adjacent:
+                if_not_seen(draw.node, obj)
+                P(draw.edge(obj, req))
+        P(draw.tail())
+
+    def format(self, obj):
+        return self.formatter(obj) if self.formatter else obj
+
+    def __iter__(self):
+        return iter(self.adjacent)
+
+    def __getitem__(self, node):
+        return self.adjacent[node]
+
+    def __len__(self):
+        return len(self.adjacent)
+
+    def __contains__(self, obj):
+        return obj in self.adjacent
+
+    def _iterate_items(self):
+        return items(self.adjacent)
+    items = iteritems = _iterate_items
+
+    def __repr__(self):
+        return '\n'.join(self.repr_node(N) for N in self)
+
+    def repr_node(self, obj, level=1, fmt='{0}({1})'):
+        output = [fmt.format(obj, self.valency_of(obj))]
+        if obj in self:
+            for other in self[obj]:
+                d = fmt.format(other, self.valency_of(other))
+                output.append('     ' * level + d)
+                output.extend(self.repr_node(other, level + 1).split('\n')[1:])
+        return '\n'.join(output)
+
+
+class GraphFormatter(object):
+    _attr = DOT.ATTR.strip()
+    _node = DOT.NODE.strip()
+    _edge = DOT.EDGE.strip()
+    _head = DOT.HEAD.strip()
+    _tail = DOT.TAIL.strip()
+    _attrsep = DOT.ATTRSEP
+    _dirs = dict(DOT.DIRS)
+
+    scheme = {
+        'shape': 'box',
+        'arrowhead': 'vee',
+        'style': 'filled',
+        'fontname': 'HelveticaNeue',
+    }
+    edge_scheme = {
+        'color': 'darkseagreen4',
+        'arrowcolor': 'black',
+        'arrowsize': 0.7,
+    }
+    node_scheme = {'fillcolor': 'palegreen3', 'color': 'palegreen4'}
+    term_scheme = {'fillcolor': 'palegreen1', 'color': 'palegreen2'}
+    graph_scheme = {'bgcolor': 'mintcream'}
+
+    def __init__(self, root=None, type=None, id=None,
+                 indent=0, inw=' ' * 4, **scheme):
+        self.id = id or 'dependencies'
+        self.root = root
+        self.type = type or 'digraph'
+        self.direction = self._dirs[self.type]
+        self.IN = inw * (indent or 0)
+        self.INp = self.IN + inw
+        self.scheme = dict(self.scheme, **scheme)
+        self.graph_scheme = dict(self.graph_scheme, root=self.label(self.root))
+
+    def attr(self, name, value):
+        value = '"{0}"'.format(value)
+        return self.FMT(self._attr, name=name, value=value)
+
+    def attrs(self, d, scheme=None):
+        d = dict(self.scheme, **dict(scheme, **d or {}) if scheme else d)
+        return self._attrsep.join(
+            safe_str(self.attr(k, v)) for k, v in items(d)
+        )
+
+    def head(self, **attrs):
+        return self.FMT(
+            self._head, id=self.id, type=self.type,
+            attrs=self.attrs(attrs, self.graph_scheme),
+        )
+
+    def tail(self):
+        return self.FMT(self._tail)
+
+    def label(self, obj):
+        return obj
+
+    def node(self, obj, **attrs):
+        return self.draw_node(obj, self.node_scheme, attrs)
+
+    def terminal_node(self, obj, **attrs):
+        return self.draw_node(obj, self.term_scheme, attrs)
+
+    def edge(self, a, b, **attrs):
+        return self.draw_edge(a, b, **attrs)
+
+    def _enc(self, s):
+        return s.encode('utf-8', 'ignore')
+
+    def FMT(self, fmt, *args, **kwargs):
+        return self._enc(fmt.format(
+            *args, **dict(kwargs, IN=self.IN, INp=self.INp)
+        ))
+
+    def draw_edge(self, a, b, scheme=None, attrs=None):
+        return self.FMT(
+            self._edge, self.label(a), self.label(b),
+            dir=self.direction, attrs=self.attrs(attrs, self.edge_scheme),
+        )
+
+    def draw_node(self, obj, scheme=None, attrs=None):
+        return self.FMT(
+            self._node, self.label(obj), attrs=self.attrs(attrs, scheme),
+        )

+ 3 - 3
docs/history/whatsnew-3.0.rst

@@ -293,15 +293,15 @@ Tasks can now have callbacks and errbacks, and dependencies are recorded
 
        - AsyncResult.graph
 
-            A ``DependencyGraph`` of the tasks dependencies.
-            This can also be used to convert to dot format:
+            A :class:`~celery.utils.graph.DependencyGraph` of the tasks
+            dependencies.  With this you can also convert to dot format:
 
             .. code-block:: python
 
                 with open('graph.dot') as fh:
                     result.graph.to_dot(fh)
 
-            which can than be used to produce an image:
+            then produce an image of the graph:
 
             .. code-block:: console
 

+ 11 - 0
docs/internals/reference/celery.utils.graph.rst

@@ -0,0 +1,11 @@
+==========================================
+ ``celery.utils.graph``
+==========================================
+
+.. contents::
+    :local:
+.. currentmodule:: celery.utils.graph
+
+.. automodule:: celery.utils.graph
+    :members:
+    :undoc-members:

+ 1 - 0
docs/internals/reference/index.rst

@@ -53,6 +53,7 @@
     celery.utils
     celery.utils.abstract
     celery.utils.functional
+    celery.utils.graph
     celery.utils.objects
     celery.utils.term
     celery.utils.timeutils

+ 1 - 1
docs/userguide/canvas.rst

@@ -607,7 +607,7 @@ Graphs
 ~~~~~~
 
 In addition you can work with the result graph as a
-:class:`~celery.datastructures.DependencyGraph`:
+:class:`~celery.utils.graph.DependencyGraph`:
 
 .. code-block:: pycon