camqadm.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384
  1. # -*- coding: utf-8 -*-
  2. """camqadm
  3. .. program:: camqadm
  4. """
  5. from __future__ import absolute_import
  6. if __name__ == "__main__" and __package__ is None:
  7. __package__ = "celery.bin.celeryctl"
  8. import cmd
  9. import sys
  10. import shlex
  11. import pprint
  12. from itertools import count
  13. from amqplib import client_0_8 as amqp
  14. from ..app import app_or_default
  15. from ..utils import padlist
  16. from .base import Command
  17. # Valid string -> bool coercions.
  18. BOOLS = {"1": True, "0": False,
  19. "on": True, "off": False,
  20. "yes": True, "no": False,
  21. "true": True, "False": False}
  22. # Map to coerce strings to other types.
  23. COERCE = {bool: lambda value: BOOLS[value.lower()]}
  24. HELP_HEADER = """
  25. Commands
  26. --------
  27. """.rstrip()
  28. EXAMPLE_TEXT = """
  29. Example:
  30. -> queue.delete myqueue yes no
  31. """
  32. def say(m):
  33. sys.stderr.write("%s\n" % (m, ))
  34. class Spec(object):
  35. """AMQP Command specification.
  36. Used to convert arguments to Python values and display various help
  37. and tooltips.
  38. :param args: see :attr:`args`.
  39. :keyword returns: see :attr:`returns`.
  40. .. attribute args::
  41. List of arguments this command takes. Should
  42. contain `(argument_name, argument_type)` tuples.
  43. .. attribute returns:
  44. Helpful human string representation of what this command returns.
  45. May be :const:`None`, to signify the return type is unknown.
  46. """
  47. def __init__(self, *args, **kwargs):
  48. self.args = args
  49. self.returns = kwargs.get("returns")
  50. def coerce(self, index, value):
  51. """Coerce value for argument at index.
  52. E.g. if :attr:`args` is `[("is_active", bool)]`:
  53. >>> coerce(0, "False")
  54. False
  55. """
  56. arg_info = self.args[index]
  57. arg_type = arg_info[1]
  58. # Might be a custom way to coerce the string value,
  59. # so look in the coercion map.
  60. return COERCE.get(arg_type, arg_type)(value)
  61. def str_args_to_python(self, arglist):
  62. """Process list of string arguments to values according to spec.
  63. e.g:
  64. >>> spec = Spec([("queue", str), ("if_unused", bool)])
  65. >>> spec.str_args_to_python("pobox", "true")
  66. ("pobox", True)
  67. """
  68. return tuple(self.coerce(index, value)
  69. for index, value in enumerate(arglist))
  70. def format_response(self, response):
  71. """Format the return value of this command in a human-friendly way."""
  72. if not self.returns:
  73. if response is None:
  74. return "ok."
  75. return response
  76. if callable(self.returns):
  77. return self.returns(response)
  78. return self.returns % (response, )
  79. def format_arg(self, name, type, default_value=None):
  80. if default_value is not None:
  81. return "%s:%s" % (name, default_value)
  82. return name
  83. def format_signature(self):
  84. return " ".join(self.format_arg(*padlist(list(arg), 3))
  85. for arg in self.args)
  86. def dump_message(message):
  87. if message is None:
  88. return "No messages in queue. basic.publish something."
  89. return {"body": message.body,
  90. "properties": message.properties,
  91. "delivery_info": message.delivery_info}
  92. def format_declare_queue(ret):
  93. return "ok. queue:%s messages:%s consumers:%s." % ret
  94. class AMQShell(cmd.Cmd):
  95. """AMQP API Shell.
  96. :keyword connect: Function used to connect to the server, must return
  97. connection object.
  98. :keyword silent: If :const:`True`, the commands won't have annoying
  99. output not relevant when running in non-shell mode.
  100. .. attribute: builtins
  101. Mapping of built-in command names -> method names
  102. .. attribute:: amqp
  103. Mapping of AMQP API commands and their :class:`Spec`.
  104. """
  105. conn = None
  106. chan = None
  107. prompt_fmt = "%d> "
  108. identchars = cmd.IDENTCHARS = "."
  109. needs_reconnect = False
  110. counter = 1
  111. inc_counter = count(2).next
  112. builtins = {"EOF": "do_exit",
  113. "exit": "do_exit",
  114. "help": "do_help"}
  115. amqp = {
  116. "exchange.declare": Spec(("exchange", str),
  117. ("type", str),
  118. ("passive", bool, "no"),
  119. ("durable", bool, "no"),
  120. ("auto_delete", bool, "no"),
  121. ("internal", bool, "no")),
  122. "exchange.delete": Spec(("exchange", str),
  123. ("if_unused", bool)),
  124. "queue.bind": Spec(("queue", str),
  125. ("exchange", str),
  126. ("routing_key", str)),
  127. "queue.declare": Spec(("queue", str),
  128. ("passive", bool, "no"),
  129. ("durable", bool, "no"),
  130. ("exclusive", bool, "no"),
  131. ("auto_delete", bool, "no"),
  132. returns=format_declare_queue),
  133. "queue.delete": Spec(("queue", str),
  134. ("if_unused", bool, "no"),
  135. ("if_empty", bool, "no"),
  136. returns="ok. %d messages deleted."),
  137. "queue.purge": Spec(("queue", str),
  138. returns="ok. %d messages deleted."),
  139. "basic.get": Spec(("queue", str),
  140. ("no_ack", bool, "off"),
  141. returns=dump_message),
  142. "basic.publish": Spec(("msg", amqp.Message),
  143. ("exchange", str),
  144. ("routing_key", str),
  145. ("mandatory", bool, "no"),
  146. ("immediate", bool, "no")),
  147. "basic.ack": Spec(("delivery_tag", int)),
  148. }
  149. def __init__(self, *args, **kwargs):
  150. self.connect = kwargs.pop("connect")
  151. self.silent = kwargs.pop("silent", False)
  152. cmd.Cmd.__init__(self, *args, **kwargs)
  153. self._reconnect()
  154. def say(self, m):
  155. """Say something to the user. Disabled if :attr:`silent`."""
  156. if not self.silent:
  157. say(m)
  158. def get_amqp_api_command(self, cmd, arglist):
  159. """With a command name and a list of arguments, convert the arguments
  160. to Python values and find the corresponding method on the AMQP channel
  161. object.
  162. :returns: tuple of `(method, processed_args)`.
  163. Example:
  164. >>> get_amqp_api_command("queue.delete", ["pobox", "yes", "no"])
  165. (<bound method Channel.queue_delete of
  166. <amqplib.client_0_8.channel.Channel object at 0x...>>,
  167. ('testfoo', True, False))
  168. """
  169. spec = self.amqp[cmd]
  170. args = spec.str_args_to_python(arglist)
  171. attr_name = cmd.replace(".", "_")
  172. if self.needs_reconnect:
  173. self._reconnect()
  174. return getattr(self.chan, attr_name), args, spec.format_response
  175. def do_exit(self, *args):
  176. """The `"exit"` command."""
  177. self.say("\n-> please, don't leave!")
  178. sys.exit(0)
  179. def display_command_help(self, cmd, short=False):
  180. spec = self.amqp[cmd]
  181. say("%s %s" % (cmd, spec.format_signature()))
  182. def do_help(self, *args):
  183. if not args:
  184. say(HELP_HEADER)
  185. for cmd_name in self.amqp.keys():
  186. self.display_command_help(cmd_name, short=True)
  187. say(EXAMPLE_TEXT)
  188. else:
  189. self.display_command_help(args[0])
  190. def default(self, line):
  191. say("unknown syntax: '%s'. how about some 'help'?" % line)
  192. def get_names(self):
  193. return set(self.builtins) | set(self.amqp)
  194. def completenames(self, text, *ignored):
  195. """Return all commands starting with `text`, for tab-completion."""
  196. names = self.get_names()
  197. first = [cmd for cmd in names
  198. if cmd.startswith(text.replace("_", "."))]
  199. if first:
  200. return first
  201. return [cmd for cmd in names
  202. if cmd.partition(".")[2].startswith(text)]
  203. def dispatch(self, cmd, argline):
  204. """Dispatch and execute the command.
  205. Lookup order is: :attr:`builtins` -> :attr:`amqp`.
  206. """
  207. arglist = shlex.split(argline)
  208. if cmd in self.builtins:
  209. return getattr(self, self.builtins[cmd])(*arglist)
  210. fun, args, formatter = self.get_amqp_api_command(cmd, arglist)
  211. return formatter(fun(*args))
  212. def parseline(self, line):
  213. """Parse input line.
  214. :returns: tuple of three items:
  215. `(command_name, arglist, original_line)`
  216. E.g::
  217. >>> parseline("queue.delete A 'B' C")
  218. ("queue.delete", "A 'B' C", "queue.delete A 'B' C")
  219. """
  220. parts = line.split()
  221. if parts:
  222. return parts[0], " ".join(parts[1:]), line
  223. return "", "", line
  224. def onecmd(self, line):
  225. """Parse line and execute command."""
  226. cmd, arg, line = self.parseline(line)
  227. if not line:
  228. return self.emptyline()
  229. if cmd is None:
  230. return self.default(line)
  231. self.lastcmd = line
  232. if cmd == '':
  233. return self.default(line)
  234. else:
  235. self.counter = self.inc_counter()
  236. try:
  237. self.respond(self.dispatch(cmd, arg))
  238. except (AttributeError, KeyError), exc:
  239. self.default(line)
  240. except Exception, exc:
  241. say(exc)
  242. self.needs_reconnect = True
  243. def respond(self, retval):
  244. """What to do with the return value of a command."""
  245. if retval is not None:
  246. if isinstance(retval, basestring):
  247. say(retval)
  248. else:
  249. pprint.pprint(retval)
  250. def _reconnect(self):
  251. """Re-establish connection to the AMQP server."""
  252. self.conn = self.connect(self.conn)
  253. self.chan = self.conn.channel()
  254. self.needs_reconnect = False
  255. @property
  256. def prompt(self):
  257. return self.prompt_fmt % self.counter
  258. class AMQPAdmin(object):
  259. """The celery :program:`camqadm` utility."""
  260. def __init__(self, *args, **kwargs):
  261. self.app = app_or_default(kwargs.get("app"))
  262. self.silent = bool(args)
  263. if "silent" in kwargs:
  264. self.silent = kwargs["silent"]
  265. self.args = args
  266. def connect(self, conn=None):
  267. if conn:
  268. conn.close()
  269. conn = self.app.broker_connection()
  270. self.say("-> connecting to %s." % conn.as_uri())
  271. conn.connect()
  272. self.say("-> connected.")
  273. return conn
  274. def run(self):
  275. shell = AMQShell(connect=self.connect)
  276. if self.args:
  277. return shell.onecmd(" ".join(self.args))
  278. try:
  279. return shell.cmdloop()
  280. except KeyboardInterrupt:
  281. self.say("(bibi)")
  282. pass
  283. def say(self, m):
  284. if not self.silent:
  285. say(m)
  286. class AMQPAdminCommand(Command):
  287. def run(self, *args, **options):
  288. options["app"] = self.app
  289. return AMQPAdmin(*args, **options).run()
  290. def camqadm(*args, **options):
  291. AMQPAdmin(*args, **options).run()
  292. def main():
  293. AMQPAdminCommand().execute_from_commandline()
  294. if __name__ == "__main__": # pragma: no cover
  295. main()