Преглед на файлове

More celeryd-multi tweaks

Ask Solem преди 14 години
родител
ревизия
a829d005a6
променени са 1 файла, в които са добавени 191 реда и са изтрити 156 реда
  1. 191 156
      celery/bin/celeryd_multi.py

+ 191 - 156
celery/bin/celeryd_multi.py

@@ -1,13 +1,5 @@
-import errno
-import os
-import shlex
-import signal
-import socket
-import sys
-
-from celery.utils.compat import defaultdict
+"""
 
-EXAMPLES = """
 Some examples:
 
     # The "start" command just prints the commands needed to start the workers.
@@ -70,7 +62,188 @@ Some examples:
     celeryd -n bar.myhost -c 10
     celeryd -n baz.myhost -c 10
     celeryd -n xuzzy.myhost -c 3
+
 """
+import errno
+import os
+import shlex
+import signal
+import socket
+import sys
+
+from subprocess import Popen
+
+from celery.utils.compat import defaultdict
+
+SIGNAMES = set(sig for sig in dir(signal)
+                        if sig.startswith("SIG") and "_" not in sig)
+
+
+def main():
+    sys.exit(MultiTool()(sys.argv[1:]))
+
+
+class MultiTool(object):
+    retcode = 0  # Final exit code.
+
+    def __init__(self):
+        self.commands = {"start": self.start,
+                         "stop": self.stop,
+                         "detach": self.detach,
+                         "names": self.names,
+                         "expand": self.expand,
+                         "get": self.get,
+                         "help": self.help}
+
+    def __call__(self, argv, cmd="celeryd"):
+        if len(argv) == 0 or argv[0][0] == "-":
+            return self.error()
+
+        # Reserve the --quite|-q/--verbose options.
+        self.quiet = False
+        self.verbose = False
+        if "--quiet" in argv:
+            self.quiet = argv.pop(argv.index("--quiet"))
+        if "-q" in argv:
+            self.quiet = argv.pop(argv.index("-q"))
+        if "--verbose" in argv:
+            self.verbose = argv.pop(argv.index("--verbose"))
+
+        try:
+            self.commands[argv[0]](argv[1:], cmd)
+        except KeyError:
+            self.error("Invalid command: %s" % argv[0])
+
+        return self.retcode
+
+    def names(self, argv, cmd):
+        p = NamespacedOptionParser(argv)
+        print("\n".join(hostname
+                        for hostname, _, _ in multi_args(p, cmd)))
+
+    def get(self, argv, cmd):
+        wanted = argv[0]
+        p = NamespacedOptionParser(argv[1:])
+        for name, worker, _ in multi_args(p, cmd):
+            if name == wanted:
+                print(" ".join(worker))
+                return
+
+    def start(self, argv, cmd):
+        p = NamespacedOptionParser(argv)
+        print("\n".join(" ".join(worker)
+                        for _, worker, _ in multi_args(p, cmd)))
+
+    def detach(self, argv, cmd):
+        p = NamespacedOptionParser(argv)
+        p.options.setdefault("--pidfile", "celeryd@%n.pid")
+        p.options.setdefault("--logfile", "celeryd@%n.log")
+        p.options.setdefault("--cmd", "-m celery.bin.celeryd_detach")
+        retcodes = []
+        for nodename, argv, _ in multi_args(p, cmd):
+            self.note("> Starting node %s..." % (nodename, ))
+            retcodes.append(self.waitexec(argv))
+        self.retcode = not any(retcodes)
+
+    def stop(self, argv, cmd):
+        from celery import platforms
+
+        p = NamespacedOptionParser(argv)
+        restargs = p.args[len(p.values):]
+        sig = findsig(restargs)
+        pidfile_template = p.options.setdefault("--pidfile", "celeryd@%n.pid")
+
+        for nodename, _, expander in multi_args(p, cmd):
+            pidfile = expander(pidfile_template)
+            try:
+                pid = platforms.read_pid_from_pidfile(pidfile)
+            except ValueError:
+                pass
+            if pid:
+                self.note("> Stopping node %s (%s)..." % (nodename, pid))
+                try:
+                    os.kill(pid, sig)
+                except OSError, exc:
+                    if exc.errno != errno.ESRCH:
+                        raise
+                    self.note("Could not stop pid %s: No such process." % pid)
+            else:
+                self.note("> Skipping node %s: not alive." % (nodename, ))
+
+    def expand(self, argv, cmd=None):
+        template = argv[0]
+        p = NamespacedOptionParser(argv[1:])
+        for _, _, expander in multi_args(p, cmd):
+            print(expander(template))
+
+    def help(self, argv, cmd=None):
+        say(__doc__)
+
+    def usage(self):
+        say("Please use one of the following commands: %s" % (
+            ", ".join(self.commands.keys())))
+
+    def waitexec(self, argv, path=sys.executable):
+        pipe = Popen(shlex.split(" ".join([path] + argv)))
+        self.info("  %s" % " ".join([path] + argv))
+        retcode = pipe.wait()
+        if retcode < 0:
+            self.note("* Child was terminated by signal %s" % (-retcode, ))
+        elif retcode > 0:
+            self.note("* Child terminated with failure code %s" % (retcode, ))
+        return retcode == 0
+
+    def error(self, msg=None):
+        if msg:
+            say(msg)
+        self.usage()
+        self.retcode = 1
+        return 1
+
+    def info(self, msg):
+        if self.verbose:
+            self.note(msg)
+
+    def note(self, msg):
+        if not self.quiet:
+            say(msg)
+
+
+def multi_args(p, cmd="celeryd", append="", prefix="", suffix=""):
+    names = p.values
+    options = dict(p.options)
+    ranges = len(names) == 1
+    if ranges:
+        try:
+            noderange = int(names[0])
+        except ValueError:
+            pass
+        else:
+            names = map(str, range(1, int(names[0]) + 1))
+            prefix = "celery"
+    cmd = options.pop("--cmd", cmd)
+    append = options.pop("--append", append)
+    hostname = options.pop("--hostname",
+                   options.pop("-n", socket.gethostname()))
+    prefix = options.pop("--prefix", prefix) or ""
+    suffix = options.pop("--suffix", suffix) or "." + hostname
+
+    for ns_name, ns_opts in p.namespaces.items():
+        if "," in ns_name or (ranges and "-" in ns_name):
+            for subns in parse_ns_range(ns_name, ranges):
+                p.namespaces[subns].update(ns_opts)
+            p.namespaces.pop(ns_name)
+
+    for name in names:
+        this_name = options["-n"] = prefix + name + suffix
+        expand = abbreviations({"%h": this_name,
+                                "%n": name})
+        argv = ([expand(cmd)] +
+                [format_opt(opt, expand(value))
+                        for opt, value in p.optmerge(name, options).items()])
+        if append:
+            argv.append(expand(append))
+        yield this_name, argv, expand
 
 
 class NamespacedOptionParser(object):
@@ -159,160 +332,22 @@ def abbreviations(map):
     return expand
 
 
-def multi_args(p, cmd="celeryd", append="", prefix="", suffix=""):
-    names = p.values
-    options = dict(p.options)
-    ranges = len(names) == 1
-    if ranges:
-        try:
-            noderange = int(names[0])
-        except ValueError:
-            pass
-        else:
-            names = map(str, range(1, int(names[0]) + 1))
-            prefix = "celery"
-    cmd = options.pop("--cmd", cmd)
-    append = options.pop("--append", append)
-    hostname = options.pop("--hostname",
-                   options.pop("-n", socket.gethostname()))
-    prefix = options.pop("--prefix", prefix) or ""
-    suffix = options.pop("--suffix", suffix) or "." + hostname
-
-    for ns_name, ns_opts in p.namespaces.items():
-        if "," in ns_name or (ranges and "-" in ns_name):
-            for subns in parse_ns_range(ns_name, ranges):
-                p.namespaces[subns].update(ns_opts)
-            p.namespaces.pop(ns_name)
-
-    for name in names:
-        this_name = options["-n"] = prefix + name + suffix
-        expand = abbreviations({"%h": this_name,
-                                "%n": name})
-        line = expand(cmd) + " " + " ".join(
-                format_opt(opt, expand(value))
-                    for opt, value in p.optmerge(name, options).items())
-        if append:
-            line += " %s" % expand(append)
-        yield this_name, line, expand
-
-
 def say(m):
     sys.stderr.write("%s\n" % (m, ))
 
 
-def detach_worker(argv, path=sys.executable):
-    if os.fork() == 0:
-        os.execv(path, [path] + argv)
-
-
-class MultiTool(object):
-
-    def __init__(self):
-        self.commands = {"start": self.start,
-                         "stop": self.stop,
-                         "detach": self.detach,
-                         "names": self.names,
-                         "expand": self.expand,
-                         "get": self.get,
-                         "help": self.help}
-
-    def __call__(self, argv, cmd="celeryd"):
-        if len(argv) == 0 or argv[0][0] == "-":
-            self.usage()
-            sys.exit(0)
-
-        try:
-            return self.commands[argv[0]](argv[1:], cmd)
-        except KeyError:
-            say("Invalid command: %s" % argv[0])
-            self.usage()
-            sys.exit(1)
-
-    def names(self, argv, cmd):
-        p = NamespacedOptionParser(argv)
-        print("\n".join(hostname
-                        for hostname, _, _ in multi_args(p, cmd)))
-
-    def get(self, argv, cmd):
-        wanted = argv[0]
-        p = NamespacedOptionParser(argv[1:])
-        for name, worker, _ in multi_args(p, cmd):
-            if name == wanted:
-                print(worker)
-                return
-
-    def start(self, argv, cmd):
-        p = NamespacedOptionParser(argv)
-        print("\n".join(worker
-                        for _, worker, _ in multi_args(p, cmd)))
-
-    def detach(self, argv, cmd):
-        p = NamespacedOptionParser(argv)
-        p.options.setdefault("--pidfile", "celeryd@%n.pid")
-        p.options.setdefault("--logfile", "celeryd@%n.log")
-        p.options.setdefault("--cmd", "-m celery.bin.celeryd_detach")
-        for nodename, argv, _ in multi_args(p, cmd):
-            print("> Starting node %s..." % (nodename, ))
-            print(argv)
-            detach_worker(shlex.split(argv))
-
-    def stop(self, argv, cmd):
-        from celery import platforms
-        signames = set(sig for sig in dir(signal)
-                            if sig.startswith("SIG") and "_" not in sig)
-
-        def findsig(args, default=signal.SIGTERM):
-            for arg in reversed(args):
-                if len(arg) == 2 and arg[0] == "-" and arg[1].isdigit():
-                    try:
-                        return int(arg[1])
-                    except ValueError:
-                        pass
-                if arg[0] == "-":
-                    maybe_sig = "SIG" + arg[1:]
-                    if maybe_sig in signames:
-                        return getattr(signal, maybe_sig)
-            return default
-
-        p = NamespacedOptionParser(argv)
-        restargs = p.args[len(p.values):]
-        sig = findsig(restargs)
-        pidfile_template = p.options.setdefault("--pidfile", "celeryd@%n.pid")
-
-        for nodename, _, expander in multi_args(p, cmd):
-            pidfile = expander(pidfile_template)
+def findsig(args, default=signal.SIGTERM):
+    for arg in reversed(args):
+        if len(arg) == 2 and arg[0] == "-" and arg[1].isdigit():
             try:
-                pid = platforms.read_pid_from_pidfile(pidfile)
+                return int(arg[1])
             except ValueError:
                 pass
-            if pid:
-                print("> Stopping node %s (%s)..." % (nodename, pid))
-                try:
-                    os.kill(pid, sig)
-                except OSError, exc:
-                    if exc.errno != errno.ESRCH:
-                        raise
-                    print("Could not stop pid %s: No such process." % pid)
-            else:
-                print("> Skipping node %s: not alive." % (nodename, ))
-
-    def expand(self, argv, cmd=None):
-        template = argv[0]
-        p = NamespacedOptionParser(argv[1:])
-        for _, _, expander in multi_args(p, cmd):
-            print(expander(template))
-
-    def help(self, argv, cmd=None):
-        say(EXAMPLES)
-
-    def usage(self):
-        say("Please use one of the following commands: %s" % (
-            ", ".join(self.commands.keys())))
-
-
-def main():
-    MultiTool()(sys.argv[1:])
-
+        if arg[0] == "-":
+            maybe_sig = "SIG" + arg[1:]
+            if maybe_sig in SIGNAMES:
+                return getattr(signal, maybe_sig)
+    return default
 
 if __name__ == "__main__":              # pragma: no cover
     main()