Kaynağa Gözat

camqadm: New functions: help, basic.publish and basic.ack

Ask Solem 15 yıl önce
ebeveyn
işleme
b26360a32b
1 değiştirilmiş dosya ile 83 ekleme ve 19 silme
  1. 83 19
      celery/bin/camqadm.py

+ 83 - 19
celery/bin/camqadm.py

@@ -17,12 +17,16 @@ import pprint
 import readline
 import optparse
 
+from amqplib import client_0_8 as amqp
+
 from celery.utils import info
+from celery.utils import padlist
 from celery.messaging import establish_connection
 
 
 # Valid string -> bool coercions.
 BOOLS = {"1": True, "0": False,
+         "on": True, "off": False,
          "yes": True, "no": False,
          "true": True, "False": False}
 
@@ -36,7 +40,15 @@ OPTION_LIST = (
     #        help="Number of child processes processing the queue."),
 )
 
+HELP_HEADER = """
+Commands
+--------
+""".rstrip()
 
+EXAMPLE_TEXT = """
+Example:
+    -> queue.delete myqueue yes no
+"""
 
 def say(m):
     sys.stderr.write("%s\n" % (m, ))
@@ -93,6 +105,33 @@ class Spec(object):
         return tuple(self.coerce(index, value)
                 for index, value in enumerate(arglist))
 
+    def format_response(self, response):
+        """Format the return value of this command in a human-friendly way."""
+        if not self.returns:
+            if response is None:
+                return "ok."
+            return response
+        if callable(self.returns):
+            return self.returns(response)
+        return self.returns % (response, )
+
+    def format_arg(self, name, type, default_value=None):
+        if default_value is not None:
+            return "%s:%s" % (name, default_value)
+        return name
+
+    def format_signature(self):
+        return " ".join(self.format_arg(*padlist(list(arg), 3))
+                            for arg in self.args)
+
+
+def dump_message(message):
+    if message is None:
+        return "No messages in queue. basic.publish something."
+    return {"body": message.body,
+            "properties": message.properties,
+            "delivery_info": message.delivery_info}
+
 
 class AMQShell(cmd.Cmd):
     """AMQP API Shell.
@@ -120,32 +159,41 @@ class AMQShell(cmd.Cmd):
     needs_reconnect = False
 
 
-    builtins = {"exit": "do_exit", "EOF": "do_exit"}
+    builtins = {"exit": "do_exit",
+                "EOF": "do_exit",
+                "help": "do_help",}
 
     amqp = {
         "exchange.declare": Spec(("exchange", str),
                                  ("type", str),
-                                 ("passive", bool),
-                                 ("durable", bool),
-                                 ("auto_delete", bool),
-                                 ("internal", bool)),
+                                 ("passive", bool, "no"),
+                                 ("durable", bool, "no"),
+                                 ("auto_delete", bool, "no"),
+                                 ("internal", bool, "no")),
         "exchange.delete": Spec(("exchange", str),
                                 ("if_unused", bool)),
         "queue.bind": Spec(("queue", str),
                            ("exchange", str),
                            ("routing_key", str)),
         "queue.declare": Spec(("queue", str),
-                              ("passive", bool),
-                              ("durable", bool),
-                              ("exclusive", bool),
-                              ("auto_delete", bool),
-                              returns="Messages purged"),
+                              ("passive", bool, "no"),
+                              ("durable", bool, "no"),
+                              ("exclusive", bool, "no"),
+                              ("auto_delete", bool, "no"),
+                              returns="%d messages deleted"),
         "queue.delete": Spec(("queue", str),
-                             ("if_unused", bool),
-                             ("if_empty", bool)),
-        "queue.purge": Spec(("queue", str), returns="Messages purged"),
+                             ("if_unused", bool, "no"),
+                             ("if_empty", bool, "no")),
+        "queue.purge": Spec(("queue", str), returns="%d messages deleted"),
         "basic.get": Spec(("queue", str),
-                          ("no_ack", bool)),
+                          ("no_ack", bool, "off"),
+                          returns=dump_message),
+        "basic.publish": Spec(("msg", amqp.Message),
+                              ("exchange", str),
+                              ("routing_key", str),
+                              ("mandatory", bool, "no"),
+                              ("immediate", bool, "no")),
+        "basic.ack": Spec(("delivery_tag", int)),
     }
 
     def __init__(self, *args, **kwargs):
@@ -179,13 +227,26 @@ class AMQShell(cmd.Cmd):
         attr_name = cmd.replace(".", "_")
         if self.needs_reconnect:
             self._reconnect()
-        return (getattr(self.chan, attr_name), args)
+        return getattr(self.chan, attr_name), args, spec.format_response
 
     def do_exit(self, *args):
         """The ``"exit"`` command."""
         self.say("\n-> please, don't leave!")
         sys.exit(0)
 
+    def display_command_help(self, cmd, short=False):
+        spec = self.amqp[cmd]
+        say("%s %s" % (cmd, spec.format_signature()))
+
+    def do_help(self, *args):
+        if not args:
+            say(HELP_HEADER)
+            for cmd_name in self.amqp.keys():
+                self.display_command_help(cmd_name, short=True)
+            say(EXAMPLE_TEXT)
+        else:
+            self.display_command_help(args[0])
+
     def completenames(self, text, *ignored):
         """Return all commands starting with ``text``, for tab-completion."""
         return [cmd for cmd in set(self.builtins.keys() + self.amqp.keys())
@@ -200,9 +261,8 @@ class AMQShell(cmd.Cmd):
         arglist = shlex.split(argline)
         if cmd in self.builtins:
             return getattr(self, self.builtins[cmd])(*arglist)
-        fun, args = self.get_amqp_api_command(cmd, arglist)
-        print((fun, args))
-        return fun(*args)
+        fun, args, formatter = self.get_amqp_api_command(cmd, arglist)
+        return formatter(fun(*args))
 
     def parseline(self, line):
         """Parse input line.
@@ -240,7 +300,11 @@ class AMQShell(cmd.Cmd):
 
     def respond(self, retval):
         """What to do with the return value of a command."""
-        pprint.pprint(retval)
+        if retval is not None:
+            if isinstance(retval, basestring):
+                say(retval)
+            else:
+                pprint.pprint(retval)
 
     def _reconnect(self):
         """Re-establish connection to the AMQP server."""