camqadm.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. #!/usr/bin/env python
  2. """camqadm
  3. .. program:: camqadm
  4. .. cmdoption:: -X, --x
  5. Description
  6. """
  7. import os
  8. import cmd
  9. import sys
  10. import shlex
  11. import pprint
  12. import readline
  13. import optparse
  14. from celery.utils import info
  15. from celery.messaging import establish_connection
  16. def say(m):
  17. sys.stderr.write("%s\n" % (m, ))
  18. OPTION_LIST = (
  19. #optparse.make_option('-c', '--concurrency',
  20. # default=conf.CELERYD_CONCURRENCY,
  21. # action="store", dest="concurrency", type="int",
  22. # help="Number of child processes processing the queue."),
  23. )
  24. class Spec(object):
  25. def __init__(self, *arglist, **kwargs):
  26. self.arglist = arglist
  27. self.returns = kwargs.get("returns")
  28. class AMQShell(cmd.Cmd):
  29. conn = None
  30. chan = None
  31. prompt = "--> "
  32. identchars = cmd.IDENTCHARS = "."
  33. needs_reconnect = False
  34. builtins = {"exit": "do_exit",
  35. "EOF": "do_exit"}
  36. amqp = {
  37. "exchange.declare": Spec(("exchange", str),
  38. ("type", str),
  39. ("passive", bool),
  40. ("durable", bool),
  41. ("auto_delete", bool),
  42. ("internal", bool)),
  43. "exchange.delete": Spec(("exchange", str),
  44. ("if_unused", bool)),
  45. "queue.bind": Spec(("queue", str),
  46. ("exchange", str),
  47. ("routing_key", str)),
  48. "queue.declare": Spec(("queue", str),
  49. ("passive", bool),
  50. ("durable", bool),
  51. ("exclusive", bool),
  52. ("auto_delete", bool),
  53. returns="Messages purged"),
  54. "queue.delete": Spec(("queue", str),
  55. ("if_unused", bool),
  56. ("if_empty", bool)),
  57. "queue.purge": Spec(("queue", str), returns="Messages purged"),
  58. "basic.get": Spec(("queue", str),
  59. ("no_ack", bool)),
  60. }
  61. def __init__(self, *args, **kwargs):
  62. self.connect = kwargs.pop("connect")
  63. self.silent = kwargs.pop("silent", False)
  64. cmd.Cmd.__init__(self, *args, **kwargs)
  65. self._reconnect()
  66. def say(self, m):
  67. if not self.silent:
  68. say(m)
  69. def _reconnect(self):
  70. self.conn = self.connect(self.conn)
  71. self.chan = self.conn.create_backend().channel
  72. self.needs_reconnect = False
  73. def _apply_spec(self, arglist, spec):
  74. return arglist
  75. def _get_amqp_api_command(self, cmd, arglist):
  76. spec = self.amqp[cmd]
  77. attr_name = cmd.replace(".", "_")
  78. if self.needs_reconnect:
  79. self._reconnect()
  80. return (getattr(self.chan, attr_name),
  81. self._apply_spec(arglist, spec))
  82. def do_exit(self, *args):
  83. self.say("\n-> please, don't leave!")
  84. sys.exit(0)
  85. def completenames(self, text, *ignored):
  86. return [cmd for cmd in set(self.builtins.keys() + self.amqp.keys())
  87. if cmd.startswith(text.replace(".", "_"))]
  88. def dispatch(self, cmd, argline):
  89. arglist = shlex.split(argline)
  90. if cmd in self.builtins:
  91. return getattr(self, self.builtins[cmd])(*arglist)
  92. fun, args = self._get_amqp_api_command(cmd, arglist)
  93. return fun(*args)
  94. def parseline(self, line):
  95. parts = line.split()
  96. return parts[0], " ".join(parts[1:]), line
  97. def onecmd(self, line):
  98. cmd, arg, line = self.parseline(line)
  99. if not line:
  100. return self.emptyline()
  101. if cmd is None:
  102. return self.default(line)
  103. self.lastcmd = line
  104. if cmd == '':
  105. return self.default(line)
  106. else:
  107. try:
  108. self.respond(self.dispatch(cmd, arg))
  109. except (AttributeError, KeyError), exc:
  110. self.default(line)
  111. except Exception, exc:
  112. say(exc)
  113. self.needs_reconnect = True
  114. def respond(self, retval):
  115. pprint.pprint(retval)
  116. class AMQPAdmin(object):
  117. def __init__(self, *args, **kwargs):
  118. self.silent = bool(args)
  119. if "silent" in kwargs:
  120. self.silent = kwargs["silent"]
  121. self.args = args
  122. def connect(self, conn=None):
  123. if conn:
  124. conn.close()
  125. self.say("-> connecting to %s." % info.format_broker_info())
  126. conn = establish_connection()
  127. conn.connect()
  128. self.say("-> connected.")
  129. return conn
  130. def run(self):
  131. shell = AMQShell(connect=self.connect)
  132. if self.args:
  133. return shell.onecmd(" ".join(self.args))
  134. return shell.cmdloop()
  135. def say(self, m):
  136. if not self.silent:
  137. say(m)
  138. def parse_options(arguments):
  139. """Parse the available options to ``celeryd``."""
  140. parser = optparse.OptionParser(option_list=OPTION_LIST)
  141. options, values = parser.parse_args(arguments)
  142. return options, values
  143. def camqadm(*args, **options):
  144. return AMQPAdmin(*args, **options).run()
  145. def main():
  146. options, values = parse_options(sys.argv[1:])
  147. return run_worker(*values, **vars(options))
  148. if __name__ == "__main__":
  149. main()