|
@@ -10,6 +10,7 @@ from __future__ import absolute_import, print_function
|
|
|
|
|
|
import anyjson
|
|
|
import heapq
|
|
|
+import os
|
|
|
import sys
|
|
|
import warnings
|
|
|
|
|
@@ -26,6 +27,12 @@ from celery.utils.timeutils import maybe_iso8601
|
|
|
|
|
|
from celery.bin.base import Command as BaseCommand, Option
|
|
|
|
|
|
+try:
|
|
|
+ # print_statement does not work with io.StringIO
|
|
|
+ from io import BytesIO as PrintIO
|
|
|
+except ImportError:
|
|
|
+ from StringIO import StringIO as PrintIO
|
|
|
+
|
|
|
HELP = """
|
|
|
---- -- - - ---- Commands- -------------- --- ------------
|
|
|
|
|
@@ -40,13 +47,18 @@ Migrating task {state.count}/{state.strtotal}: \
|
|
|
{body[task]}[{body[id]}]\
|
|
|
"""
|
|
|
|
|
|
-commands = {}
|
|
|
+DEBUG = os.environ.get('C_DEBUG', False)
|
|
|
|
|
|
+commands = {}
|
|
|
command_classes = [
|
|
|
('Main', ['worker', 'events', 'beat', 'shell', 'multi', 'amqp'], 'green'),
|
|
|
('Remote Control', ['status', 'inspect', 'control'], 'blue'),
|
|
|
('Utils', ['purge', 'list', 'migrate', 'call', 'result', 'report'], None),
|
|
|
]
|
|
|
+if DEBUG:
|
|
|
+ command_classes.append(
|
|
|
+ ('Debug', ['worker_graph', 'consumer_graph'], 'red'),
|
|
|
+ )
|
|
|
|
|
|
|
|
|
@memoize()
|
|
@@ -457,6 +469,26 @@ class result(Command):
|
|
|
self.out(self.prettify(value)[1])
|
|
|
|
|
|
|
|
|
+@command
|
|
|
+class worker_graph(Command):
|
|
|
+
|
|
|
+ def run(self, **kwargs):
|
|
|
+ worker = self.app.WorkController()
|
|
|
+ out = PrintIO()
|
|
|
+ worker.namespace.graph.to_dot(out)
|
|
|
+ self.out(out.getvalue())
|
|
|
+
|
|
|
+
|
|
|
+@command
|
|
|
+class consumer_graph(Command):
|
|
|
+
|
|
|
+ def run(self, **kwargs):
|
|
|
+ worker = self.app.WorkController()
|
|
|
+ out = PrintIO()
|
|
|
+ worker.consumer.namespace.graph.to_dot(out)
|
|
|
+ self.out(out.getvalue())
|
|
|
+
|
|
|
+
|
|
|
class _RemoteControl(Command):
|
|
|
name = None
|
|
|
choices = None
|