celery.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import
  3. from __future__ import with_statement
# When this file is executed directly as a script and no package is set,
# declare the package explicitly.  NOTE(review): presumably this is what
# lets the explicit relative import (``from .base import ...``) further
# down resolve in that situation -- confirm.
if __name__ == "__main__" and globals().get("__package__") is None:
    __package__ = "celery.bin.celery"
  6. import anyjson
  7. import sys
  8. from importlib import import_module
  9. from optparse import OptionParser, make_option as Option
  10. from pprint import pformat
  11. from textwrap import wrap
  12. from celery import __version__
  13. from celery.app import app_or_default, current_app
  14. from celery.platforms import EX_OK, EX_FAILURE, EX_UNAVAILABLE, EX_USAGE
  15. from celery.utils import term
  16. from celery.utils.imports import symbol_by_name
  17. from celery.utils.text import pluralize
  18. from celery.utils.timeutils import maybe_iso8601
  19. from .base import Command as BaseCommand
# Text printed by the ``help`` command after the option parser's own help.
# ``%(prog_name)s`` and ``%(commands)s`` are interpolated by ``help.run``.
HELP = """
Type '%(prog_name)s <command> --help' for help using
a specific command.
Available commands:
%(commands)s
"""
# Registry mapping command name -> command class; filled by ``command()``.
commands = {}
  27. class Error(Exception):
  28. def __init__(self, reason, status=EX_FAILURE):
  29. self.reason = reason
  30. self.status = status
  31. super(Error, self).__init__(reason, status)
  32. def __str__(self):
  33. return self.reason
  34. def command(fun, name=None):
  35. commands[name or fun.__name__] = fun
  36. return fun
  37. class Command(object):
  38. help = ""
  39. args = ""
  40. version = __version__
  41. option_list = BaseCommand.preload_options + (
  42. Option("--quiet", "-q", action="store_true", dest="quiet",
  43. default=False),
  44. Option("--no-color", "-C", dest="no_color", action="store_true",
  45. help="Don't colorize output."),
  46. )
  47. def __init__(self, app=None, no_color=False):
  48. self.app = app_or_default(app)
  49. self.colored = term.colored(enabled=not no_color)
  50. def __call__(self, *args, **kwargs):
  51. try:
  52. ret = self.run(*args, **kwargs)
  53. except Error, exc:
  54. self.error(self.colored.red("Error: %s" % exc))
  55. return exc.status
  56. return ret if ret is not None else EX_OK
  57. def show_help(self, command):
  58. self.run_from_argv(self.prog_name, [command, "--help"])
  59. return EX_USAGE
  60. def error(self, s):
  61. self.out(s, fh=sys.stderr)
  62. def out(self, s, fh=sys.stdout):
  63. s = str(s)
  64. if not s.endswith("\n"):
  65. s += "\n"
  66. fh.write(s)
  67. def create_parser(self, prog_name, command):
  68. return OptionParser(prog=prog_name,
  69. usage=self.usage(command),
  70. version=self.version,
  71. option_list=self.get_options())
  72. def get_options(self):
  73. return self.option_list
  74. def run_from_argv(self, prog_name, argv):
  75. self.prog_name = prog_name
  76. self.command = argv[0]
  77. self.arglist = argv[1:]
  78. self.parser = self.create_parser(self.prog_name, self.command)
  79. options, args = self.parser.parse_args(self.arglist)
  80. self.colored = term.colored(enabled=not options.no_color)
  81. return self(*args, **options.__dict__)
  82. def run(self, *args, **kwargs):
  83. raise NotImplementedError()
  84. def usage(self, command):
  85. return "%%prog %s [options] %s" % (command, self.args)
  86. def prettify_list(self, n):
  87. c = self.colored
  88. if not n:
  89. return "- empty -"
  90. return "\n".join(str(c.reset(c.white("*"), " %s" % (item, )))
  91. for item in n)
  92. def prettify_dict_ok_error(self, n):
  93. c = self.colored
  94. if "ok" in n:
  95. return (c.green("OK"),
  96. indent(self.prettify(n["ok"])[1]))
  97. elif "error" in n:
  98. return (c.red("ERROR"),
  99. indent(self.prettify(n["error"])[1]))
  100. def prettify(self, n):
  101. OK = str(self.colored.green("OK"))
  102. if isinstance(n, list):
  103. return OK, self.prettify_list(n)
  104. if isinstance(n, dict):
  105. if "ok" in n or "error" in n:
  106. return self.prettify_dict_ok_error(n)
  107. if isinstance(n, basestring):
  108. return OK, unicode(n)
  109. return OK, pformat(n)
  110. class Delegate(Command):
  111. def __init__(self, *args, **kwargs):
  112. super(Delegate, self).__init__(*args, **kwargs)
  113. self.target = symbol_by_name(self.Command)(app=self.app)
  114. self.args = self.target.args
  115. def get_options(self):
  116. return self.option_list + self.target.get_options()
  117. def run(self, *args, **kwargs):
  118. self.target.check_args(args)
  119. return self.target.run(*args, **kwargs)
  120. def create_delegate(name, Command):
  121. return command(type(name, (Delegate, ), {"Command": Command,
  122. "__module__": __name__}))
# Sub-commands implemented by command classes living in other modules.
# Each string is resolved with ``symbol_by_name`` when the delegate is
# instantiated, not at import time.
worker = create_delegate("worker", "celery.bin.celeryd:WorkerCommand")
events = create_delegate("events", "celery.bin.celeryev:EvCommand")
beat = create_delegate("beat", "celery.bin.celerybeat:BeatCommand")
amqp = create_delegate("amqp", "celery.bin.camqadm:AMQPAdminCommand")
  127. class list_(Command):
  128. args = "[bindings]"
  129. def list_bindings(self, management):
  130. try:
  131. bindings = management.get_bindings()
  132. except NotImplementedError:
  133. raise Error("Your transport cannot list bindings.")
  134. fmt = lambda q, e, r: self.out("%s %s %s" % (q.ljust(28),
  135. e.ljust(28), r))
  136. fmt("Queue", "Exchange", "Routing Key")
  137. fmt("-" * 16, "-" * 16, "-" * 16)
  138. for b in bindings:
  139. fmt(b["destination"], b["source"], b["routing_key"])
  140. def run(self, what=None, *_, **kw):
  141. topics = {"bindings": self.list_bindings}
  142. available = ', '.join(topics.keys())
  143. if not what:
  144. raise Error("You must specify what to list (%s)" % available)
  145. if what not in topics:
  146. raise Error("unknown topic %r (choose one of: %s)" % (
  147. what, available))
  148. with self.app.broker_connection() as conn:
  149. self.app.amqp.get_task_consumer(conn).declare()
  150. topics[what](conn.manager)
  151. list_ = command(list_, "list")
  152. class apply(Command):
  153. args = "<task_name>"
  154. option_list = Command.option_list + (
  155. Option("--args", "-a", dest="args"),
  156. Option("--kwargs", "-k", dest="kwargs"),
  157. Option("--eta", dest="eta"),
  158. Option("--countdown", dest="countdown", type="int"),
  159. Option("--expires", dest="expires"),
  160. Option("--serializer", dest="serializer", default="json"),
  161. Option("--queue", dest="queue"),
  162. Option("--exchange", dest="exchange"),
  163. Option("--routing-key", dest="routing_key"),
  164. )
  165. def run(self, name, *_, **kw):
  166. # Positional args.
  167. args = kw.get("args") or ()
  168. if isinstance(args, basestring):
  169. args = anyjson.loads(args)
  170. # Keyword args.
  171. kwargs = kw.get("kwargs") or {}
  172. if isinstance(kwargs, basestring):
  173. kwargs = anyjson.loads(kwargs)
  174. # Expires can be int/float.
  175. expires = kw.get("expires") or None
  176. try:
  177. expires = float(expires)
  178. except (TypeError, ValueError):
  179. # or a string describing an ISO 8601 datetime.
  180. try:
  181. expires = maybe_iso8601(expires)
  182. except (TypeError, ValueError):
  183. pass
  184. res = self.app.send_task(name, args=args, kwargs=kwargs,
  185. countdown=kw.get("countdown"),
  186. serializer=kw.get("serializer"),
  187. queue=kw.get("queue"),
  188. exchange=kw.get("exchange"),
  189. routing_key=kw.get("routing_key"),
  190. eta=maybe_iso8601(kw.get("eta")),
  191. expires=expires)
  192. self.out(res.id)
  193. apply = command(apply)
  194. class purge(Command):
  195. def run(self, *args, **kwargs):
  196. queues = len(current_app.amqp.queues.keys())
  197. messages_removed = current_app.control.discard_all()
  198. if messages_removed:
  199. self.out("Purged %s %s from %s known task %s." % (
  200. messages_removed, pluralize(messages_removed, "message"),
  201. queues, pluralize(queues, "queue")))
  202. else:
  203. self.out("No messages purged from %s known %s" % (
  204. queues, pluralize(queues, "queue")))
  205. purge = command(purge)
  206. class result(Command):
  207. args = "<task_id>"
  208. option_list = Command.option_list + (
  209. Option("--task", "-t", dest="task"),
  210. )
  211. def run(self, task_id, *args, **kwargs):
  212. result_cls = self.app.AsyncResult
  213. task = kwargs.get("task")
  214. if task:
  215. result_cls = self.app.tasks[task].AsyncResult
  216. result = result_cls(task_id)
  217. self.out(self.prettify(result.get())[1])
  218. result = command(result)
  219. class inspect(Command):
  220. choices = {"active": 1.0,
  221. "active_queues": 1.0,
  222. "scheduled": 1.0,
  223. "reserved": 1.0,
  224. "stats": 1.0,
  225. "revoked": 1.0,
  226. "registered_tasks": 1.0, # alias to registered
  227. "registered": 1.0,
  228. "enable_events": 1.0,
  229. "disable_events": 1.0,
  230. "ping": 0.2,
  231. "add_consumer": 1.0,
  232. "cancel_consumer": 1.0,
  233. "report": 1.0}
  234. option_list = Command.option_list + (
  235. Option("--timeout", "-t", type="float", dest="timeout",
  236. default=None,
  237. help="Timeout in seconds (float) waiting for reply"),
  238. Option("--destination", "-d", dest="destination",
  239. help="Comma separated list of destination node names."))
  240. show_body = True
  241. def usage(self, command):
  242. return "%%prog %s [options] %s [%s]" % (
  243. command, self.args, "|".join(self.choices.keys()))
  244. def run(self, *args, **kwargs):
  245. self.quiet = kwargs.get("quiet", False)
  246. self.show_body = kwargs.get("show_body", True)
  247. if not args:
  248. raise Error("Missing inspect command. See --help")
  249. command = args[0]
  250. if command == "help":
  251. raise Error("Did you mean 'inspect --help'?")
  252. if command not in self.choices:
  253. raise Error("Unknown inspect command: %s" % command)
  254. destination = kwargs.get("destination")
  255. timeout = kwargs.get("timeout") or self.choices[command]
  256. if destination and isinstance(destination, basestring):
  257. destination = map(str.strip, destination.split(","))
  258. def on_reply(body):
  259. c = self.colored
  260. node = body.keys()[0]
  261. reply = body[node]
  262. status, preply = self.prettify(reply)
  263. self.say("->", c.cyan(node, ": ") + status, indent(preply))
  264. self.say("<-", command)
  265. i = self.app.control.inspect(destination=destination,
  266. timeout=timeout,
  267. callback=on_reply)
  268. replies = getattr(i, command)(*args[1:])
  269. if not replies:
  270. raise Error("No nodes replied within time constraint.",
  271. status=EX_UNAVAILABLE)
  272. return replies
  273. def say(self, direction, title, body=""):
  274. c = self.colored
  275. if direction == "<-" and self.quiet:
  276. return
  277. dirstr = not self.quiet and c.bold(c.white(direction), " ") or ""
  278. self.out(c.reset(dirstr, title))
  279. if body and self.show_body:
  280. self.out(body)
  281. inspect = command(inspect)
  282. def indent(s, n=4):
  283. i = [" " * n + l for l in s.split("\n")]
  284. return "\n".join("\n".join(wrap(j)) for j in i)
  285. class status(Command):
  286. option_list = inspect.option_list
  287. def run(self, *args, **kwargs):
  288. replies = inspect(app=self.app,
  289. no_color=kwargs.get("no_color", False)) \
  290. .run("ping", **dict(kwargs, quiet=True, show_body=False))
  291. if not replies:
  292. raise Error("No nodes replied within time constraint",
  293. status=EX_UNAVAILABLE)
  294. nodecount = len(replies)
  295. if not kwargs.get("quiet", False):
  296. self.out("\n%s %s online." % (nodecount,
  297. pluralize(nodecount, "node")))
  298. status = command(status)
  299. class migrate(Command):
  300. def usage(self, command):
  301. return "%%prog %s <source_url> <dest_url>" % (command, )
  302. def on_migrate_task(self, state, body, message):
  303. self.out("Migrating task %s/%s: %s[%s]" % (
  304. state.count, state.strtotal, body["task"], body["id"]))
  305. def run(self, *args, **kwargs):
  306. if len(args) != 2:
  307. return self.show_help("migrate")
  308. from kombu import BrokerConnection
  309. from celery.contrib.migrate import migrate_tasks
  310. migrate_tasks(BrokerConnection(args[0]),
  311. BrokerConnection(args[1]),
  312. callback=self.on_migrate_task)
  313. migrate = command(migrate)
  314. class shell(Command):
  315. option_list = Command.option_list + (
  316. Option("--ipython", "-I", action="store_true",
  317. dest="force_ipython", default=False,
  318. help="Force IPython."),
  319. Option("--bpython", "-B", action="store_true",
  320. dest="force_bpython", default=False,
  321. help="Force bpython."),
  322. Option("--python", "-P", action="store_true",
  323. dest="force_python", default=False,
  324. help="Force default Python shell."),
  325. Option("--without-tasks", "-T", action="store_true",
  326. dest="without_tasks", default=False,
  327. help="Don't add tasks to locals."),
  328. Option("--eventlet", action="store_true",
  329. dest="eventlet", default=False,
  330. help="Use eventlet."),
  331. Option("--gevent", action="store_true",
  332. dest="gevent", default=False,
  333. help="Use gevent."),
  334. )
  335. def run(self, force_ipython=False, force_bpython=False,
  336. force_python=False, without_tasks=False, eventlet=False,
  337. gevent=False, **kwargs):
  338. if eventlet:
  339. import_module("celery.concurrency.eventlet")
  340. if gevent:
  341. import_module("celery.concurrency.gevent")
  342. import celery
  343. import celery.task.base
  344. self.app.loader.import_default_modules()
  345. self.locals = {"celery": self.app,
  346. "BaseTask": celery.task.base.BaseTask,
  347. "chord": celery.chord,
  348. "group": celery.group,
  349. "chain": celery.chain,
  350. "subtask": celery.subtask}
  351. if not without_tasks:
  352. self.locals.update(dict((task.__name__, task)
  353. for task in self.app.tasks.itervalues()
  354. if not task.name.startswith("celery.")))
  355. if force_python:
  356. return self.invoke_fallback_shell()
  357. elif force_bpython:
  358. return self.invoke_bpython_shell()
  359. elif force_ipython:
  360. return self.invoke_ipython_shell()
  361. return self.invoke_default_shell()
  362. def invoke_default_shell(self):
  363. try:
  364. import IPython # noqa
  365. except ImportError:
  366. try:
  367. import bpython # noqa
  368. except ImportError:
  369. return self.invoke_fallback_shell()
  370. else:
  371. return self.invoke_bpython_shell()
  372. else:
  373. return self.invoke_ipython_shell()
  374. def invoke_fallback_shell(self):
  375. import code
  376. try:
  377. import readline
  378. except ImportError:
  379. pass
  380. else:
  381. import rlcompleter
  382. readline.set_completer(
  383. rlcompleter.Completer(self.locals).complete)
  384. readline.parse_and_bind("tab:complete")
  385. code.interact(local=self.locals)
  386. def invoke_ipython_shell(self):
  387. try:
  388. from IPython.frontend.terminal import embed
  389. embed.TerminalInteractiveShell(user_ns=self.locals).mainloop()
  390. except ImportError: # ipython < 0.11
  391. from IPython.Shell import IPShell
  392. IPShell(argv=[], user_ns=self.locals).mainloop()
  393. def invoke_bpython_shell(self):
  394. import bpython
  395. bpython.embed(self.locals)
  396. shell = command(shell)
  397. class help(Command):
  398. def usage(self, command):
  399. return "%%prog <command> [options] %s" % (self.args, )
  400. def run(self, *args, **kwargs):
  401. self.parser.print_help()
  402. self.out(HELP % {"prog_name": self.prog_name,
  403. "commands": "\n".join(indent(command)
  404. for command in sorted(commands))})
  405. return EX_USAGE
  406. help = command(help)
  407. class report(Command):
  408. def run(self, *args, **kwargs):
  409. print(self.app.bugreport())
  410. return EX_OK
  411. report = command(report)
  412. class CeleryCommand(BaseCommand):
  413. commands = commands
  414. enable_config_from_cmdline = True
  415. def execute(self, command, argv=None):
  416. try:
  417. cls = self.commands[command]
  418. except KeyError:
  419. cls, argv = self.commands["help"], ["help"]
  420. cls = self.commands.get(command) or self.commands["help"]
  421. try:
  422. return cls(app=self.app).run_from_argv(self.prog_name, argv)
  423. except Error:
  424. return self.execute("help", argv)
  425. def remove_options_at_beginning(self, argv, index=0):
  426. if argv:
  427. while index < len(argv):
  428. value = argv[index]
  429. if value.startswith("--"):
  430. pass
  431. elif value.startswith("-"):
  432. index += 1
  433. else:
  434. return argv[index:]
  435. index += 1
  436. return []
  437. def handle_argv(self, prog_name, argv):
  438. self.prog_name = prog_name
  439. argv = self.remove_options_at_beginning(argv)
  440. try:
  441. command = argv[0]
  442. except IndexError:
  443. command, argv = "help", ["help"]
  444. return self.execute(command, argv)
  445. def execute_from_commandline(self, argv=None):
  446. try:
  447. sys.exit(determine_exit_status(
  448. super(CeleryCommand, self).execute_from_commandline(argv)))
  449. except KeyboardInterrupt:
  450. sys.exit(EX_FAILURE)
  451. def determine_exit_status(ret):
  452. if isinstance(ret, int):
  453. return ret
  454. return EX_OK if ret else EX_FAILURE
  455. def main():
  456. CeleryCommand().execute_from_commandline()
# Run the command-line interface when this file is executed as a script.
if __name__ == "__main__":          # pragma: no cover
    main()