amqp.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372
  1. # -*- coding: utf-8 -*-
  2. """The :program:`celery amqp` command.
  3. .. program:: celery amqp
  4. """
  5. from __future__ import absolute_import, print_function, unicode_literals
  6. import cmd
  7. import sys
  8. import shlex
  9. import pprint
  10. from functools import partial
  11. from itertools import count
  12. from kombu.utils.encoding import safe_str
  13. from celery.utils.functional import padlist
  14. from celery.bin.base import Command
  15. from celery.five import string_t
  16. from celery.utils.serialization import strtobool
  17. __all__ = ['AMQPAdmin', 'AMQShell', 'Spec', 'amqp']
  18. # Map to coerce strings to other types.
  19. COERCE = {bool: strtobool}
  20. HELP_HEADER = """
  21. Commands
  22. --------
  23. """.rstrip()
  24. EXAMPLE_TEXT = """
  25. Example:
  26. -> queue.delete myqueue yes no
  27. """
  28. say = partial(print, file=sys.stderr)
  29. class Spec(object):
  30. """AMQP Command specification.
  31. Used to convert arguments to Python values and display various help
  32. and tool-tips.
  33. Arguments:
  34. args (Sequence): see :attr:`args`.
  35. returns (str): see :attr:`returns`.
  36. """
  37. #: List of arguments this command takes.
  38. #: Should contain ``(argument_name, argument_type)`` tuples.
  39. args = None
  40. #: Helpful human string representation of what this command returns.
  41. #: May be :const:`None`, to signify the return type is unknown.
  42. returns = None
  43. def __init__(self, *args, **kwargs):
  44. self.args = args
  45. self.returns = kwargs.get('returns')
  46. def coerce(self, index, value):
  47. """Coerce value for argument at index."""
  48. arg_info = self.args[index]
  49. arg_type = arg_info[1]
  50. # Might be a custom way to coerce the string value,
  51. # so look in the coercion map.
  52. return COERCE.get(arg_type, arg_type)(value)
  53. def str_args_to_python(self, arglist):
  54. """Process list of string arguments to values according to spec.
  55. e.g::
  56. >>> spec = Spec([('queue', str), ('if_unused', bool)])
  57. >>> spec.str_args_to_python('pobox', 'true')
  58. ('pobox', True)
  59. """
  60. return tuple(
  61. self.coerce(index, value) for index, value in enumerate(arglist))
  62. def format_response(self, response):
  63. """Format the return value of this command in a human-friendly way."""
  64. if not self.returns:
  65. return 'ok.' if response is None else response
  66. if callable(self.returns):
  67. return self.returns(response)
  68. return self.returns.format(response)
  69. def format_arg(self, name, type, default_value=None):
  70. if default_value is not None:
  71. return '{0}:{1}'.format(name, default_value)
  72. return name
  73. def format_signature(self):
  74. return ' '.join(self.format_arg(*padlist(list(arg), 3))
  75. for arg in self.args)
  76. def dump_message(message):
  77. if message is None:
  78. return 'No messages in queue. basic.publish something.'
  79. return {'body': message.body,
  80. 'properties': message.properties,
  81. 'delivery_info': message.delivery_info}
  82. def format_declare_queue(ret):
  83. return 'ok. queue:{0} messages:{1} consumers:{2}.'.format(*ret)
  84. class AMQShell(cmd.Cmd):
  85. """AMQP API Shell.
  86. Arguments:
  87. connect (Callable): Function used to connect to the server.
  88. Must return :class:`kombu.Connection` object.
  89. silent (bool): If enabled, the commands won't have annoying
  90. output not relevant when running in non-shell mode.
  91. """
  92. conn = None
  93. chan = None
  94. prompt_fmt = '{self.counter}> '
  95. identchars = cmd.IDENTCHARS = '.'
  96. needs_reconnect = False
  97. counter = 1
  98. inc_counter = count(2)
  99. #: Map of built-in command names -> method names
  100. builtins = {
  101. 'EOF': 'do_exit',
  102. 'exit': 'do_exit',
  103. 'help': 'do_help',
  104. }
  105. #: Map of AMQP API commands and their :class:`Spec`.
  106. amqp = {
  107. 'exchange.declare': Spec(('exchange', str),
  108. ('type', str),
  109. ('passive', bool, 'no'),
  110. ('durable', bool, 'no'),
  111. ('auto_delete', bool, 'no'),
  112. ('internal', bool, 'no')),
  113. 'exchange.delete': Spec(('exchange', str),
  114. ('if_unused', bool)),
  115. 'queue.bind': Spec(('queue', str),
  116. ('exchange', str),
  117. ('routing_key', str)),
  118. 'queue.declare': Spec(('queue', str),
  119. ('passive', bool, 'no'),
  120. ('durable', bool, 'no'),
  121. ('exclusive', bool, 'no'),
  122. ('auto_delete', bool, 'no'),
  123. returns=format_declare_queue),
  124. 'queue.delete': Spec(('queue', str),
  125. ('if_unused', bool, 'no'),
  126. ('if_empty', bool, 'no'),
  127. returns='ok. {0} messages deleted.'),
  128. 'queue.purge': Spec(('queue', str),
  129. returns='ok. {0} messages deleted.'),
  130. 'basic.get': Spec(('queue', str),
  131. ('no_ack', bool, 'off'),
  132. returns=dump_message),
  133. 'basic.publish': Spec(('msg', str),
  134. ('exchange', str),
  135. ('routing_key', str),
  136. ('mandatory', bool, 'no'),
  137. ('immediate', bool, 'no')),
  138. 'basic.ack': Spec(('delivery_tag', int)),
  139. }
  140. def _prepare_spec(self, conn):
  141. # XXX Hack to fix Issue #2013
  142. from amqp import Connection, Message
  143. if isinstance(conn.connection, Connection):
  144. self.amqp['basic.publish'] = Spec(('msg', Message),
  145. ('exchange', str),
  146. ('routing_key', str),
  147. ('mandatory', bool, 'no'),
  148. ('immediate', bool, 'no'))
  149. def __init__(self, *args, **kwargs):
  150. self.connect = kwargs.pop('connect')
  151. self.silent = kwargs.pop('silent', False)
  152. self.out = kwargs.pop('out', sys.stderr)
  153. cmd.Cmd.__init__(self, *args, **kwargs)
  154. self._reconnect()
  155. def note(self, m):
  156. """Say something to the user. Disabled if :attr:`silent`."""
  157. if not self.silent:
  158. say(m, file=self.out)
  159. def say(self, m):
  160. say(m, file=self.out)
  161. def get_amqp_api_command(self, cmd, arglist):
  162. """With a command name and a list of arguments, convert the arguments
  163. to Python values and find the corresponding method on the AMQP channel
  164. object.
  165. Returns:
  166. Tuple: of `(method, processed_args)` pairs.
  167. """
  168. spec = self.amqp[cmd]
  169. args = spec.str_args_to_python(arglist)
  170. attr_name = cmd.replace('.', '_')
  171. if self.needs_reconnect:
  172. self._reconnect()
  173. return getattr(self.chan, attr_name), args, spec.format_response
  174. def do_exit(self, *args):
  175. """The `'exit'` command."""
  176. self.note("\n-> please, don't leave!")
  177. sys.exit(0)
  178. def display_command_help(self, cmd, short=False):
  179. spec = self.amqp[cmd]
  180. self.say('{0} {1}'.format(cmd, spec.format_signature()))
  181. def do_help(self, *args):
  182. if not args:
  183. self.say(HELP_HEADER)
  184. for cmd_name in self.amqp:
  185. self.display_command_help(cmd_name, short=True)
  186. self.say(EXAMPLE_TEXT)
  187. else:
  188. self.display_command_help(args[0])
  189. def default(self, line):
  190. self.say("unknown syntax: {0!r}. how about some 'help'?".format(line))
  191. def get_names(self):
  192. return set(self.builtins) | set(self.amqp)
  193. def completenames(self, text, *ignored):
  194. """Return all commands starting with `text`, for tab-completion."""
  195. names = self.get_names()
  196. first = [cmd for cmd in names
  197. if cmd.startswith(text.replace('_', '.'))]
  198. if first:
  199. return first
  200. return [cmd for cmd in names
  201. if cmd.partition('.')[2].startswith(text)]
  202. def dispatch(self, cmd, arglist):
  203. """Dispatch and execute the command.
  204. Look-up order is: :attr:`builtins` -> :attr:`amqp`.
  205. """
  206. if isinstance(arglist, string_t):
  207. arglist = shlex.split(safe_str(arglist))
  208. if cmd in self.builtins:
  209. return getattr(self, self.builtins[cmd])(*arglist)
  210. fun, args, formatter = self.get_amqp_api_command(cmd, arglist)
  211. return formatter(fun(*args))
  212. def parseline(self, parts):
  213. """Parse input line.
  214. Returns:
  215. Tuple: of three items:
  216. `(command_name, arglist, original_line)`
  217. """
  218. if parts:
  219. return parts[0], parts[1:], ' '.join(parts)
  220. return '', '', ''
  221. def onecmd(self, line):
  222. """Parse line and execute command."""
  223. if isinstance(line, string_t):
  224. line = shlex.split(safe_str(line))
  225. cmd, arg, line = self.parseline(line)
  226. if not line:
  227. return self.emptyline()
  228. self.lastcmd = line
  229. self.counter = next(self.inc_counter)
  230. try:
  231. self.respond(self.dispatch(cmd, arg))
  232. except (AttributeError, KeyError) as exc:
  233. self.default(line)
  234. except Exception as exc:
  235. self.say(exc)
  236. self.needs_reconnect = True
  237. def respond(self, retval):
  238. """What to do with the return value of a command."""
  239. if retval is not None:
  240. if isinstance(retval, string_t):
  241. self.say(retval)
  242. else:
  243. self.say(pprint.pformat(retval))
  244. def _reconnect(self):
  245. """Re-establish connection to the AMQP server."""
  246. self.conn = self.connect(self.conn)
  247. self._prepare_spec(self.conn)
  248. self.chan = self.conn.default_channel
  249. self.needs_reconnect = False
  250. @property
  251. def prompt(self):
  252. return self.prompt_fmt.format(self=self)
  253. class AMQPAdmin(object):
  254. """The celery :program:`celery amqp` utility."""
  255. Shell = AMQShell
  256. def __init__(self, *args, **kwargs):
  257. self.app = kwargs['app']
  258. self.out = kwargs.setdefault('out', sys.stderr)
  259. self.silent = kwargs.get('silent')
  260. self.args = args
  261. def connect(self, conn=None):
  262. if conn:
  263. conn.close()
  264. conn = self.app.connection()
  265. self.note('-> connecting to {0}.'.format(conn.as_uri()))
  266. conn.connect()
  267. self.note('-> connected.')
  268. return conn
  269. def run(self):
  270. shell = self.Shell(connect=self.connect, out=self.out)
  271. if self.args:
  272. return shell.onecmd(self.args)
  273. try:
  274. return shell.cmdloop()
  275. except KeyboardInterrupt:
  276. self.note('(bibi)')
  277. pass
  278. def note(self, m):
  279. if not self.silent:
  280. say(m, file=self.out)
  281. class amqp(Command):
  282. """AMQP Administration Shell.
  283. Also works for non-AMQP transports (but not ones that
  284. store declarations in memory).
  285. Examples:
  286. .. code-block:: console
  287. $ # start shell mode
  288. $ celery amqp
  289. $ # show list of commands
  290. $ celery amqp help
  291. $ celery amqp exchange.delete name
  292. $ celery amqp queue.delete queue
  293. $ celery amqp queue.delete queue yes yes
  294. """
  295. def run(self, *args, **options):
  296. options['app'] = self.app
  297. return AMQPAdmin(*args, **options).run()
  298. def main():
  299. amqp().execute_from_commandline()
  300. if __name__ == '__main__': # pragma: no cover
  301. main()