Explorar o código

Added "celeryctl status" + "celeryctl inspect ping"

Ask Solem %!s(int64=14) %!d(string=hai) anos
pai
achega
4f0ef66f8a
Modificáronse 2 ficheiros con 31 adicións e 13 borrados
  1. 28 13
      celery/bin/celeryctl.py
  2. 3 0
      celery/task/control.py

+ 28 - 13
celery/bin/celeryctl.py

@@ -14,6 +14,10 @@ from celery.utils import term as t
 commands = {}
 
 
+class Error(Exception):
+    pass
+
+
 def command(fun):
     commands[fun.__name__] = fun
     return fun
@@ -48,9 +52,11 @@ class Command(object):
         self(*args, **options.__dict__)
 
     def __call__(self, *args, **kwargs):
-        out = self.run(*args, **kwargs)
-        if out:
-            sys.stderr.write("%s\n" % out)
+        try:
+            self.run(*args, **kwargs)
+        except Error, exc:
+            sys.stderr.write(t.colored(t.red(
+                                t.bold("Error: %s\n" % (exc, )))))
 
     def run(self, *args, **kwargs):
         raise NotImplementedError()
@@ -117,7 +123,8 @@ class inspect(Command):
                "registered_tasks": 1.0,
                "enable_events": 1.0,
                "disable_events": 1.0,
-               "diagnose": 2.0}
+               "diagnose": 2.0,
+               "ping": 0.2}
     option_list = Command.option_list + (
                 Option("--quiet", "-q", action="store_true", dest="quiet",
                        default=False),
@@ -134,10 +141,10 @@ class inspect(Command):
     def run(self, *args, **kwargs):
         self.quiet = kwargs.get("quiet", False)
         if not args:
-            return "Missing inspect command. See --help"
+            raise Error("Missing inspect command. See --help")
         command = args[0]
         if command not in self.choices:
-            return "Unknown inspect command: %s" % command
+            raise Error("Unknown inspect command: %s" % command)
         from celery.task.control import inspect
 
         destination = kwargs.get("destination")
@@ -149,15 +156,12 @@ class inspect(Command):
         i = inspect(destination=destination, timeout=timeout)
         replies = getattr(i, command)()
         if not replies:
-            return t.colored(t.red(t.bold("Error: "),
-                            "No nodes replied within time constraint."))
+            raise Error("No nodes replied within time constraint.")
         for node, reply in replies.items():
             status, preply = prettify(reply)
             self.say("->", t.cyan(node, ": ") + t.reset() + status,
                      indent(preply))
-            #print(t.colored(t.bold(t.white("-> "), t.cyan(node, ": "),
-            #                t.reset(), status)))
-            #print(indent(preply))
+        return replies
 
     def say(self, direction, title, body=""):
         if direction == "<-" and self.quiet:
@@ -166,7 +170,7 @@ class inspect(Command):
         print(t.colored(dirstr, title))
         if body and not self.quiet:
             print(body)
-
+inspect = command(inspect)
 
 def prettify_list(n):
     if not n:
@@ -201,7 +205,18 @@ def indent(s, n=4):
     return "\n".join("\n".join(wrap(j)) for j in i)
 
 
-inspect = command(inspect)
+class status(Command):
+
+    def run(self, *args, **kwargs):
+        replies = inspect().run("ping", quiet=True)
+        if not replies:
+            raise Error("No nodes replied within time constraint")
+        nodecount = len(replies)
+        print("\n%s %s online." % (nodecount,
+                                   nodecount > 1 and "nodes" or "node"))
+status = command(status)
+
+
 
 
 class help(Command):

+ 3 - 0
celery/task/control.py

@@ -143,6 +143,9 @@ class inspect(object):
         diagnose_timeout = self.timeout * 0.85 # 15% of timeout
         return self._request("diagnose", timeout=diagnose_timeout)
 
+    def ping(self):
+        return self._request("ping")
+
 
 @with_connection
 def broadcast(command, arguments=None, destination=None, connection=None,