celeryctl.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. import sys
  2. from optparse import OptionParser, make_option as Option
  3. from pprint import pformat
  4. from textwrap import wrap
  5. from anyjson import deserialize
  6. from celery import __version__
  7. from celery.app import app_or_default
  8. from celery.bin.base import Command as CeleryCommand
  9. from celery.utils import term
  10. commands = {}
  11. class Error(Exception):
  12. pass
  13. def command(fun):
  14. commands[fun.__name__] = fun
  15. return fun
  16. class Command(object):
  17. help = ""
  18. args = ""
  19. version = __version__
  20. option_list = CeleryCommand.preload_options + (
  21. Option("--quiet", "-q", action="store_true", dest="quiet",
  22. default=False),
  23. Option("--no-color", "-C", dest="no_color", action="store_true",
  24. help="Don't colorize output."),
  25. )
  26. def __init__(self, app=None, no_color=False):
  27. self.app = app_or_default(app)
  28. self.colored = term.colored(enabled=not no_color)
  29. def __call__(self, *args, **kwargs):
  30. try:
  31. self.run(*args, **kwargs)
  32. except Error, exc:
  33. self.error(self.colored.red("Error: %s" % exc))
  34. def error(self, s):
  35. return self.out(s, fh=sys.stderr)
  36. def out(self, s, fh=sys.stdout):
  37. s = str(s)
  38. if not s.endswith("\n"):
  39. s += "\n"
  40. sys.stdout.write(s)
  41. def create_parser(self, prog_name, command):
  42. return OptionParser(prog=prog_name,
  43. usage=self.usage(command),
  44. version=self.version,
  45. option_list=self.option_list)
  46. def run_from_argv(self, prog_name, argv):
  47. self.prog_name = prog_name
  48. self.command = argv[0]
  49. self.arglist = argv[1:]
  50. self.parser = self.create_parser(self.prog_name, self.command)
  51. options, args = self.parser.parse_args(self.arglist)
  52. self.colored = term.colored(enabled=not options.no_color)
  53. self(*args, **options.__dict__)
  54. def run(self, *args, **kwargs):
  55. raise NotImplementedError()
  56. def usage(self, command):
  57. return "%%prog %s [options] %s" % (command, self.args)
  58. def prettify_list(self, n):
  59. c = self.colored
  60. if not n:
  61. return "- empty -"
  62. return "\n".join(str(c.reset(c.white("*"), " %s" % (item, )))
  63. for item in n)
  64. def prettify_dict_ok_error(self, n):
  65. c = self.colored
  66. if "ok" in n:
  67. return (c.green("OK"),
  68. indent(self.prettify(n["ok"])[1]))
  69. elif "error" in n:
  70. return (c.red("ERROR"),
  71. indent(self.prettify(n["error"])[1]))
  72. def prettify(self, n):
  73. OK = str(self.colored.green("OK"))
  74. if isinstance(n, list):
  75. return OK, self.prettify_list(n)
  76. if isinstance(n, dict):
  77. if "ok" in n or "error" in n:
  78. return self.prettify_dict_ok_error(n)
  79. if isinstance(n, basestring):
  80. return OK, unicode(n)
  81. return OK, pformat(n)
  82. class apply(Command):
  83. args = "<task_name>"
  84. option_list = Command.option_list + (
  85. Option("--args", "-a", dest="args"),
  86. Option("--kwargs", "-k", dest="kwargs"),
  87. Option("--eta", dest="eta"),
  88. Option("--countdown", dest="countdown", type="int"),
  89. Option("--expires", dest="expires"),
  90. Option("--serializer", dest="serializer", default="json"),
  91. Option("--queue", dest="queue"),
  92. Option("--exchange", dest="exchange"),
  93. Option("--routing-key", dest="routing_key"),
  94. )
  95. def run(self, name, *_, **kw):
  96. # Positional args.
  97. args = kw.get("args") or ()
  98. if isinstance(args, basestring):
  99. args = deserialize(args)
  100. # Keyword args.
  101. kwargs = kw.get("kwargs") or {}
  102. if isinstance(kwargs, basestring):
  103. kwargs = deserialize(kwargs)
  104. # Expires can be int.
  105. expires = kw.get("expires") or None
  106. try:
  107. expires = int(expires)
  108. except (TypeError, ValueError):
  109. pass
  110. res = self.app.send_task(name, args=args, kwargs=kwargs,
  111. countdown=kw.get("countdown"),
  112. serializer=kw.get("serializer"),
  113. queue=kw.get("queue"),
  114. exchange=kw.get("exchange"),
  115. routing_key=kw.get("routing_key"),
  116. eta=kw.get("eta"),
  117. expires=expires)
  118. self.out(res.task_id)
  119. apply = command(apply)
  120. class result(Command):
  121. args = "<task_id>"
  122. option_list = Command.option_list + (
  123. Option("--task", "-t", dest="task"),
  124. )
  125. def run(self, task_id, *args, **kwargs):
  126. from celery import registry
  127. result_cls = self.app.AsyncResult
  128. task = kwargs.get("task")
  129. if task:
  130. result_cls = registry.tasks[task].AsyncResult
  131. result = result_cls(task_id)
  132. self.out(self.prettify(result.get())[1])
  133. result = command(result)
  134. class inspect(Command):
  135. choices = {"active": 1.0,
  136. "scheduled": 1.0,
  137. "reserved": 1.0,
  138. "stats": 1.0,
  139. "revoked": 1.0,
  140. "registered_tasks": 1.0,
  141. "enable_events": 1.0,
  142. "disable_events": 1.0,
  143. "ping": 0.2}
  144. option_list = Command.option_list + (
  145. Option("--timeout", "-t", type="float", dest="timeout",
  146. default=None,
  147. help="Timeout in seconds (float) waiting for reply"),
  148. Option("--destination", "-d", dest="destination",
  149. help="Comma separated list of destination node names."))
  150. def usage(self, command):
  151. return "%%prog %s [options] %s [%s]" % (
  152. command, self.args, "|".join(self.choices.keys()))
  153. def run(self, *args, **kwargs):
  154. self.quiet = kwargs.get("quiet", False)
  155. if not args:
  156. raise Error("Missing inspect command. See --help")
  157. command = args[0]
  158. if command == "help":
  159. raise Error("Did you mean 'inspect --help'?")
  160. if command not in self.choices:
  161. raise Error("Unknown inspect command: %s" % command)
  162. destination = kwargs.get("destination")
  163. timeout = kwargs.get("timeout") or self.choices[command]
  164. if destination and isinstance(destination, basestring):
  165. destination = map(str.strip, destination.split(","))
  166. def on_reply(message_data):
  167. c = self.colored
  168. node = message_data.keys()[0]
  169. reply = message_data[node]
  170. status, preply = self.prettify(reply)
  171. self.say("->", c.cyan(node, ": ") + status, indent(preply))
  172. self.say("<-", command)
  173. i = self.app.control.inspect(destination=destination,
  174. timeout=timeout,
  175. callback=on_reply)
  176. replies = getattr(i, command)()
  177. if not replies:
  178. raise Error("No nodes replied within time constraint.")
  179. return replies
  180. def say(self, direction, title, body=""):
  181. c = self.colored
  182. if direction == "<-" and self.quiet:
  183. return
  184. dirstr = not self.quiet and c.bold(c.white(direction), " ") or ""
  185. self.out(c.reset(dirstr, title))
  186. if body and not self.quiet:
  187. self.out(body)
  188. inspect = command(inspect)
  189. def indent(s, n=4):
  190. i = [" " * n + l for l in s.split("\n")]
  191. return "\n".join("\n".join(wrap(j)) for j in i)
  192. class status(Command):
  193. option_list = inspect.option_list
  194. def run(self, *args, **kwargs):
  195. replies = inspect(app=self.app,
  196. no_color=kwargs.get("no_color", False)) \
  197. .run("ping", **dict(kwargs, quiet=True))
  198. if not replies:
  199. raise Error("No nodes replied within time constraint")
  200. nodecount = len(replies)
  201. if not kwargs.get("quiet", False):
  202. self.out("\n%s %s online." % (nodecount,
  203. nodecount > 1 and "nodes" or "node"))
  204. status = command(status)
  205. class help(Command):
  206. def usage(self, command):
  207. return "%%prog <command> [options] %s" % (self.args, )
  208. def run(self, *args, **kwargs):
  209. self.parser.print_help()
  210. usage = ["",
  211. "Type '%s <command> --help' for help on a "
  212. "specific command." % (self.prog_name, ),
  213. "",
  214. "Available commands:"]
  215. for command in list(sorted(commands.keys())):
  216. usage.append(" %s" % command)
  217. self.out("\n".join(usage))
  218. help = command(help)
  219. class celeryctl(CeleryCommand):
  220. commands = commands
  221. def execute(self, command, argv=None):
  222. try:
  223. cls = self.commands[command]
  224. except KeyError:
  225. cls, argv = self.commands["help"], ["help"]
  226. cls = self.commands.get(command) or self.commands["help"]
  227. try:
  228. cls(app=self.app).run_from_argv(self.prog_name, argv)
  229. except Error:
  230. return self.execute("help", argv)
  231. def handle_argv(self, prog_name, argv):
  232. self.prog_name = prog_name
  233. try:
  234. command = argv[0]
  235. except IndexError:
  236. command, argv = "help", ["help"]
  237. return self.execute(command, argv)
  238. def main():
  239. try:
  240. celeryctl().execute_from_commandline()
  241. except KeyboardInterrupt:
  242. pass
  243. if __name__ == "__main__": # pragma: no cover
  244. main()