Ver Fonte

Implemented celeryd-multi restart. Also nice colors and stuff

Ask Solem há 14 anos atrás
pai
commit
a7f3ce6b88
1 ficheiros alterados com 184 adições e 40 exclusões
  1. 184 40
      celery/bin/celeryd_multi.py

+ 184 - 40
celery/bin/celeryd_multi.py

@@ -2,12 +2,12 @@
 
 Some examples:
 
-    # The "start" command just prints the commands needed to start the workers.
+    # The "show" command just prints the commands needed to start the workers.
     # Advanced example with 10 workers:
     #   * Three of the workers processes the images and video queue
     #   * Two of the workers processes the data queue with loglevel DEBUG
     #   * the rest processes the default' queue.
-    $ celeryd-multi start 10 -l INFO -Q:1-3 images,video -Q:4,5:data
+    $ celeryd-multi show 10 -l INFO -Q:1-3 images,video -Q:4,5:data
         -Q default -L:4,5 DEBUG
 
     # To actually start the workers into the background you need to
@@ -72,42 +72,80 @@ import socket
 import sys
 
 from subprocess import Popen
+from time import sleep
 
+from celery import __version__
+from celery.utils import term
 from celery.utils.compat import defaultdict
 
 SIGNAMES = set(sig for sig in dir(signal)
                         if sig.startswith("SIG") and "_" not in sig)
+SIGMAP = dict((getattr(signal, name), name) for name in SIGNAMES)
+
+
+USAGE = """\
+usage: %(prog_name)s detach <node1 node2 nodeN|range> [celeryd options]
+       %(prog_name)s stop <n1 n2 nN|range> [-SIG (default: -TERM)]
+       %(prog_name)s restart <n1 n2 nN|range> [-SIG] [celeryd options]
+       %(prog_name)s kill <n1 n2 nN|range>
+
+       %(prog_name)s show <n1 n2 nN|range> [celeryd options]
+       %(prog_name)s get hostname <n1 n2 nN|range> [-qv] [celeryd options]
+       %(prog_name)s names <n1 n2 nN|range>
+       %(prog_name)s expand template <n1 n2 nN|range> 
+       %(prog_name)s help
+
+additional options (must appear after command name):
+
+    * --quiet:      Don't show as much output.
+    * --verbose:    Show more output.
+    * --no-color:   Don't display colors.
+"""
 
 
 def main():
-    sys.exit(MultiTool()(sys.argv[1:]))
+    sys.exit(MultiTool().execute_from_commandline(sys.argv))
 
 
 class MultiTool(object):
     retcode = 0  # Final exit code.
 
     def __init__(self):
-        self.commands = {"start": self.start,
-                         "stop": self.stop,
+        self.commands = {"start": self.show, # XXX Deprecate
+                         "show": self.show,
                          "detach": self.detach,
+                         "stop": self.stop,
+                         "restart": self.restart,
+                         "kill": self.kill,
                          "names": self.names,
                          "expand": self.expand,
                          "get": self.get,
                          "help": self.help}
 
-    def __call__(self, argv, cmd="celeryd"):
-        if len(argv) == 0 or argv[0][0] == "-":
-            return self.error()
+    def execute_from_commandline(self, argv, cmd="celeryd"):
+        argv = list(argv)   # don't modify callers argv.
 
-        # Reserve the --quite|-q/--verbose options.
+        # Reserve the --quiet|-q/--verbose options.
         self.quiet = False
         self.verbose = False
+        self.no_color = False
         if "--quiet" in argv:
             self.quiet = argv.pop(argv.index("--quiet"))
         if "-q" in argv:
             self.quiet = argv.pop(argv.index("-q"))
         if "--verbose" in argv:
             self.verbose = argv.pop(argv.index("--verbose"))
+        if "--no-color" in argv:
+            self.no_color = argv.pop(argv.index("--no-color"))
+
+        self.colored = term.colored(enabled=not self.no_color)
+        self.OK = str(self.colored.green("OK"))
+        self.FAILED = str(self.colored.red("FAILED"))
+        self.DOWN = str(self.colored.magenta("DOWN"))
+
+        self.prog_name = os.path.basename(argv.pop(0))
+        if len(argv) == 0 or argv[0][0] == "-":
+            return self.error()
 
         try:
             self.commands[argv[0]](argv[1:], cmd)
@@ -125,50 +163,150 @@ class MultiTool(object):
         wanted = argv[0]
         p = NamespacedOptionParser(argv[1:])
         for name, worker, _ in multi_args(p, cmd):
+            print("NAME: %s WANTED: %s" % (name, wanted))
             if name == wanted:
                 print(" ".join(worker))
                 return
 
-    def start(self, argv, cmd):
+    def show(self, argv, cmd):
         p = NamespacedOptionParser(argv)
+        self.note("> Starting nodes...")
         print("\n".join(" ".join(worker)
                         for _, worker, _ in multi_args(p, cmd)))
 
     def detach(self, argv, cmd):
+        self.splash()
         p = NamespacedOptionParser(argv)
+        self.with_detacher_default_options(p)
+        retcodes = []
+        self.note("> Starting nodes...")
+        for nodename, argv, _ in multi_args(p, cmd):
+            self.note("\t> %s: " % (nodename, ), newline=False)
+            retcode = self.waitexec(argv)
+            self.note(retcode and self.FAILED or self.OK)
+            retcodes.append(retcode)
+        self.retcode = int(any(retcodes))
+
+    def with_detacher_default_options(self, p):
         p.options.setdefault("--pidfile", "celeryd@%n.pid")
         p.options.setdefault("--logfile", "celeryd@%n.log")
         p.options.setdefault("--cmd", "-m celery.bin.celeryd_detach")
-        retcodes = []
-        for nodename, argv, _ in multi_args(p, cmd):
-            self.note("> Starting node %s..." % (nodename, ))
-            retcodes.append(self.waitexec(argv))
-        self.retcode = not any(retcodes)
 
-    def stop(self, argv, cmd):
+    def signal_node(self, nodename, pid, sig):
+        try:
+            os.kill(pid, sig)
+        except OSError, exc:
+            if os.errno != errno.ESRCH:
+                raise
+            self.note("Could not signal %s (%s): No such process" % (
+                        nodename, pid))
+            return False
+        return True
+
+    def node_alive(self, pid):
+        try:
+            os.kill(pid, 0)
+        except OSError, exc:
+            if exc.errno == errno.ESRCH:
+                return False
+            raise
+        return True
+
+    def shutdown_nodes(self, nodes, sig=signal.SIGTERM, retry=None,
+            callback=None):
+        if not nodes:
+            return
+        P = set(nodes)
+
+        def on_down(node):
+            P.discard(node)
+            if callback:
+                callback(*node)
+
+        self.note(self.colored.blue("> Stopping nodes..."))
+        for node in P:
+            nodename, _, pid = node
+            self.note("\t> %s: %s -> %s" % (nodename, SIGMAP[sig][3:], pid))
+            if not self.signal_node(nodename, pid, sig):
+                on_down(node)
+
+        def note_waiting():
+            left = len(P)
+            if left:
+                self.note(self.colored.blue("> Waiting for %s %s..." % (
+                    left, left > 1 and "nodes" or "node")), newline=False)
+
+        if retry:
+            note_waiting()
+            its = 0
+            while P:
+                for node in P:
+                    its += 1
+                    self.note(".", newline=False)
+                    nodename, _, pid = node
+                    if not self.node_alive(pid):
+                        self.note("\n\t> %s: %s" % (nodename, self.OK))
+                        on_down(node)
+                        note_waiting()
+                        break
+                if P and not its % len(P):
+                    sleep(float(retry))
+            self.note("")
+
+    def getpids(self, p, cmd, callback=None):
         from celery import platforms
-
-        p = NamespacedOptionParser(argv)
-        restargs = p.args[len(p.values):]
-        sig = findsig(restargs)
         pidfile_template = p.options.setdefault("--pidfile", "celeryd@%n.pid")
 
-        for nodename, _, expander in multi_args(p, cmd):
+        nodes = []
+        for nodename, argv, expander in multi_args(p, cmd):
             pidfile = expander(pidfile_template)
             try:
                 pid = platforms.read_pid_from_pidfile(pidfile)
             except ValueError:
                 pass
             if pid:
-                self.note("> Stopping node %s (%s)..." % (nodename, pid))
-                try:
-                    os.kill(pid, sig)
-                except OSError, exc:
-                    if exc.errno != errno.ESRCH:
-                        raise
-                    self.note("Could not stop pid %s: No such process." % pid)
+                nodes.append((nodename, tuple(argv), pid))
             else:
-                self.note("> Skipping node %s: not alive." % (nodename, ))
+                self.note("> %s: %s" % (nodename, self.DOWN))
+                if callback:
+                    callback(nodename, argv, pid)
+
+        return nodes
+
+    def kill(self, argv, cmd):
+        self.splash()
+        p = NamespacedOptionParser(argv)
+        for nodename, _, pid in self.getpids(p, cmd):
+            self.note("Killing node %s (%s)" % (nodename, pid))
+            self.signal_node(nodename, pid, signal.SIGKILL)
+
+    def stop(self, argv, cmd):
+        self.splash()
+        p = NamespacedOptionParser(argv)
+        return self._stop_nodes(p, cmd)
+
+    def _stop_nodes(self, p, cmd, retry=None, callback=None):
+        restargs = p.args[len(p.values):]
+        self.shutdown_nodes(self.getpids(p, cmd, callback=callback),
+                            sig=findsig(restargs),
+                            retry=retry,
+                            callback=callback)
+
+    def restart(self, argv, cmd):
+        self.splash()
+        p = NamespacedOptionParser(argv)
+        self.with_detacher_default_options(p)
+        retvals = []
+
+        def on_node_shutdown(nodename, argv, pid):
+            self.note(self.colored.blue(
+                "> Restarting node %s: " % nodename), newline=False)
+            retval = self.waitexec(argv)
+            self.note(retval and self.FAILED or self.OK)
+            retvals.append(retval)
+
+        self._stop_nodes(p, cmd, retry=2, callback=on_node_shutdown)
+        self.retval = int(any(retvals))
 
     def expand(self, argv, cmd=None):
         template = argv[0]
@@ -180,18 +318,24 @@ class MultiTool(object):
         say(__doc__)
 
     def usage(self):
-        say("Please use one of the following commands: %s" % (
-            ", ".join(self.commands.keys())))
+        self.splash()
+        say(USAGE % {"prog_name": self.prog_name})
+
+    def splash(self):
+        c = self.colored
+        self.note(c.cyan("%s v%s" % (self.prog_name, __version__)))
 
     def waitexec(self, argv, path=sys.executable):
-        pipe = Popen(shlex.split(" ".join([path] + argv)))
-        self.info("  %s" % " ".join([path] + argv))
+        argstr = shlex.split(" ".join([path] + list(argv)))
+        pipe = Popen(argstr)
+        self.info("  %s" % " ".join(argstr))
         retcode = pipe.wait()
         if retcode < 0:
             self.note("* Child was terminated by signal %s" % (-retcode, ))
+            return -retcode
         elif retcode > 0:
             self.note("* Child terminated with failure code %s" % (retcode, ))
-        return retcode == 0
+        return retcode
 
     def error(self, msg=None):
         if msg:
@@ -200,13 +344,13 @@ class MultiTool(object):
         self.retcode = 1
         return 1
 
-    def info(self, msg):
+    def info(self, msg, newline=True):
         if self.verbose:
-            self.note(msg)
+            self.note(msg, newline=newline)
 
-    def note(self, msg):
+    def note(self, msg, newline=True):
         if not self.quiet:
-            say(msg)
+            say(str(msg), newline=newline)
 
 
 def multi_args(p, cmd="celeryd", append="", prefix="", suffix=""):
@@ -332,8 +476,8 @@ def abbreviations(map):
     return expand
 
 
-def say(m):
-    sys.stderr.write("%s\n" % (m, ))
+def say(m, newline=True):
+    sys.stderr.write(newline and "%s\n" % (m, ) or m)
 
 
 def findsig(args, default=signal.SIGTERM):