amqp.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371
  1. # -*- coding: utf-8 -*-
  2. """
  3. The :program:`celery amqp` command.
  4. .. program:: celery amqp
  5. """
  6. from __future__ import absolute_import, print_function, unicode_literals
  7. import cmd
  8. import sys
  9. import shlex
  10. import pprint
  11. from functools import partial
  12. from itertools import count
  13. from kombu.utils.encoding import safe_str
  14. from celery.utils.functional import padlist
  15. from celery.bin.base import Command
  16. from celery.five import string_t
  17. from celery.utils import strtobool
# Public names exported by this module.
__all__ = ['AMQPAdmin', 'AMQShell', 'Spec', 'amqp']

# Map to coerce strings to other types.
# Looked up by :meth:`Spec.coerce`; types not listed here are called directly.
COERCE = {bool: strtobool}

# Heading printed by the shell's ``help`` command before the command list.
HELP_HEADER = """
Commands
--------
""".rstrip()

# Usage example printed by the shell's ``help`` command after the command list.
EXAMPLE_TEXT = """
Example:
-> queue.delete myqueue yes no
"""

# ``print`` preset to write to stderr; callers in this module override
# the destination by passing ``file=`` explicitly.
say = partial(print, file=sys.stderr)
  30. class Spec(object):
  31. """AMQP Command specification.
  32. Used to convert arguments to Python values and display various help
  33. and tooltips.
  34. :param args: see :attr:`args`.
  35. :keyword returns: see :attr:`returns`.
  36. .. attribute args::
  37. List of arguments this command takes. Should
  38. contain `(argument_name, argument_type)` tuples.
  39. .. attribute returns:
  40. Helpful human string representation of what this command returns.
  41. May be :const:`None`, to signify the return type is unknown.
  42. """
  43. def __init__(self, *args, **kwargs):
  44. self.args = args
  45. self.returns = kwargs.get('returns')
  46. def coerce(self, index, value):
  47. """Coerce value for argument at index."""
  48. arg_info = self.args[index]
  49. arg_type = arg_info[1]
  50. # Might be a custom way to coerce the string value,
  51. # so look in the coercion map.
  52. return COERCE.get(arg_type, arg_type)(value)
  53. def str_args_to_python(self, arglist):
  54. """Process list of string arguments to values according to spec.
  55. e.g:
  56. >>> spec = Spec([('queue', str), ('if_unused', bool)])
  57. >>> spec.str_args_to_python('pobox', 'true')
  58. ('pobox', True)
  59. """
  60. return tuple(
  61. self.coerce(index, value) for index, value in enumerate(arglist))
  62. def format_response(self, response):
  63. """Format the return value of this command in a human-friendly way."""
  64. if not self.returns:
  65. return 'ok.' if response is None else response
  66. if callable(self.returns):
  67. return self.returns(response)
  68. return self.returns.format(response)
  69. def format_arg(self, name, type, default_value=None):
  70. if default_value is not None:
  71. return '{0}:{1}'.format(name, default_value)
  72. return name
  73. def format_signature(self):
  74. return ' '.join(self.format_arg(*padlist(list(arg), 3))
  75. for arg in self.args)
  76. def dump_message(message):
  77. if message is None:
  78. return 'No messages in queue. basic.publish something.'
  79. return {'body': message.body,
  80. 'properties': message.properties,
  81. 'delivery_info': message.delivery_info}
  82. def format_declare_queue(ret):
  83. return 'ok. queue:{0} messages:{1} consumers:{2}.'.format(*ret)
  84. class AMQShell(cmd.Cmd):
  85. """AMQP API Shell.
  86. :keyword connect: Function used to connect to the server, must return
  87. connection object.
  88. :keyword silent: If :const:`True`, the commands won't have annoying
  89. output not relevant when running in non-shell mode.
  90. .. attribute: builtins
  91. Mapping of built-in command names -> method names
  92. .. attribute:: amqp
  93. Mapping of AMQP API commands and their :class:`Spec`.
  94. """
  95. conn = None
  96. chan = None
  97. prompt_fmt = '{self.counter}> '
  98. identchars = cmd.IDENTCHARS = '.'
  99. needs_reconnect = False
  100. counter = 1
  101. inc_counter = count(2)
  102. builtins = {'EOF': 'do_exit',
  103. 'exit': 'do_exit',
  104. 'help': 'do_help'}
  105. amqp = {
  106. 'exchange.declare': Spec(('exchange', str),
  107. ('type', str),
  108. ('passive', bool, 'no'),
  109. ('durable', bool, 'no'),
  110. ('auto_delete', bool, 'no'),
  111. ('internal', bool, 'no')),
  112. 'exchange.delete': Spec(('exchange', str),
  113. ('if_unused', bool)),
  114. 'queue.bind': Spec(('queue', str),
  115. ('exchange', str),
  116. ('routing_key', str)),
  117. 'queue.declare': Spec(('queue', str),
  118. ('passive', bool, 'no'),
  119. ('durable', bool, 'no'),
  120. ('exclusive', bool, 'no'),
  121. ('auto_delete', bool, 'no'),
  122. returns=format_declare_queue),
  123. 'queue.delete': Spec(('queue', str),
  124. ('if_unused', bool, 'no'),
  125. ('if_empty', bool, 'no'),
  126. returns='ok. {0} messages deleted.'),
  127. 'queue.purge': Spec(('queue', str),
  128. returns='ok. {0} messages deleted.'),
  129. 'basic.get': Spec(('queue', str),
  130. ('no_ack', bool, 'off'),
  131. returns=dump_message),
  132. 'basic.publish': Spec(('msg', str),
  133. ('exchange', str),
  134. ('routing_key', str),
  135. ('mandatory', bool, 'no'),
  136. ('immediate', bool, 'no')),
  137. 'basic.ack': Spec(('delivery_tag', int)),
  138. }
  139. def __init__(self, *args, **kwargs):
  140. self.connect = kwargs.pop('connect')
  141. self.silent = kwargs.pop('silent', False)
  142. self.out = kwargs.pop('out', sys.stderr)
  143. cmd.Cmd.__init__(self, *args, **kwargs)
  144. self._reconnect()
  145. def note(self, m):
  146. """Say something to the user. Disabled if :attr:`silent`."""
  147. if not self.silent:
  148. say(m, file=self.out)
  149. def say(self, m):
  150. say(m, file=self.out)
  151. def get_amqp_api_command(self, cmd, arglist):
  152. """With a command name and a list of arguments, convert the arguments
  153. to Python values and find the corresponding method on the AMQP channel
  154. object.
  155. :returns: tuple of `(method, processed_args)`.
  156. """
  157. spec = self.amqp[cmd]
  158. args = spec.str_args_to_python(arglist)
  159. attr_name = cmd.replace('.', '_')
  160. if self.needs_reconnect:
  161. self._reconnect()
  162. return getattr(self.chan, attr_name), args, spec.format_response
  163. def do_exit(self, *args):
  164. """The `'exit'` command."""
  165. self.note("\n-> please, don't leave!")
  166. sys.exit(0)
  167. def display_command_help(self, cmd, short=False):
  168. spec = self.amqp[cmd]
  169. self.say('{0} {1}'.format(cmd, spec.format_signature()))
  170. def do_help(self, *args):
  171. if not args:
  172. self.say(HELP_HEADER)
  173. for cmd_name in self.amqp:
  174. self.display_command_help(cmd_name, short=True)
  175. self.say(EXAMPLE_TEXT)
  176. else:
  177. self.display_command_help(args[0])
  178. def default(self, line):
  179. self.say("unknown syntax: {0!r}. how about some 'help'?".format(line))
  180. def get_names(self):
  181. return set(self.builtins) | set(self.amqp)
  182. def completenames(self, text, *ignored):
  183. """Return all commands starting with `text`, for tab-completion."""
  184. names = self.get_names()
  185. first = [cmd for cmd in names
  186. if cmd.startswith(text.replace('_', '.'))]
  187. if first:
  188. return first
  189. return [cmd for cmd in names
  190. if cmd.partition('.')[2].startswith(text)]
  191. def dispatch(self, cmd, arglist):
  192. """Dispatch and execute the command.
  193. Lookup order is: :attr:`builtins` -> :attr:`amqp`.
  194. """
  195. if isinstance(arglist, string_t):
  196. arglist = shlex.split(safe_str(arglist))
  197. if cmd in self.builtins:
  198. return getattr(self, self.builtins[cmd])(*arglist)
  199. fun, args, formatter = self.get_amqp_api_command(cmd, arglist)
  200. return formatter(fun(*args))
  201. def parseline(self, parts):
  202. """Parse input line.
  203. :returns: tuple of three items:
  204. `(command_name, arglist, original_line)`
  205. """
  206. if parts:
  207. return parts[0], parts[1:], ' '.join(parts)
  208. return '', '', ''
  209. def onecmd(self, line):
  210. """Parse line and execute command."""
  211. if isinstance(line, string_t):
  212. line = shlex.split(safe_str(line))
  213. cmd, arg, line = self.parseline(line)
  214. if not line:
  215. return self.emptyline()
  216. self.lastcmd = line
  217. self.counter = next(self.inc_counter)
  218. try:
  219. self.respond(self.dispatch(cmd, arg))
  220. except (AttributeError, KeyError) as exc:
  221. self.default(line)
  222. except Exception as exc:
  223. self.say(exc)
  224. self.needs_reconnect = True
  225. def respond(self, retval):
  226. """What to do with the return value of a command."""
  227. if retval is not None:
  228. if isinstance(retval, string_t):
  229. self.say(retval)
  230. else:
  231. self.say(pprint.pformat(retval))
  232. def _reconnect(self):
  233. """Re-establish connection to the AMQP server."""
  234. self.conn = self.connect(self.conn)
  235. self.chan = self.conn.default_channel
  236. self.needs_reconnect = False
  237. @property
  238. def prompt(self):
  239. return self.prompt_fmt.format(self=self)
  240. class AMQPAdmin(object):
  241. """The celery :program:`celery amqp` utility."""
  242. Shell = AMQShell
  243. def __init__(self, *args, **kwargs):
  244. self.app = kwargs['app']
  245. self.out = kwargs.setdefault('out', sys.stderr)
  246. self.silent = kwargs.get('silent')
  247. self.args = args
  248. def connect(self, conn=None):
  249. if conn:
  250. conn.close()
  251. conn = self.app.connection()
  252. self.note('-> connecting to {0}.'.format(conn.as_uri()))
  253. conn.connect()
  254. self.note('-> connected.')
  255. return conn
  256. def run(self):
  257. shell = self.Shell(connect=self.connect, out=self.out)
  258. if self.args:
  259. return shell.onecmd(self.args)
  260. try:
  261. return shell.cmdloop()
  262. except KeyboardInterrupt:
  263. self.note('(bibi)')
  264. pass
  265. def note(self, m):
  266. if not self.silent:
  267. say(m, file=self.out)
  268. class amqp(Command):
  269. """AMQP Administration Shell.
  270. Also works for non-amqp transports (but not ones that
  271. store declarations in memory).
  272. Examples::
  273. celery amqp
  274. start shell mode
  275. celery amqp help
  276. show list of commands
  277. celery amqp exchange.delete name
  278. celery amqp queue.delete queue
  279. celery amqp queue.delete queue yes yes
  280. """
  281. def run(self, *args, **options):
  282. options['app'] = self.app
  283. return AMQPAdmin(*args, **options).run()
  284. def main():
  285. amqp().execute_from_commandline()
  286. if __name__ == '__main__': # pragma: no cover
  287. main()