| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565 | # -*- coding: utf-8 -*-from __future__ import absolute_importfrom __future__ import with_statementif __name__ == "__main__" and globals().get("__package__") is None:    __package__ = "celery.bin.celery"import anyjsonimport sysfrom importlib import import_modulefrom optparse import OptionParser, make_option as Optionfrom pprint import pformatfrom textwrap import wrapfrom celery import __version__from celery.app import app_or_default, current_appfrom celery.platforms import EX_OK, EX_FAILURE, EX_UNAVAILABLE, EX_USAGEfrom celery.utils import termfrom celery.utils.imports import symbol_by_namefrom celery.utils.text import pluralizefrom celery.utils.timeutils import maybe_iso8601from .base import Command as BaseCommandHELP = """Type '%(prog_name)s <command> --help' for help usinga specific command.Available commands:%(commands)s"""commands = {}class Error(Exception):    def __init__(self, reason, status=EX_FAILURE):        self.reason = reason        self.status = status        super(Error, self).__init__(reason, status)    def __str__(self):        return self.reasondef command(fun, name=None):    commands[name or fun.__name__] = fun    return funclass Command(object):    help = ""    args = ""    version = __version__    option_list = BaseCommand.preload_options + (        Option("--quiet", "-q", action="store_true", dest="quiet",                default=False),        Option("--no-color", "-C", dest="no_color", action="store_true",            help="Don't colorize output."),    )    def __init__(self, app=None, no_color=False):        self.app = app_or_default(app)        self.colored = term.colored(enabled=not no_color)    def __call__(self, *args, **kwargs):        try:            ret = self.run(*args, **kwargs)        except Error, exc:            self.error(self.colored.red("Error: %s" % exc))            return exc.status        return ret if ret is not None else EX_OK    def show_help(self, command):        self.run_from_argv(self.prog_name, [command, "--help"])        return EX_USAGE    def error(self, s):        self.out(s, fh=sys.stderr)    def out(self, s, fh=sys.stdout):        s = str(s)        if not s.endswith("\n"):            s += "\n"        fh.write(s)    def create_parser(self, prog_name, command):        return OptionParser(prog=prog_name,                            usage=self.usage(command),                            version=self.version,                            option_list=self.get_options())    def get_options(self):        return self.option_list    def run_from_argv(self, prog_name, argv):        self.prog_name = prog_name        self.command = argv[0]        self.arglist = argv[1:]        self.parser = self.create_parser(self.prog_name, self.command)        options, args = self.parser.parse_args(self.arglist)        self.colored = term.colored(enabled=not options.no_color)        return self(*args, **options.__dict__)    def run(self, *args, **kwargs):        raise NotImplementedError()    def usage(self, command):        return "%%prog %s [options] %s" % (command, self.args)    def prettify_list(self, n):        c = self.colored        if not n:            return "- empty -"        return "\n".join(str(c.reset(c.white("*"), " %s" % (item, )))                            for item in n)    def prettify_dict_ok_error(self, n):        c = self.colored        if "ok" in n:            return (c.green("OK"),                    indent(self.prettify(n["ok"])[1]))        elif "error" in n:            return (c.red("ERROR"),                    indent(self.prettify(n["error"])[1]))    def prettify(self, n):        OK = str(self.colored.green("OK"))        if isinstance(n, list):            return OK, self.prettify_list(n)        if isinstance(n, dict):            if "ok" in n or "error" in n:                return self.prettify_dict_ok_error(n)        if isinstance(n, basestring):            return OK, unicode(n)        return OK, pformat(n)class Delegate(Command):    def __init__(self, *args, **kwargs):        super(Delegate, self).__init__(*args, **kwargs)        self.target = symbol_by_name(self.Command)(app=self.app)        self.args = self.target.args    def get_options(self):        return self.option_list + self.target.get_options()    def run(self, *args, **kwargs):        self.target.check_args(args)        return self.target.run(*args, **kwargs)def create_delegate(name, Command):    return command(type(name, (Delegate, ), {"Command": Command,                                             "__module__": __name__}))worker = create_delegate("worker", "celery.bin.celeryd:WorkerCommand")events = create_delegate("events", "celery.bin.celeryev:EvCommand")beat = create_delegate("beat", "celery.bin.celerybeat:BeatCommand")amqp = create_delegate("amqp", "celery.bin.camqadm:AMQPAdminCommand")class list_(Command):    args = "[bindings]"    def list_bindings(self, management):        try:            bindings = management.get_bindings()        except NotImplementedError:            raise Error("Your transport cannot list bindings.")        fmt = lambda q, e, r: self.out("%s %s %s" % (q.ljust(28),                                                     e.ljust(28), r))        fmt("Queue", "Exchange", "Routing Key")        fmt("-" * 16, "-" * 16, "-" * 16)        for b in bindings:            fmt(b["destination"], b["source"], b["routing_key"])    def run(self, what=None, *_, **kw):        topics = {"bindings": self.list_bindings}        available = ', '.join(topics.keys())        if not what:            raise Error("You must specify what to list (%s)" % available)        if what not in topics:            raise Error("unknown topic %r (choose one of: %s)" % (                            what, available))        with self.app.broker_connection() as conn:            self.app.amqp.get_task_consumer(conn).declare()            topics[what](conn.manager)list_ = command(list_, "list")class apply(Command):    args = "<task_name>"    option_list = Command.option_list + (            Option("--args", "-a", dest="args"),            Option("--kwargs", "-k", dest="kwargs"),            Option("--eta", dest="eta"),            Option("--countdown", dest="countdown", type="int"),            Option("--expires", dest="expires"),            Option("--serializer", dest="serializer", default="json"),            Option("--queue", dest="queue"),            Option("--exchange", dest="exchange"),            Option("--routing-key", dest="routing_key"),    )    def run(self, name, *_, **kw):        # Positional args.        args = kw.get("args") or ()        if isinstance(args, basestring):            args = anyjson.loads(args)        # Keyword args.        kwargs = kw.get("kwargs") or {}        if isinstance(kwargs, basestring):            kwargs = anyjson.loads(kwargs)        # Expires can be int/float.        expires = kw.get("expires") or None        try:            expires = float(expires)        except (TypeError, ValueError):            # or a string describing an ISO 8601 datetime.            try:                expires = maybe_iso8601(expires)            except (TypeError, ValueError):                pass        res = self.app.send_task(name, args=args, kwargs=kwargs,                                 countdown=kw.get("countdown"),                                 serializer=kw.get("serializer"),                                 queue=kw.get("queue"),                                 exchange=kw.get("exchange"),                                 routing_key=kw.get("routing_key"),                                 eta=maybe_iso8601(kw.get("eta")),                                 expires=expires)        self.out(res.id)apply = command(apply)class purge(Command):    def run(self, *args, **kwargs):        queues = len(current_app.amqp.queues.keys())        messages_removed = current_app.control.discard_all()        if messages_removed:            self.out("Purged %s %s from %s known task %s." % (                messages_removed, pluralize(messages_removed, "message"),                queues, pluralize(queues, "queue")))        else:            self.out("No messages purged from %s known %s" % (                queues, pluralize(queues, "queue")))purge = command(purge)class result(Command):    args = "<task_id>"    option_list = Command.option_list + (            Option("--task", "-t", dest="task"),    )    def run(self, task_id, *args, **kwargs):        result_cls = self.app.AsyncResult        task = kwargs.get("task")        if task:            result_cls = self.app.tasks[task].AsyncResult        result = result_cls(task_id)        self.out(self.prettify(result.get())[1])result = command(result)class inspect(Command):    choices = {"active": 1.0,               "active_queues": 1.0,               "scheduled": 1.0,               "reserved": 1.0,               "stats": 1.0,               "revoked": 1.0,               "registered_tasks": 1.0,  # alias to registered               "registered": 1.0,               "enable_events": 1.0,               "disable_events": 1.0,               "ping": 0.2,               "add_consumer": 1.0,               "cancel_consumer": 1.0,               "report": 1.0}    option_list = Command.option_list + (                Option("--timeout", "-t", type="float", dest="timeout",                    default=None,                    help="Timeout in seconds (float) waiting for reply"),                Option("--destination", "-d", dest="destination",                    help="Comma separated list of destination node names."))    show_body = True    def usage(self, command):        return "%%prog %s [options] %s [%s]" % (                command, self.args, "|".join(self.choices.keys()))    def run(self, *args, **kwargs):        self.quiet = kwargs.get("quiet", False)        self.show_body = kwargs.get("show_body", True)        if not args:            raise Error("Missing inspect command. See --help")        command = args[0]        if command == "help":            raise Error("Did you mean 'inspect --help'?")        if command not in self.choices:            raise Error("Unknown inspect command: %s" % command)        destination = kwargs.get("destination")        timeout = kwargs.get("timeout") or self.choices[command]        if destination and isinstance(destination, basestring):            destination = map(str.strip, destination.split(","))        def on_reply(body):            c = self.colored            node = body.keys()[0]            reply = body[node]            status, preply = self.prettify(reply)            self.say("->", c.cyan(node, ": ") + status, indent(preply))        self.say("<-", command)        i = self.app.control.inspect(destination=destination,                                     timeout=timeout,                                     callback=on_reply)        replies = getattr(i, command)(*args[1:])        if not replies:            raise Error("No nodes replied within time constraint.",                        status=EX_UNAVAILABLE)        return replies    def say(self, direction, title, body=""):        c = self.colored        if direction == "<-" and self.quiet:            return        dirstr = not self.quiet and c.bold(c.white(direction), " ") or ""        self.out(c.reset(dirstr, title))        if body and self.show_body:            self.out(body)inspect = command(inspect)def indent(s, n=4):    i = [" " * n + l for l in s.split("\n")]    return "\n".join("\n".join(wrap(j)) for j in i)class status(Command):    option_list = inspect.option_list    def run(self, *args, **kwargs):        replies = inspect(app=self.app,                          no_color=kwargs.get("no_color", False)) \                    .run("ping", **dict(kwargs, quiet=True, show_body=False))        if not replies:            raise Error("No nodes replied within time constraint",                        status=EX_UNAVAILABLE)        nodecount = len(replies)        if not kwargs.get("quiet", False):            self.out("\n%s %s online." % (nodecount,                                          pluralize(nodecount, "node")))status = command(status)class migrate(Command):    def usage(self, command):        return "%%prog %s <source_url> <dest_url>" % (command, )    def on_migrate_task(self, state, body, message):        self.out("Migrating task %s/%s: %s[%s]" % (            state.count, state.strtotal, body["task"], body["id"]))    def run(self, *args, **kwargs):        if len(args) != 2:            return self.show_help("migrate")        from kombu import BrokerConnection        from celery.contrib.migrate import migrate_tasks        migrate_tasks(BrokerConnection(args[0]),                      BrokerConnection(args[1]),                      callback=self.on_migrate_task)migrate = command(migrate)class shell(Command):    option_list = Command.option_list + (                Option("--ipython", "-I", action="store_true",                    dest="force_ipython", default=False,                    help="Force IPython."),                Option("--bpython", "-B", action="store_true",                    dest="force_bpython", default=False,                    help="Force bpython."),                Option("--python", "-P", action="store_true",                    dest="force_python", default=False,                    help="Force default Python shell."),                Option("--without-tasks", "-T", action="store_true",                    dest="without_tasks", default=False,                    help="Don't add tasks to locals."),                Option("--eventlet", action="store_true",                    dest="eventlet", default=False,                    help="Use eventlet."),                Option("--gevent", action="store_true",                    dest="gevent", default=False,                    help="Use gevent."),    )    def run(self, force_ipython=False, force_bpython=False,            force_python=False, without_tasks=False, eventlet=False,            gevent=False, **kwargs):        if eventlet:            import_module("celery.concurrency.eventlet")        if gevent:            import_module("celery.concurrency.gevent")        import celery        import celery.task.base        self.app.loader.import_default_modules()        self.locals = {"celery": self.app,                       "BaseTask": celery.task.base.BaseTask,                       "chord": celery.chord,                       "group": celery.group,                       "chain": celery.chain,                       "subtask": celery.subtask}        if not without_tasks:            self.locals.update(dict((task.__name__, task)                                for task in self.app.tasks.itervalues()                                    if not task.name.startswith("celery.")))        if force_python:            return self.invoke_fallback_shell()        elif force_bpython:            return self.invoke_bpython_shell()        elif force_ipython:            return self.invoke_ipython_shell()        return self.invoke_default_shell()    def invoke_default_shell(self):        try:            import IPython  # noqa        except ImportError:            try:                import bpython  # noqa            except ImportError:                return self.invoke_fallback_shell()            else:                return self.invoke_bpython_shell()        else:            return self.invoke_ipython_shell()    def invoke_fallback_shell(self):        import code        try:            import readline        except ImportError:            pass        else:            import rlcompleter            readline.set_completer(                    rlcompleter.Completer(self.locals).complete)            readline.parse_and_bind("tab:complete")        code.interact(local=self.locals)    def invoke_ipython_shell(self):        try:            from IPython.frontend.terminal import embed            embed.TerminalInteractiveShell(user_ns=self.locals).mainloop()        except ImportError:  # ipython < 0.11            from IPython.Shell import IPShell            IPShell(argv=[], user_ns=self.locals).mainloop()    def invoke_bpython_shell(self):        import bpython        bpython.embed(self.locals)shell = command(shell)class help(Command):    def usage(self, command):        return "%%prog <command> [options] %s" % (self.args, )    def run(self, *args, **kwargs):        self.parser.print_help()        self.out(HELP % {"prog_name": self.prog_name,                         "commands": "\n".join(indent(command)                                             for command in sorted(commands))})        return EX_USAGEhelp = command(help)class report(Command):    def run(self, *args, **kwargs):        print(self.app.bugreport())        return EX_OKreport = command(report)class CeleryCommand(BaseCommand):    commands = commands    enable_config_from_cmdline = True    def execute(self, command, argv=None):        try:            cls = self.commands[command]        except KeyError:            cls, argv = self.commands["help"], ["help"]        cls = self.commands.get(command) or self.commands["help"]        try:            return cls(app=self.app).run_from_argv(self.prog_name, argv)        except Error:            return self.execute("help", argv)    def remove_options_at_beginning(self, argv, index=0):        if argv:            while index < len(argv):                value = argv[index]                if value.startswith("--"):                    pass                elif value.startswith("-"):                    index += 1                else:                    return argv[index:]                index += 1        return []    def handle_argv(self, prog_name, argv):        self.prog_name = prog_name        argv = self.remove_options_at_beginning(argv)        try:            command = argv[0]        except IndexError:            command, argv = "help", ["help"]        return self.execute(command, argv)    def execute_from_commandline(self, argv=None):        try:            sys.exit(determine_exit_status(                super(CeleryCommand, self).execute_from_commandline(argv)))        except KeyboardInterrupt:            sys.exit(EX_FAILURE)def determine_exit_status(ret):    if isinstance(ret, int):        return ret    return EX_OK if ret else EX_FAILUREdef main():    CeleryCommand().execute_from_commandline()if __name__ == "__main__":          # pragma: no cover    main()
 |