amqp.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. # -*- coding: utf-8 -*-
  2. """
  3. The :program:`celery amqp` command.
  4. .. program:: celery amqp
  5. """
  6. from __future__ import absolute_import, print_function, unicode_literals
  7. import cmd
  8. import sys
  9. import shlex
  10. import pprint
  11. from functools import partial
  12. from itertools import count
  13. from kombu.utils.encoding import safe_str
  14. from celery.utils.functional import padlist
  15. from celery.bin.base import Command
  16. from celery.five import string_t
  17. from celery.utils import strtobool
__all__ = ['AMQPAdmin', 'AMQShell', 'Spec', 'amqp']

# Map to coerce strings to other types.
# Keys are argument types as named in a Spec; the mapped callable is used
# in place of the type itself when converting a string (see Spec.coerce).
COERCE = {bool: strtobool}

# Heading printed by AMQShell.do_help before the list of commands.
# The trailing newline is stripped; the leading one is kept.
HELP_HEADER = """
Commands
--------
""".rstrip()

# Usage example printed by AMQShell.do_help after the list of commands.
EXAMPLE_TEXT = """
Example:
-> queue.delete myqueue yes no
"""

# ``print`` pre-bound to stderr.  Callers in this module pass ``file=``
# explicitly, which overrides the pre-bound default.
say = partial(print, file=sys.stderr)
  30. class Spec(object):
  31. """AMQP Command specification.
  32. Used to convert arguments to Python values and display various help
  33. and tooltips.
  34. :param args: see :attr:`args`.
  35. :keyword returns: see :attr:`returns`.
  36. .. attribute args::
  37. List of arguments this command takes. Should
  38. contain `(argument_name, argument_type)` tuples.
  39. .. attribute returns:
  40. Helpful human string representation of what this command returns.
  41. May be :const:`None`, to signify the return type is unknown.
  42. """
  43. def __init__(self, *args, **kwargs):
  44. self.args = args
  45. self.returns = kwargs.get('returns')
  46. def coerce(self, index, value):
  47. """Coerce value for argument at index."""
  48. arg_info = self.args[index]
  49. arg_type = arg_info[1]
  50. # Might be a custom way to coerce the string value,
  51. # so look in the coercion map.
  52. return COERCE.get(arg_type, arg_type)(value)
  53. def str_args_to_python(self, arglist):
  54. """Process list of string arguments to values according to spec.
  55. e.g:
  56. >>> spec = Spec([('queue', str), ('if_unused', bool)])
  57. >>> spec.str_args_to_python('pobox', 'true')
  58. ('pobox', True)
  59. """
  60. return tuple(
  61. self.coerce(index, value) for index, value in enumerate(arglist))
  62. def format_response(self, response):
  63. """Format the return value of this command in a human-friendly way."""
  64. if not self.returns:
  65. return 'ok.' if response is None else response
  66. if callable(self.returns):
  67. return self.returns(response)
  68. return self.returns.format(response)
  69. def format_arg(self, name, type, default_value=None):
  70. if default_value is not None:
  71. return '{0}:{1}'.format(name, default_value)
  72. return name
  73. def format_signature(self):
  74. return ' '.join(self.format_arg(*padlist(list(arg), 3))
  75. for arg in self.args)
  76. def dump_message(message):
  77. if message is None:
  78. return 'No messages in queue. basic.publish something.'
  79. return {'body': message.body,
  80. 'properties': message.properties,
  81. 'delivery_info': message.delivery_info}
  82. def format_declare_queue(ret):
  83. return 'ok. queue:{0} messages:{1} consumers:{2}.'.format(*ret)
  84. class AMQShell(cmd.Cmd):
  85. """AMQP API Shell.
  86. :keyword connect: Function used to connect to the server, must return
  87. connection object.
  88. :keyword silent: If :const:`True`, the commands won't have annoying
  89. output not relevant when running in non-shell mode.
  90. .. attribute: builtins
  91. Mapping of built-in command names -> method names
  92. .. attribute:: amqp
  93. Mapping of AMQP API commands and their :class:`Spec`.
  94. """
  95. conn = None
  96. chan = None
  97. prompt_fmt = '{self.counter}> '
  98. identchars = cmd.IDENTCHARS = '.'
  99. needs_reconnect = False
  100. counter = 1
  101. inc_counter = count(2)
  102. builtins = {'EOF': 'do_exit',
  103. 'exit': 'do_exit',
  104. 'help': 'do_help'}
  105. amqp = {
  106. 'exchange.declare': Spec(('exchange', str),
  107. ('type', str),
  108. ('passive', bool, 'no'),
  109. ('durable', bool, 'no'),
  110. ('auto_delete', bool, 'no'),
  111. ('internal', bool, 'no')),
  112. 'exchange.delete': Spec(('exchange', str),
  113. ('if_unused', bool)),
  114. 'queue.bind': Spec(('queue', str),
  115. ('exchange', str),
  116. ('routing_key', str)),
  117. 'queue.declare': Spec(('queue', str),
  118. ('passive', bool, 'no'),
  119. ('durable', bool, 'no'),
  120. ('exclusive', bool, 'no'),
  121. ('auto_delete', bool, 'no'),
  122. returns=format_declare_queue),
  123. 'queue.delete': Spec(('queue', str),
  124. ('if_unused', bool, 'no'),
  125. ('if_empty', bool, 'no'),
  126. returns='ok. {0} messages deleted.'),
  127. 'queue.purge': Spec(('queue', str),
  128. returns='ok. {0} messages deleted.'),
  129. 'basic.get': Spec(('queue', str),
  130. ('no_ack', bool, 'off'),
  131. returns=dump_message),
  132. 'basic.publish': Spec(('msg', str),
  133. ('exchange', str),
  134. ('routing_key', str),
  135. ('mandatory', bool, 'no'),
  136. ('immediate', bool, 'no')),
  137. 'basic.ack': Spec(('delivery_tag', int)),
  138. }
  139. def __init__(self, *args, **kwargs):
  140. self.connect = kwargs.pop('connect')
  141. self.silent = kwargs.pop('silent', False)
  142. self.out = kwargs.pop('out', sys.stderr)
  143. cmd.Cmd.__init__(self, *args, **kwargs)
  144. self._reconnect()
  145. def note(self, m):
  146. """Say something to the user. Disabled if :attr:`silent`."""
  147. if not self.silent:
  148. say(m, file=self.out)
  149. def say(self, m):
  150. say(m, file=self.out)
  151. def get_amqp_api_command(self, cmd, arglist):
  152. """With a command name and a list of arguments, convert the arguments
  153. to Python values and find the corresponding method on the AMQP channel
  154. object.
  155. :returns: tuple of `(method, processed_args)`.
  156. """
  157. spec = self.amqp[cmd]
  158. args = spec.str_args_to_python(arglist)
  159. attr_name = cmd.replace('.', '_')
  160. if self.needs_reconnect:
  161. self._reconnect()
  162. return getattr(self.chan, attr_name), args, spec.format_response
  163. def do_exit(self, *args):
  164. """The `'exit'` command."""
  165. self.note("\n-> please, don't leave!")
  166. sys.exit(0)
  167. def display_command_help(self, cmd, short=False):
  168. spec = self.amqp[cmd]
  169. self.say('{0} {1}'.format(cmd, spec.format_signature()))
  170. def do_help(self, *args):
  171. if not args:
  172. self.say(HELP_HEADER)
  173. for cmd_name in self.amqp:
  174. self.display_command_help(cmd_name, short=True)
  175. self.say(EXAMPLE_TEXT)
  176. else:
  177. self.display_command_help(args[0])
  178. def default(self, line):
  179. self.say("unknown syntax: {0!r}. how about some 'help'?".format(line))
  180. def get_names(self):
  181. return set(self.builtins) | set(self.amqp)
  182. def completenames(self, text, *ignored):
  183. """Return all commands starting with `text`, for tab-completion."""
  184. names = self.get_names()
  185. first = [cmd for cmd in names
  186. if cmd.startswith(text.replace('_', '.'))]
  187. if first:
  188. return first
  189. return [cmd for cmd in names
  190. if cmd.partition('.')[2].startswith(text)]
  191. def dispatch(self, cmd, argline):
  192. """Dispatch and execute the command.
  193. Lookup order is: :attr:`builtins` -> :attr:`amqp`.
  194. """
  195. arglist = shlex.split(safe_str(argline))
  196. if cmd in self.builtins:
  197. return getattr(self, self.builtins[cmd])(*arglist)
  198. fun, args, formatter = self.get_amqp_api_command(cmd, arglist)
  199. return formatter(fun(*args))
  200. def parseline(self, line):
  201. """Parse input line.
  202. :returns: tuple of three items:
  203. `(command_name, arglist, original_line)`
  204. """
  205. parts = line.split()
  206. if parts:
  207. return parts[0], ' '.join(parts[1:]), line
  208. return '', '', line
  209. def onecmd(self, line):
  210. """Parse line and execute command."""
  211. cmd, arg, line = self.parseline(line)
  212. if not line:
  213. return self.emptyline()
  214. self.lastcmd = line
  215. self.counter = next(self.inc_counter)
  216. try:
  217. self.respond(self.dispatch(cmd, arg))
  218. except (AttributeError, KeyError) as exc:
  219. self.default(line)
  220. except Exception as exc:
  221. self.say(exc)
  222. self.needs_reconnect = True
  223. def respond(self, retval):
  224. """What to do with the return value of a command."""
  225. if retval is not None:
  226. if isinstance(retval, string_t):
  227. self.say(retval)
  228. else:
  229. self.say(pprint.pformat(retval))
  230. def _reconnect(self):
  231. """Re-establish connection to the AMQP server."""
  232. self.conn = self.connect(self.conn)
  233. self.chan = self.conn.default_channel
  234. self.needs_reconnect = False
  235. @property
  236. def prompt(self):
  237. return self.prompt_fmt.format(self=self)
  238. class AMQPAdmin(object):
  239. """The celery :program:`celery amqp` utility."""
  240. Shell = AMQShell
  241. def __init__(self, *args, **kwargs):
  242. self.app = kwargs['app']
  243. self.out = kwargs.setdefault('out', sys.stderr)
  244. self.silent = kwargs.get('silent')
  245. self.args = args
  246. def connect(self, conn=None):
  247. if conn:
  248. conn.close()
  249. conn = self.app.connection()
  250. self.note('-> connecting to {0}.'.format(conn.as_uri()))
  251. conn.connect()
  252. self.note('-> connected.')
  253. return conn
  254. def run(self):
  255. shell = self.Shell(connect=self.connect, out=self.out)
  256. if self.args:
  257. return shell.onecmd(' '.join(self.args))
  258. try:
  259. return shell.cmdloop()
  260. except KeyboardInterrupt:
  261. self.note('(bibi)')
  262. pass
  263. def note(self, m):
  264. if not self.silent:
  265. say(m, file=self.out)
  266. class amqp(Command):
  267. """AMQP Administration Shell.
  268. Also works for non-amqp transports (but not ones that
  269. store declarations in memory).
  270. Examples::
  271. celery amqp
  272. start shell mode
  273. celery amqp help
  274. show list of commands
  275. celery amqp exchange.delete name
  276. celery amqp queue.delete queue
  277. celery amqp queue.delete queue yes yes
  278. """
  279. def run(self, *args, **options):
  280. options['app'] = self.app
  281. return AMQPAdmin(*args, **options).run()
  282. def main():
  283. amqp().execute_from_commandline()
  284. if __name__ == '__main__': # pragma: no cover
  285. main()