Browse Source

celery/bin/camqadm.py documented

Ask Solem 15 years ago
parent
commit
fdf5f44ff5
1 changed files with 122 additions and 17 deletions
  1. 122 17
      celery/bin/camqadm.py

+ 122 - 17
celery/bin/camqadm.py

@@ -16,14 +16,19 @@ import shlex
 import pprint
 import pprint
 import readline
 import readline
 import optparse
 import optparse
+from itertools import starmap
 
 
 from celery.utils import info
 from celery.utils import info
 from celery.messaging import establish_connection
 from celery.messaging import establish_connection
 
 
 
 
-def say(m):
-    sys.stderr.write("%s\n" % (m, ))
+# Valid string -> bool coercions.
+BOOLS = {"1": True, "0": False,
+         "yes": True, "no": False,
+         "true": True, "False": False}
 
 
+# Map to coerce strings to other types.
+COERCE = {bool: lambda value: BOOLS[value.lower()]}
 
 
 OPTION_LIST = (
 OPTION_LIST = (
     #optparse.make_option('-c', '--concurrency',
     #optparse.make_option('-c', '--concurrency',
@@ -32,15 +37,82 @@ OPTION_LIST = (
     #        help="Number of child processes processing the queue."),
     #        help="Number of child processes processing the queue."),
 )
 )
 
 
+
+
+def say(m):
+    sys.stderr.write("%s\n" % (m, ))
+
+
 class Spec(object):
 class Spec(object):
+    """AMQP Command specification.
+
+    Used to convert arguments to Python values and display various help
+    and tooltips.
+
+    :param args: see :attr:`args`.
+    :keyword returns: see :attr:`returns`.
+
+    .. attribute args::
+
+        List of arguments this command takes. Should
+        contain ``(argument_name, argument_type)`` tuples.
+
+    .. attribute returns:
 
 
-    def __init__(self, *arglist, **kwargs):
-        self.arglist = arglist
+        Helpful human string representation of what this command returns.
+        May be ``None``, to signify the return type is unknown.
+
+    """
+    def __init__(self, *args, **kwargs):
+        self.args = args
         self.returns = kwargs.get("returns")
         self.returns = kwargs.get("returns")
 
 
+    def coerce(self, index, value):
+        """Coerce value for argument at index.
+
+        E.g. if :attr:`args` is ``[("is_active", bool)]``:
+
+            >>> coerce(0, "False")
+            False
+
+        """
+        arg_name, arg_type = self.args[index]
+        # Might be a custom way to coerce the string value,
+        # so look in the coercion map.
+        return COERCE.get(arg_type, arg_type)(value)
+
+    def str_args_to_python(self, arglist):
+        """Process list of string arguments to values according to spec.
+
+        e.g:
+
+            >>> spec = Spec([("queue", str), ("if_unused", bool)])
+            >>> spec.str_args_to_python("pobox", "true")
+            ("pobox", True)
+
+        """
+        return tuple(starmap(self.coerce, enumerate(arglist)))
+
 
 
 class AMQShell(cmd.Cmd):
 class AMQShell(cmd.Cmd):
+    """AMQP API Shell.
+
+    :keyword connect: Function used to connect to the server, must return
+        connection object.
 
 
+    :keyword silent: If ``True``, the commands won't have annoying output not
+        relevant when running in non-shell mode.
+
+
+    .. attribute: builtins
+
+        Mapping of built-in command names -> method names
+
+    .. attribute:: amqp
+
+        Mapping of AMQP API commands and their :class:`Spec`.
+
+    """
     conn = None
     conn = None
     chan = None
     chan = None
     prompt = "--> "
     prompt = "--> "
@@ -48,8 +120,7 @@ class AMQShell(cmd.Cmd):
     needs_reconnect = False
     needs_reconnect = False
 
 
 
 
-    builtins = {"exit": "do_exit",
-                "EOF": "do_exit"}
+    builtins = {"exit": "do_exit", "EOF": "do_exit"}
 
 
     amqp = {
     amqp = {
         "exchange.declare": Spec(("exchange", str),
         "exchange.declare": Spec(("exchange", str),
@@ -77,7 +148,6 @@ class AMQShell(cmd.Cmd):
                           ("no_ack", bool)),
                           ("no_ack", bool)),
     }
     }
 
 
-
     def __init__(self, *args, **kwargs):
     def __init__(self, *args, **kwargs):
         self.connect = kwargs.pop("connect")
         self.connect = kwargs.pop("connect")
         self.silent = kwargs.pop("silent", False)
         self.silent = kwargs.pop("silent", False)
@@ -85,45 +155,72 @@ class AMQShell(cmd.Cmd):
         self._reconnect()
         self._reconnect()
 
 
     def say(self, m):
     def say(self, m):
+        """Say something to the user. Disabled if :attr:`silent``."""
         if not self.silent:
         if not self.silent:
             say(m)
             say(m)
 
 
-    def _reconnect(self):
-        self.conn = self.connect(self.conn)
-        self.chan = self.conn.create_backend().channel
-        self.needs_reconnect = False
+    def get_amqp_api_command(self, cmd, arglist):
+        """With a command name and a list of arguments, convert the arguments
+        to Python values and find the corresponding method on the AMQP channel
+        object.
+
+        :returns: tuple of ``(method, processed_args)``.
 
 
-    def _apply_spec(self, arglist, spec):
-        return arglist
+        Example:
 
 
-    def _get_amqp_api_command(self, cmd, arglist):
+            >>> get_amqp_api_command("queue.delete", ["pobox", "yes", "no"])
+            (<bound method Channel.queue_delete of
+             <amqplib.client_0_8.channel.Channel object at 0x...>>,
+             ('testfoo', True, False))
+
+        """
         spec = self.amqp[cmd]
         spec = self.amqp[cmd]
+        args = spec.str_args_to_python(arglist)
         attr_name = cmd.replace(".", "_")
         attr_name = cmd.replace(".", "_")
         if self.needs_reconnect:
         if self.needs_reconnect:
             self._reconnect()
             self._reconnect()
-        return (getattr(self.chan, attr_name),
-                self._apply_spec(arglist, spec))
+        return (getattr(self.chan, attr_name), args)
 
 
     def do_exit(self, *args):
     def do_exit(self, *args):
+        """The ``"exit"`` command."""
         self.say("\n-> please, don't leave!")
         self.say("\n-> please, don't leave!")
         sys.exit(0)
         sys.exit(0)
 
 
     def completenames(self, text, *ignored):
     def completenames(self, text, *ignored):
+        """Return all commands starting with ``text``, for tab-completion."""
         return [cmd for cmd in set(self.builtins.keys() + self.amqp.keys())
         return [cmd for cmd in set(self.builtins.keys() + self.amqp.keys())
                         if cmd.startswith(text.replace(".", "_"))]
                         if cmd.startswith(text.replace(".", "_"))]
 
 
     def dispatch(self, cmd, argline):
     def dispatch(self, cmd, argline):
+        """Dispatch and execute the command.
+
+        Lookup order is: :attr:`builtins` -> :attr:`amqp`.
+
+        """
         arglist = shlex.split(argline)
         arglist = shlex.split(argline)
         if cmd in self.builtins:
         if cmd in self.builtins:
             return getattr(self, self.builtins[cmd])(*arglist)
             return getattr(self, self.builtins[cmd])(*arglist)
-        fun, args = self._get_amqp_api_command(cmd, arglist)
+        fun, args = self.get_amqp_api_command(cmd, arglist)
+        print((fun, args))
         return fun(*args)
         return fun(*args)
 
 
     def parseline(self, line):
     def parseline(self, line):
+        """Parse input line.
+
+        :returns: tuple of three items:
+            ``(command_name, arglist, original_line)``
+
+        E.g::
+
+            >>> parseline("queue.delete A 'B' C")
+            ("queue.delete", "A 'B' C", "queue.delete A 'B' C")
+
+        """
         parts = line.split()
         parts = line.split()
         return parts[0], " ".join(parts[1:]), line
         return parts[0], " ".join(parts[1:]), line
 
 
     def onecmd(self, line):
     def onecmd(self, line):
+        """Parse line and execute command."""
         cmd, arg, line = self.parseline(line)
         cmd, arg, line = self.parseline(line)
         if not line:
         if not line:
             return self.emptyline()
             return self.emptyline()
@@ -142,10 +239,18 @@ class AMQShell(cmd.Cmd):
                 self.needs_reconnect = True
                 self.needs_reconnect = True
 
 
     def respond(self, retval):
     def respond(self, retval):
+        """What to do with the return value of a command."""
         pprint.pprint(retval)
         pprint.pprint(retval)
 
 
+    def _reconnect(self):
+        """Re-establish connection to the AMQP server."""
+        self.conn = self.connect(self.conn)
+        self.chan = self.conn.create_backend().channel
+        self.needs_reconnect = False
+
 
 
 class AMQPAdmin(object):
 class AMQPAdmin(object):
+    """The celery ``camqadm`` utility."""
 
 
     def __init__(self, *args, **kwargs):
     def __init__(self, *args, **kwargs):
         self.silent = bool(args)
         self.silent = bool(args)