camqadm.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. #!/usr/bin/env python
  2. """camqadm
  3. .. program:: camqadm
  4. .. cmdoption:: -X, --x
  5. Description
  6. """
  7. import os
  8. import cmd
  9. import sys
  10. import shlex
  11. import pprint
  12. import readline
  13. import optparse
  14. from itertools import count
  15. from amqplib import client_0_8 as amqp
  16. from carrot.utils import partition
  17. from celery.utils import info
  18. from celery.utils import padlist
  19. from celery.messaging import establish_connection
  20. # Valid string -> bool coercions.
  21. BOOLS = {"1": True, "0": False,
  22. "on": True, "off": False,
  23. "yes": True, "no": False,
  24. "true": True, "False": False}
  25. # Map to coerce strings to other types.
  26. COERCE = {bool: lambda value: BOOLS[value.lower()]}
  27. OPTION_LIST = (
  28. #optparse.make_option('-c', '--concurrency',
  29. # default=conf.CELERYD_CONCURRENCY,
  30. # action="store", dest="concurrency", type="int",
  31. # help="Number of child processes processing the queue."),
  32. )
  33. HELP_HEADER = """
  34. Commands
  35. --------
  36. """.rstrip()
  37. EXAMPLE_TEXT = """
  38. Example:
  39. -> queue.delete myqueue yes no
  40. """
  41. def say(m):
  42. sys.stderr.write("%s\n" % (m, ))
  43. class Spec(object):
  44. """AMQP Command specification.
  45. Used to convert arguments to Python values and display various help
  46. and tooltips.
  47. :param args: see :attr:`args`.
  48. :keyword returns: see :attr:`returns`.
  49. .. attribute args::
  50. List of arguments this command takes. Should
  51. contain ``(argument_name, argument_type)`` tuples.
  52. .. attribute returns:
  53. Helpful human string representation of what this command returns.
  54. May be ``None``, to signify the return type is unknown.
  55. """
  56. def __init__(self, *args, **kwargs):
  57. self.args = args
  58. self.returns = kwargs.get("returns")
  59. def coerce(self, index, value):
  60. """Coerce value for argument at index.
  61. E.g. if :attr:`args` is ``[("is_active", bool)]``:
  62. >>> coerce(0, "False")
  63. False
  64. """
  65. arg_name, arg_type = self.args[index]
  66. # Might be a custom way to coerce the string value,
  67. # so look in the coercion map.
  68. return COERCE.get(arg_type, arg_type)(value)
  69. def str_args_to_python(self, arglist):
  70. """Process list of string arguments to values according to spec.
  71. e.g:
  72. >>> spec = Spec([("queue", str), ("if_unused", bool)])
  73. >>> spec.str_args_to_python("pobox", "true")
  74. ("pobox", True)
  75. """
  76. return tuple(self.coerce(index, value)
  77. for index, value in enumerate(arglist))
  78. def format_response(self, response):
  79. """Format the return value of this command in a human-friendly way."""
  80. if not self.returns:
  81. if response is None:
  82. return "ok."
  83. return response
  84. if callable(self.returns):
  85. return self.returns(response)
  86. return self.returns % (response, )
  87. def format_arg(self, name, type, default_value=None):
  88. if default_value is not None:
  89. return "%s:%s" % (name, default_value)
  90. return name
  91. def format_signature(self):
  92. return " ".join(self.format_arg(*padlist(list(arg), 3))
  93. for arg in self.args)
  94. def dump_message(message):
  95. if message is None:
  96. return "No messages in queue. basic.publish something."
  97. return {"body": message.body,
  98. "properties": message.properties,
  99. "delivery_info": message.delivery_info}
  100. def format_declare_queue(ret):
  101. return "ok. queue:%s messages:%s consumers:%s." % ret
  102. class AMQShell(cmd.Cmd):
  103. """AMQP API Shell.
  104. :keyword connect: Function used to connect to the server, must return
  105. connection object.
  106. :keyword silent: If ``True``, the commands won't have annoying output not
  107. relevant when running in non-shell mode.
  108. .. attribute: builtins
  109. Mapping of built-in command names -> method names
  110. .. attribute:: amqp
  111. Mapping of AMQP API commands and their :class:`Spec`.
  112. """
  113. conn = None
  114. chan = None
  115. prompt_fmt = "%d> "
  116. identchars = cmd.IDENTCHARS = "."
  117. needs_reconnect = False
  118. counter = 1
  119. inc_counter = count(2).next
  120. builtins = {"exit": "do_exit",
  121. "EOF": "do_exit",
  122. "help": "do_help",}
  123. amqp = {
  124. "exchange.declare": Spec(("exchange", str),
  125. ("type", str),
  126. ("passive", bool, "no"),
  127. ("durable", bool, "no"),
  128. ("auto_delete", bool, "no"),
  129. ("internal", bool, "no")),
  130. "exchange.delete": Spec(("exchange", str),
  131. ("if_unused", bool)),
  132. "queue.bind": Spec(("queue", str),
  133. ("exchange", str),
  134. ("routing_key", str)),
  135. "queue.declare": Spec(("queue", str),
  136. ("passive", bool, "no"),
  137. ("durable", bool, "no"),
  138. ("exclusive", bool, "no"),
  139. ("auto_delete", bool, "no"),
  140. returns=format_declare_queue),
  141. "queue.delete": Spec(("queue", str),
  142. ("if_unused", bool, "no"),
  143. ("if_empty", bool, "no"),
  144. returns="ok. %d messages deleted."),
  145. "queue.purge": Spec(("queue", str),
  146. returns="ok. %d messages deleted."),
  147. "basic.get": Spec(("queue", str),
  148. ("no_ack", bool, "off"),
  149. returns=dump_message),
  150. "basic.publish": Spec(("msg", amqp.Message),
  151. ("exchange", str),
  152. ("routing_key", str),
  153. ("mandatory", bool, "no"),
  154. ("immediate", bool, "no")),
  155. "basic.ack": Spec(("delivery_tag", int)),
  156. }
  157. def __init__(self, *args, **kwargs):
  158. self.connect = kwargs.pop("connect")
  159. self.silent = kwargs.pop("silent", False)
  160. cmd.Cmd.__init__(self, *args, **kwargs)
  161. self._reconnect()
  162. def say(self, m):
  163. """Say something to the user. Disabled if :attr:`silent``."""
  164. if not self.silent:
  165. say(m)
  166. def get_amqp_api_command(self, cmd, arglist):
  167. """With a command name and a list of arguments, convert the arguments
  168. to Python values and find the corresponding method on the AMQP channel
  169. object.
  170. :returns: tuple of ``(method, processed_args)``.
  171. Example:
  172. >>> get_amqp_api_command("queue.delete", ["pobox", "yes", "no"])
  173. (<bound method Channel.queue_delete of
  174. <amqplib.client_0_8.channel.Channel object at 0x...>>,
  175. ('testfoo', True, False))
  176. """
  177. spec = self.amqp[cmd]
  178. args = spec.str_args_to_python(arglist)
  179. attr_name = cmd.replace(".", "_")
  180. if self.needs_reconnect:
  181. self._reconnect()
  182. return getattr(self.chan, attr_name), args, spec.format_response
  183. def do_exit(self, *args):
  184. """The ``"exit"`` command."""
  185. self.say("\n-> please, don't leave!")
  186. sys.exit(0)
  187. def display_command_help(self, cmd, short=False):
  188. spec = self.amqp[cmd]
  189. say("%s %s" % (cmd, spec.format_signature()))
  190. def do_help(self, *args):
  191. if not args:
  192. say(HELP_HEADER)
  193. for cmd_name in self.amqp.keys():
  194. self.display_command_help(cmd_name, short=True)
  195. say(EXAMPLE_TEXT)
  196. else:
  197. self.display_command_help(args[0])
  198. def default(self, line):
  199. say("unknown syntax: '%s'. how about some 'help'?" % line)
  200. def get_names(self):
  201. return set(self.builtins.keys() + self.amqp.keys())
  202. def completenames(self, text, *ignored):
  203. """Return all commands starting with ``text``, for tab-completion."""
  204. names = self.get_names()
  205. first = [cmd for cmd in names
  206. if cmd.startswith(text.replace("_", "."))]
  207. if first:
  208. return first
  209. return [cmd for cmd in names
  210. if partition(cmd, ".")[2].startswith(text)]
  211. def dispatch(self, cmd, argline):
  212. """Dispatch and execute the command.
  213. Lookup order is: :attr:`builtins` -> :attr:`amqp`.
  214. """
  215. arglist = shlex.split(argline)
  216. if cmd in self.builtins:
  217. return getattr(self, self.builtins[cmd])(*arglist)
  218. fun, args, formatter = self.get_amqp_api_command(cmd, arglist)
  219. return formatter(fun(*args))
  220. def parseline(self, line):
  221. """Parse input line.
  222. :returns: tuple of three items:
  223. ``(command_name, arglist, original_line)``
  224. E.g::
  225. >>> parseline("queue.delete A 'B' C")
  226. ("queue.delete", "A 'B' C", "queue.delete A 'B' C")
  227. """
  228. parts = line.split()
  229. if parts:
  230. return parts[0], " ".join(parts[1:]), line
  231. return "", "", line
  232. def onecmd(self, line):
  233. """Parse line and execute command."""
  234. cmd, arg, line = self.parseline(line)
  235. if not line:
  236. return self.emptyline()
  237. if cmd is None:
  238. return self.default(line)
  239. self.lastcmd = line
  240. if cmd == '':
  241. return self.default(line)
  242. else:
  243. self.counter = self.inc_counter()
  244. try:
  245. self.respond(self.dispatch(cmd, arg))
  246. except (AttributeError, KeyError), exc:
  247. self.default(line)
  248. except Exception, exc:
  249. say(exc)
  250. self.needs_reconnect = True
  251. def respond(self, retval):
  252. """What to do with the return value of a command."""
  253. if retval is not None:
  254. if isinstance(retval, basestring):
  255. say(retval)
  256. else:
  257. pprint.pprint(retval)
  258. def _reconnect(self):
  259. """Re-establish connection to the AMQP server."""
  260. self.conn = self.connect(self.conn)
  261. self.chan = self.conn.create_backend().channel
  262. self.needs_reconnect = False
  263. @property
  264. def prompt(self):
  265. return self.prompt_fmt % self.counter
  266. class AMQPAdmin(object):
  267. """The celery ``camqadm`` utility."""
  268. def __init__(self, *args, **kwargs):
  269. self.silent = bool(args)
  270. if "silent" in kwargs:
  271. self.silent = kwargs["silent"]
  272. self.args = args
  273. def connect(self, conn=None):
  274. if conn:
  275. conn.close()
  276. self.say("-> connecting to %s." % info.format_broker_info())
  277. conn = establish_connection()
  278. conn.connect()
  279. self.say("-> connected.")
  280. return conn
  281. def run(self):
  282. shell = AMQShell(connect=self.connect)
  283. if self.args:
  284. return shell.onecmd(" ".join(self.args))
  285. return shell.cmdloop()
  286. def say(self, m):
  287. if not self.silent:
  288. say(m)
  289. def parse_options(arguments):
  290. """Parse the available options to ``celeryd``."""
  291. parser = optparse.OptionParser(option_list=OPTION_LIST)
  292. options, values = parser.parse_args(arguments)
  293. return options, values
  294. def camqadm(*args, **options):
  295. return AMQPAdmin(*args, **options).run()
  296. def main():
  297. options, values = parse_options(sys.argv[1:])
  298. return run_worker(*values, **vars(options))
  299. if __name__ == "__main__":
  300. main()