celeryctl.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import
  3. from __future__ import with_statement
  4. if __name__ == "__main__" and __package__ is None:
  5. __package__ = "celery.bin.celeryctl"
  6. import sys
  7. from importlib import import_module
  8. from optparse import OptionParser, make_option as Option
  9. from pprint import pformat
  10. from textwrap import wrap
  11. from anyjson import deserialize
  12. from .. import __version__
  13. from ..app import app_or_default, current_app
  14. from ..platforms import EX_OK, EX_FAILURE, EX_UNAVAILABLE, EX_USAGE
  15. from ..utils import pluralize, term
  16. from ..utils.timeutils import maybe_iso8601
  17. from ..bin.base import Command as CeleryCommand
  18. HELP = """
  19. Type '%(prog_name)s <command> --help' for help using
  20. a specific command.
  21. Available commands:
  22. %(commands)s
  23. """
  24. commands = {}
  25. class Error(Exception):
  26. def __init__(self, reason, status=EX_FAILURE):
  27. self.reason = reason
  28. self.status = status
  29. super(Error, self).__init__(reason, status)
  30. def __str__(self):
  31. return self.reason
  32. def command(fun, name=None):
  33. commands[name or fun.__name__] = fun
  34. return fun
  35. class Command(object):
  36. help = ""
  37. args = ""
  38. version = __version__
  39. option_list = CeleryCommand.preload_options + (
  40. Option("--quiet", "-q", action="store_true", dest="quiet",
  41. default=False),
  42. Option("--no-color", "-C", dest="no_color", action="store_true",
  43. help="Don't colorize output."),
  44. )
  45. def __init__(self, app=None, no_color=False):
  46. self.app = app_or_default(app)
  47. self.colored = term.colored(enabled=not no_color)
  48. def __call__(self, *args, **kwargs):
  49. try:
  50. ret = self.run(*args, **kwargs)
  51. except Error, exc:
  52. self.error(self.colored.red("Error: %s" % exc))
  53. return exc.status
  54. return ret if ret is not None else EX_OK
  55. def show_help(self, command):
  56. self.run_from_argv(self.prog_name, [command, "--help"])
  57. return EX_USAGE
  58. def error(self, s):
  59. self.out(s, fh=sys.stderr)
  60. def out(self, s, fh=sys.stdout):
  61. s = str(s)
  62. if not s.endswith("\n"):
  63. s += "\n"
  64. fh.write(s)
  65. def create_parser(self, prog_name, command):
  66. return OptionParser(prog=prog_name,
  67. usage=self.usage(command),
  68. version=self.version,
  69. option_list=self.option_list)
  70. def run_from_argv(self, prog_name, argv):
  71. self.prog_name = prog_name
  72. self.command = argv[0]
  73. self.arglist = argv[1:]
  74. self.parser = self.create_parser(self.prog_name, self.command)
  75. options, args = self.parser.parse_args(self.arglist)
  76. self.colored = term.colored(enabled=not options.no_color)
  77. return self(*args, **options.__dict__)
  78. def run(self, *args, **kwargs):
  79. raise NotImplementedError()
  80. def usage(self, command):
  81. return "%%prog %s [options] %s" % (command, self.args)
  82. def prettify_list(self, n):
  83. c = self.colored
  84. if not n:
  85. return "- empty -"
  86. return "\n".join(str(c.reset(c.white("*"), " %s" % (item, )))
  87. for item in n)
  88. def prettify_dict_ok_error(self, n):
  89. c = self.colored
  90. if "ok" in n:
  91. return (c.green("OK"),
  92. indent(self.prettify(n["ok"])[1]))
  93. elif "error" in n:
  94. return (c.red("ERROR"),
  95. indent(self.prettify(n["error"])[1]))
  96. def prettify(self, n):
  97. OK = str(self.colored.green("OK"))
  98. if isinstance(n, list):
  99. return OK, self.prettify_list(n)
  100. if isinstance(n, dict):
  101. if "ok" in n or "error" in n:
  102. return self.prettify_dict_ok_error(n)
  103. if isinstance(n, basestring):
  104. return OK, unicode(n)
  105. return OK, pformat(n)
  106. class list_(Command):
  107. args = "<bindings>"
  108. def list_bindings(self, channel):
  109. fmt = lambda q, e, r: self.out("%s %s %s" % (q.ljust(28),
  110. e.ljust(28), r))
  111. fmt("Queue", "Exchange", "Routing Key")
  112. fmt("-" * 16, "-" * 16, "-" * 16)
  113. for binding in channel.list_bindings():
  114. fmt(*binding)
  115. def run(self, what, *_, **kw):
  116. topics = {"bindings": self.list_bindings}
  117. if what not in topics:
  118. raise ValueError("%r not in %r" % (what, topics.keys()))
  119. with self.app.broker_connection() as conn:
  120. self.app.amqp.get_task_consumer(conn).declare()
  121. with conn.channel() as channel:
  122. return topics[what](channel)
  123. list_ = command(list_, "list")
  124. class apply(Command):
  125. args = "<task_name>"
  126. option_list = Command.option_list + (
  127. Option("--args", "-a", dest="args"),
  128. Option("--kwargs", "-k", dest="kwargs"),
  129. Option("--eta", dest="eta"),
  130. Option("--countdown", dest="countdown", type="int"),
  131. Option("--expires", dest="expires"),
  132. Option("--serializer", dest="serializer", default="json"),
  133. Option("--queue", dest="queue"),
  134. Option("--exchange", dest="exchange"),
  135. Option("--routing-key", dest="routing_key"),
  136. )
  137. def run(self, name, *_, **kw):
  138. # Positional args.
  139. args = kw.get("args") or ()
  140. if isinstance(args, basestring):
  141. args = deserialize(args)
  142. # Keyword args.
  143. kwargs = kw.get("kwargs") or {}
  144. if isinstance(kwargs, basestring):
  145. kwargs = deserialize(kwargs)
  146. # Expires can be int/float.
  147. expires = kw.get("expires") or None
  148. try:
  149. expires = float(expires)
  150. except (TypeError, ValueError):
  151. # or a string describing an ISO 8601 datetime.
  152. try:
  153. expires = maybe_iso8601(expires)
  154. except (TypeError, ValueError):
  155. pass
  156. res = self.app.send_task(name, args=args, kwargs=kwargs,
  157. countdown=kw.get("countdown"),
  158. serializer=kw.get("serializer"),
  159. queue=kw.get("queue"),
  160. exchange=kw.get("exchange"),
  161. routing_key=kw.get("routing_key"),
  162. eta=maybe_iso8601(kw.get("eta")),
  163. expires=expires)
  164. self.out(res.task_id)
  165. apply = command(apply)
  166. class purge(Command):
  167. def run(self, *args, **kwargs):
  168. app = current_app()
  169. queues = len(app.amqp.queues.keys())
  170. messages_removed = app.control.discard_all()
  171. if messages_removed:
  172. self.out("Purged %s %s from %s known task %s." % (
  173. messages_removed, pluralize(messages_removed, "message"),
  174. queues, pluralize(queues, "queue")))
  175. else:
  176. self.out("No messages purged from %s known %s" % (
  177. queues, pluralize(queues, "queue")))
  178. purge = command(purge)
  179. class result(Command):
  180. args = "<task_id>"
  181. option_list = Command.option_list + (
  182. Option("--task", "-t", dest="task"),
  183. )
  184. def run(self, task_id, *args, **kwargs):
  185. result_cls = self.app.AsyncResult
  186. task = kwargs.get("task")
  187. if task:
  188. result_cls = self.app.tasks[task].AsyncResult
  189. result = result_cls(task_id)
  190. self.out(self.prettify(result.get())[1])
  191. result = command(result)
  192. class inspect(Command):
  193. choices = {"active": 1.0,
  194. "active_queues": 1.0,
  195. "scheduled": 1.0,
  196. "reserved": 1.0,
  197. "stats": 1.0,
  198. "revoked": 1.0,
  199. "registered_tasks": 1.0, # alias to registered
  200. "registered": 1.0,
  201. "enable_events": 1.0,
  202. "disable_events": 1.0,
  203. "ping": 0.2,
  204. "add_consumer": 1.0,
  205. "cancel_consumer": 1.0}
  206. option_list = Command.option_list + (
  207. Option("--timeout", "-t", type="float", dest="timeout",
  208. default=None,
  209. help="Timeout in seconds (float) waiting for reply"),
  210. Option("--destination", "-d", dest="destination",
  211. help="Comma separated list of destination node names."))
  212. show_body = True
  213. def usage(self, command):
  214. return "%%prog %s [options] %s [%s]" % (
  215. command, self.args, "|".join(self.choices.keys()))
  216. def run(self, *args, **kwargs):
  217. self.quiet = kwargs.get("quiet", False)
  218. self.show_body = kwargs.get("show_body", True)
  219. if not args:
  220. raise Error("Missing inspect command. See --help")
  221. command = args[0]
  222. if command == "help":
  223. raise Error("Did you mean 'inspect --help'?")
  224. if command not in self.choices:
  225. raise Error("Unknown inspect command: %s" % command)
  226. destination = kwargs.get("destination")
  227. timeout = kwargs.get("timeout") or self.choices[command]
  228. if destination and isinstance(destination, basestring):
  229. destination = map(str.strip, destination.split(","))
  230. def on_reply(body):
  231. c = self.colored
  232. node = body.keys()[0]
  233. reply = body[node]
  234. status, preply = self.prettify(reply)
  235. self.say("->", c.cyan(node, ": ") + status, indent(preply))
  236. self.say("<-", command)
  237. i = self.app.control.inspect(destination=destination,
  238. timeout=timeout,
  239. callback=on_reply)
  240. replies = getattr(i, command)(*args[1:])
  241. if not replies:
  242. raise Error("No nodes replied within time constraint.",
  243. status=EX_UNAVAILABLE)
  244. return replies
  245. def say(self, direction, title, body=""):
  246. c = self.colored
  247. if direction == "<-" and self.quiet:
  248. return
  249. dirstr = not self.quiet and c.bold(c.white(direction), " ") or ""
  250. self.out(c.reset(dirstr, title))
  251. if body and self.show_body:
  252. self.out(body)
  253. inspect = command(inspect)
  254. def indent(s, n=4):
  255. i = [" " * n + l for l in s.split("\n")]
  256. return "\n".join("\n".join(wrap(j)) for j in i)
  257. class status(Command):
  258. option_list = inspect.option_list
  259. def run(self, *args, **kwargs):
  260. replies = inspect(app=self.app,
  261. no_color=kwargs.get("no_color", False)) \
  262. .run("ping", **dict(kwargs, quiet=True, show_body=False))
  263. if not replies:
  264. raise Error("No nodes replied within time constraint",
  265. status=EX_UNAVAILABLE)
  266. nodecount = len(replies)
  267. if not kwargs.get("quiet", False):
  268. self.out("\n%s %s online." % (nodecount,
  269. pluralize(nodecount, "node")))
  270. status = command(status)
  271. class migrate(Command):
  272. def usage(self, command):
  273. return "%%prog %s <source_url> <dest_url>" % (command, )
  274. def on_migrate_task(self, state, body, message):
  275. self.out("Migrating task %s/%s: %s[%s]" % (
  276. state.count, state.strtotal, body["task"], body["id"]))
  277. def run(self, *args, **kwargs):
  278. if len(args) != 2:
  279. return self.show_help("migrate")
  280. from kombu import BrokerConnection
  281. from ..contrib.migrate import migrate_tasks
  282. migrate_tasks(BrokerConnection(args[0]),
  283. BrokerConnection(args[1]),
  284. callback=self.on_migrate_task)
  285. migrate = command(migrate)
  286. class shell(Command):
  287. option_list = Command.option_list + (
  288. Option("--ipython", "-I", action="store_true",
  289. dest="force_ipython", default=False,
  290. help="Force IPython."),
  291. Option("--bpython", "-B", action="store_true",
  292. dest="force_bpython", default=False,
  293. help="Force bpython."),
  294. Option("--python", "-P", action="store_true",
  295. dest="force_python", default=False,
  296. help="Force default Python shell."),
  297. Option("--without-tasks", "-T", action="store_true",
  298. dest="without_tasks", default=False,
  299. help="Don't add tasks to locals."),
  300. Option("--eventlet", action="store_true",
  301. dest="eventlet", default=False,
  302. help="Use eventlet."),
  303. Option("--gevent", action="store_true",
  304. dest="gevent", default=False,
  305. help="Use gevent."),
  306. )
  307. def run(self, force_ipython=False, force_bpython=False,
  308. force_python=False, without_tasks=False, eventlet=False,
  309. gevent=False, **kwargs):
  310. if eventlet:
  311. import_module("celery.concurrency.eventlet")
  312. if gevent:
  313. import_module("celery.concurrency.gevent")
  314. self.app.loader.import_default_modules()
  315. self.locals = {"celery": self.app}
  316. if not without_tasks:
  317. self.locals.update(dict((task.__name__, task)
  318. for task in self.app.tasks.itervalues()))
  319. if force_python:
  320. return self.invoke_fallback_shell()
  321. elif force_bpython:
  322. return self.invoke_bpython_shell()
  323. elif force_ipython:
  324. return self.invoke_ipython_shell()
  325. return self.invoke_default_shell()
  326. def invoke_default_shell(self):
  327. try:
  328. import IPython # noqa
  329. except ImportError:
  330. try:
  331. import bpython # noqa
  332. except ImportError:
  333. return self.invoke_fallback_shell()
  334. else:
  335. return self.invoke_bpython_shell()
  336. else:
  337. return self.invoke_ipython_shell()
  338. def invoke_fallback_shell(self):
  339. import code
  340. try:
  341. import readline
  342. except ImportError:
  343. pass
  344. else:
  345. import rlcompleter
  346. readline.set_completer(
  347. rlcompleter.Completer(self.locals).complete)
  348. readline.parse_and_bind("tab:complete")
  349. code.interact(local=self.locals)
  350. def invoke_ipython_shell(self):
  351. try:
  352. from IPython.frontend.terminal import embed
  353. embed.TerminalInteractiveShell(user_ns=self.locals).mainloop()
  354. except ImportError: # ipython < 0.11
  355. from IPython.Shell import IPShell
  356. IPShell(argv=[], user_ns=self.locals).mainloop()
  357. def invoke_bpython_shell(self):
  358. import bpython
  359. bpython.embed(self.locals)
  360. shell = command(shell)
  361. class help(Command):
  362. def usage(self, command):
  363. return "%%prog <command> [options] %s" % (self.args, )
  364. def run(self, *args, **kwargs):
  365. self.parser.print_help()
  366. self.out(HELP % {"prog_name": self.prog_name,
  367. "commands": "\n".join(indent(command)
  368. for command in sorted(commands))})
  369. return EX_USAGE
  370. help = command(help)
  371. class celeryctl(CeleryCommand):
  372. commands = commands
  373. enable_config_from_cmdline = True
  374. def execute(self, command, argv=None):
  375. try:
  376. cls = self.commands[command]
  377. except KeyError:
  378. cls, argv = self.commands["help"], ["help"]
  379. cls = self.commands.get(command) or self.commands["help"]
  380. try:
  381. return cls(app=self.app).run_from_argv(self.prog_name, argv)
  382. except Error:
  383. return self.execute("help", argv)
  384. def remove_options_at_beginning(self, argv, index=0):
  385. if argv:
  386. while index <= len(argv):
  387. value = argv[index]
  388. if value.startswith("--"):
  389. pass
  390. elif value.startswith("-"):
  391. index += 1
  392. else:
  393. return argv[index:]
  394. index += 1
  395. return []
  396. def handle_argv(self, prog_name, argv):
  397. self.prog_name = prog_name
  398. argv = self.remove_options_at_beginning(argv)
  399. try:
  400. command = argv[0]
  401. except IndexError:
  402. command, argv = "help", ["help"]
  403. return self.execute(command, argv)
  404. def determine_exit_status(ret):
  405. if isinstance(ret, int):
  406. return ret
  407. return EX_OK if ret else EX_FAILURE
  408. def main():
  409. try:
  410. sys.exit(determine_exit_status(
  411. celeryctl().execute_from_commandline()))
  412. except KeyboardInterrupt:
  413. sys.exit(EX_FAILURE)
  414. if __name__ == "__main__": # pragma: no cover
  415. main()