celeryctl.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import
  3. from __future__ import with_statement
  4. import sys
  5. from optparse import OptionParser, make_option as Option
  6. from pprint import pformat
  7. from textwrap import wrap
  8. from anyjson import deserialize
  9. from celery import __version__
  10. from celery.app import app_or_default, current_app
  11. from celery.utils import term
  12. from celery.bin.base import Command as CeleryCommand
  13. commands = {}
  14. class Error(Exception):
  15. pass
  16. def command(fun, name=None):
  17. commands[name or fun.__name__] = fun
  18. return fun
  19. class Command(object):
  20. help = ""
  21. args = ""
  22. version = __version__
  23. option_list = CeleryCommand.preload_options + (
  24. Option("--quiet", "-q", action="store_true", dest="quiet",
  25. default=False),
  26. Option("--no-color", "-C", dest="no_color", action="store_true",
  27. help="Don't colorize output."),
  28. )
  29. def __init__(self, app=None, no_color=False):
  30. self.app = app_or_default(app)
  31. self.colored = term.colored(enabled=not no_color)
  32. def __call__(self, *args, **kwargs):
  33. try:
  34. self.run(*args, **kwargs)
  35. except Error, exc:
  36. self.error(self.colored.red("Error: %s" % exc))
  37. def error(self, s):
  38. return self.out(s, fh=sys.stderr)
  39. def out(self, s, fh=sys.stdout):
  40. s = str(s)
  41. if not s.endswith("\n"):
  42. s += "\n"
  43. sys.stdout.write(s)
  44. def create_parser(self, prog_name, command):
  45. return OptionParser(prog=prog_name,
  46. usage=self.usage(command),
  47. version=self.version,
  48. option_list=self.option_list)
  49. def run_from_argv(self, prog_name, argv):
  50. self.prog_name = prog_name
  51. self.command = argv[0]
  52. self.arglist = argv[1:]
  53. self.parser = self.create_parser(self.prog_name, self.command)
  54. options, args = self.parser.parse_args(self.arglist)
  55. self.colored = term.colored(enabled=not options.no_color)
  56. self(*args, **options.__dict__)
  57. def run(self, *args, **kwargs):
  58. raise NotImplementedError()
  59. def usage(self, command):
  60. return "%%prog %s [options] %s" % (command, self.args)
  61. def prettify_list(self, n):
  62. c = self.colored
  63. if not n:
  64. return "- empty -"
  65. return "\n".join(str(c.reset(c.white("*"), " %s" % (item, )))
  66. for item in n)
  67. def prettify_dict_ok_error(self, n):
  68. c = self.colored
  69. if "ok" in n:
  70. return (c.green("OK"),
  71. indent(self.prettify(n["ok"])[1]))
  72. elif "error" in n:
  73. return (c.red("ERROR"),
  74. indent(self.prettify(n["error"])[1]))
  75. def prettify(self, n):
  76. OK = str(self.colored.green("OK"))
  77. if isinstance(n, list):
  78. return OK, self.prettify_list(n)
  79. if isinstance(n, dict):
  80. if "ok" in n or "error" in n:
  81. return self.prettify_dict_ok_error(n)
  82. if isinstance(n, basestring):
  83. return OK, unicode(n)
  84. return OK, pformat(n)
  85. class list_(Command):
  86. args = "<bindings>"
  87. def list_bindings(self, channel):
  88. fmt = lambda q, e, r: self.out("%s %s %s" % (q.ljust(28),
  89. e.ljust(28), r))
  90. fmt("Queue", "Exchange", "Routing Key")
  91. fmt("-" * 16, "-" * 16, "-" * 16)
  92. for binding in channel.list_bindings():
  93. fmt(*binding)
  94. def run(self, what, *_, **kw):
  95. topics = {"bindings": self.list_bindings}
  96. if what not in topics:
  97. raise ValueError("%r not in %r" % (what, topics.keys()))
  98. with self.app.broker_connection() as conn:
  99. self.app.amqp.get_task_consumer(conn).declare()
  100. with conn.channel() as channel:
  101. return topics[what](channel)
  102. list_ = command(list_, "list")
  103. class apply(Command):
  104. args = "<task_name>"
  105. option_list = Command.option_list + (
  106. Option("--args", "-a", dest="args"),
  107. Option("--kwargs", "-k", dest="kwargs"),
  108. Option("--eta", dest="eta"),
  109. Option("--countdown", dest="countdown", type="int"),
  110. Option("--expires", dest="expires"),
  111. Option("--serializer", dest="serializer", default="json"),
  112. Option("--queue", dest="queue"),
  113. Option("--exchange", dest="exchange"),
  114. Option("--routing-key", dest="routing_key"),
  115. )
  116. def run(self, name, *_, **kw):
  117. # Positional args.
  118. args = kw.get("args") or ()
  119. if isinstance(args, basestring):
  120. args = deserialize(args)
  121. # Keyword args.
  122. kwargs = kw.get("kwargs") or {}
  123. if isinstance(kwargs, basestring):
  124. kwargs = deserialize(kwargs)
  125. # Expires can be int.
  126. expires = kw.get("expires") or None
  127. try:
  128. expires = int(expires)
  129. except (TypeError, ValueError):
  130. pass
  131. res = self.app.send_task(name, args=args, kwargs=kwargs,
  132. countdown=kw.get("countdown"),
  133. serializer=kw.get("serializer"),
  134. queue=kw.get("queue"),
  135. exchange=kw.get("exchange"),
  136. routing_key=kw.get("routing_key"),
  137. eta=kw.get("eta"),
  138. expires=expires)
  139. self.out(res.task_id)
  140. apply = command(apply)
  141. def pluralize(n, text, suffix='s'):
  142. if n > 1:
  143. return text + suffix
  144. return text
  145. class purge(Command):
  146. def run(self, *args, **kwargs):
  147. app = current_app()
  148. queues = len(app.amqp.queues.keys())
  149. messages_removed = app.control.discard_all()
  150. if messages_removed:
  151. self.out("Purged %s %s from %s known task %s." % (
  152. messages_removed, pluralize(messages_removed, "message"),
  153. queues, pluralize(queues, "queue")))
  154. else:
  155. self.out("No messages purged from %s known %s" % (
  156. queues, pluralize(queues, "queue")))
  157. purge = command(purge)
  158. class result(Command):
  159. args = "<task_id>"
  160. option_list = Command.option_list + (
  161. Option("--task", "-t", dest="task"),
  162. )
  163. def run(self, task_id, *args, **kwargs):
  164. from celery import registry
  165. result_cls = self.app.AsyncResult
  166. task = kwargs.get("task")
  167. if task:
  168. result_cls = registry.tasks[task].AsyncResult
  169. result = result_cls(task_id)
  170. self.out(self.prettify(result.get())[1])
  171. result = command(result)
  172. class inspect(Command):
  173. choices = {"active": 1.0,
  174. "active_queues": 1.0,
  175. "scheduled": 1.0,
  176. "reserved": 1.0,
  177. "stats": 1.0,
  178. "revoked": 1.0,
  179. "registered_tasks": 1.0, # alias to registered
  180. "registered": 1.0,
  181. "enable_events": 1.0,
  182. "disable_events": 1.0,
  183. "ping": 0.2,
  184. "add_consumer": 1.0,
  185. "cancel_consumer": 1.0}
  186. option_list = Command.option_list + (
  187. Option("--timeout", "-t", type="float", dest="timeout",
  188. default=None,
  189. help="Timeout in seconds (float) waiting for reply"),
  190. Option("--destination", "-d", dest="destination",
  191. help="Comma separated list of destination node names."))
  192. show_body = True
  193. def usage(self, command):
  194. return "%%prog %s [options] %s [%s]" % (
  195. command, self.args, "|".join(self.choices.keys()))
  196. def run(self, *args, **kwargs):
  197. self.quiet = kwargs.get("quiet", False)
  198. self.show_body = kwargs.get("show_body", True)
  199. if not args:
  200. raise Error("Missing inspect command. See --help")
  201. command = args[0]
  202. if command == "help":
  203. raise Error("Did you mean 'inspect --help'?")
  204. if command not in self.choices:
  205. raise Error("Unknown inspect command: %s" % command)
  206. destination = kwargs.get("destination")
  207. timeout = kwargs.get("timeout") or self.choices[command]
  208. if destination and isinstance(destination, basestring):
  209. destination = map(str.strip, destination.split(","))
  210. def on_reply(body):
  211. c = self.colored
  212. node = body.keys()[0]
  213. reply = body[node]
  214. status, preply = self.prettify(reply)
  215. self.say("->", c.cyan(node, ": ") + status, indent(preply))
  216. self.say("<-", command)
  217. i = self.app.control.inspect(destination=destination,
  218. timeout=timeout,
  219. callback=on_reply)
  220. replies = getattr(i, command)(*args[1:])
  221. if not replies:
  222. raise Error("No nodes replied within time constraint.")
  223. return replies
  224. def say(self, direction, title, body=""):
  225. c = self.colored
  226. if direction == "<-" and self.quiet:
  227. return
  228. dirstr = not self.quiet and c.bold(c.white(direction), " ") or ""
  229. self.out(c.reset(dirstr, title))
  230. if body and self.show_body:
  231. self.out(body)
  232. inspect = command(inspect)
  233. def indent(s, n=4):
  234. i = [" " * n + l for l in s.split("\n")]
  235. return "\n".join("\n".join(wrap(j)) for j in i)
  236. class status(Command):
  237. option_list = inspect.option_list
  238. def run(self, *args, **kwargs):
  239. replies = inspect(app=self.app,
  240. no_color=kwargs.get("no_color", False)) \
  241. .run("ping", **dict(kwargs, quiet=True, show_body=False))
  242. if not replies:
  243. raise Error("No nodes replied within time constraint")
  244. nodecount = len(replies)
  245. if not kwargs.get("quiet", False):
  246. self.out("\n%s %s online." % (nodecount,
  247. nodecount > 1 and "nodes" or "node"))
  248. status = command(status)
  249. class help(Command):
  250. def usage(self, command):
  251. return "%%prog <command> [options] %s" % (self.args, )
  252. def run(self, *args, **kwargs):
  253. self.parser.print_help()
  254. usage = ["",
  255. "Type '%s <command> --help' for help on a "
  256. "specific command." % (self.prog_name, ),
  257. "",
  258. "Available commands:"]
  259. for command in list(sorted(commands.keys())):
  260. usage.append(" %s" % command)
  261. self.out("\n".join(usage))
  262. help = command(help)
  263. class celeryctl(CeleryCommand):
  264. commands = commands
  265. def execute(self, command, argv=None):
  266. try:
  267. cls = self.commands[command]
  268. except KeyError:
  269. cls, argv = self.commands["help"], ["help"]
  270. cls = self.commands.get(command) or self.commands["help"]
  271. try:
  272. cls(app=self.app).run_from_argv(self.prog_name, argv)
  273. except Error:
  274. return self.execute("help", argv)
  275. def remove_options_at_beginning(self, argv, index=0):
  276. if argv:
  277. while index <= len(argv):
  278. value = argv[index]
  279. if value.startswith("--"):
  280. pass
  281. elif value.startswith("-"):
  282. index += 1
  283. else:
  284. return argv[index:]
  285. index += 1
  286. return []
  287. def handle_argv(self, prog_name, argv):
  288. self.prog_name = prog_name
  289. argv = self.remove_options_at_beginning(argv)
  290. try:
  291. command = argv[0]
  292. except IndexError:
  293. command, argv = "help", ["help"]
  294. return self.execute(command, argv)
  295. def main():
  296. try:
  297. celeryctl().execute_from_commandline()
  298. except KeyboardInterrupt:
  299. pass
  300. if __name__ == "__main__": # pragma: no cover
  301. main()