Browse Source

Adds celery rate_limit command

Ask Solem 13 years ago
parent
commit
bce0dcc764
1 changed files with 31 additions and 12 deletions
  1. 31 12
      celery/bin/celery.py

+ 31 - 12
celery/bin/celery.py

@@ -51,6 +51,7 @@ class Command(BaseCommand):
     args = ""
     args = ""
     version = __version__
     version = __version__
     prog_name = "celery"
     prog_name = "celery"
+    show_body = True
 
 
     option_list = (
     option_list = (
         Option("--quiet", "-q", action="store_true"),
         Option("--quiet", "-q", action="store_true"),
@@ -94,6 +95,8 @@ class Command(BaseCommand):
         options, args = self.prepare_args(
         options, args = self.prepare_args(
                 *self.parser.parse_args(self.arglist))
                 *self.parser.parse_args(self.arglist))
         self.colored = term.colored(enabled=not options["no_color"])
         self.colored = term.colored(enabled=not options["no_color"])
+        self.quiet = options.get("quiet", False)
+        self.show_body = options.get("show_body", True)
         return self(*args, **options)
         return self(*args, **options)
 
 
     def usage(self, command):
     def usage(self, command):
@@ -116,6 +119,13 @@ class Command(BaseCommand):
         return (c.red("ERROR"),
         return (c.red("ERROR"),
                 indent(self.prettify(n["error"])[1]))
                 indent(self.prettify(n["error"])[1]))
 
 
+    def say_remote_command_reply(self, replies):
+        c = self.colored
+        node = replies.keys()[0]
+        reply = replies[node]
+        status, preply = self.prettify(reply)
+        self.say_chat("->", c.cyan(node, ": ") + status, indent(preply))
+
     def prettify(self, n):
     def prettify(self, n):
         OK = str(self.colored.green("OK"))
         OK = str(self.colored.green("OK"))
         if isinstance(n, list):
         if isinstance(n, list):
@@ -127,6 +137,15 @@ class Command(BaseCommand):
             return OK, unicode(n)
             return OK, unicode(n)
         return OK, pformat(n)
         return OK, pformat(n)
 
 
+    def say_chat(self, direction, title, body=""):
+        c = self.colored
+        if direction == "<-" and self.quiet:
+            return
+        dirstr = not self.quiet and c.bold(c.white(direction), " ") or ""
+        self.out(c.reset(dirstr, title))
+        if body and self.show_body:
+            self.out(body)
+
 
 
 class Delegate(Command):
 class Delegate(Command):
 
 
@@ -269,6 +288,16 @@ class result(Command):
 result = command(result)
 result = command(result)
 
 
 
 
+class rate_limit(Command):
+    args = "<task_name> <rate_limit (e.g. 10/m for 10 in a second)>"
+
+    def run(self, task_name, limit, **kwargs):
+        self.say_chat("<-", "rate_limit")
+        replies = self.app.control.rate_limit(task_name, limit, reply=True)
+        for reply in replies:
+            self.say_remote_command_reply(reply)
+rate_limit = command(rate_limit)
+
 class inspect(Command):
 class inspect(Command):
     choices = {"active": 1.0,
     choices = {"active": 1.0,
                "active_queues": 1.0,
                "active_queues": 1.0,
@@ -289,15 +318,12 @@ class inspect(Command):
                     help="Timeout in seconds (float) waiting for reply"),
                     help="Timeout in seconds (float) waiting for reply"),
                 Option("--destination", "-d",
                 Option("--destination", "-d",
                     help="Comma separated list of destination node names."))
                     help="Comma separated list of destination node names."))
-    show_body = True
 
 
     def usage(self, command):
     def usage(self, command):
         return "%%prog %s [options] %s [%s]" % (
         return "%%prog %s [options] %s [%s]" % (
                 command, self.args, "|".join(self.choices.keys()))
                 command, self.args, "|".join(self.choices.keys()))
 
 
     def run(self, *args, **kwargs):
     def run(self, *args, **kwargs):
-        self.quiet = kwargs.get("quiet", False)
-        self.show_body = kwargs.get("show_body", True)
         if not args:
         if not args:
             raise Error("Missing inspect command. See --help")
             raise Error("Missing inspect command. See --help")
         command = args[0]
         command = args[0]
@@ -311,17 +337,10 @@ class inspect(Command):
         if destination and isinstance(destination, basestring):
         if destination and isinstance(destination, basestring):
             destination = map(str.strip, destination.split(","))
             destination = map(str.strip, destination.split(","))
 
 
-        def on_reply(body):
-            c = self.colored
-            node = body.keys()[0]
-            reply = body[node]
-            status, preply = self.prettify(reply)
-            self.say("->", c.cyan(node, ": ") + status, indent(preply))
-
-        self.say("<-", command)
+        self.say_chat("<-", command)
         i = self.app.control.inspect(destination=destination,
         i = self.app.control.inspect(destination=destination,
                                      timeout=timeout,
                                      timeout=timeout,
-                                     callback=on_reply)
+                                     callback=self.say_remote_command_reply)
         replies = getattr(i, command)(*args[1:])
         replies = getattr(i, command)(*args[1:])
         if not replies:
         if not replies:
             raise Error("No nodes replied within time constraint.",
             raise Error("No nodes replied within time constraint.",