Browse Source

celeryd-multi: Tool for shell scripts to start multiple workers.

Hope I never have to modify this code... ;)

Some examples::

    # Advanced example with 10 workers:
    #   * Three of the workers processes the images and video queue
    #   * Two of the workers processes the data queue with loglevel DEBUG
    #   * the rest processes the default' queue.
    $ celeryd-multi start 10 -l INFO -Q:1-3 images,video -Q:4,5:data
        -Q default -L:4,5 DEBUG

    # get commands to start 10 workers, with 3 processes each
    $ celeryd-multi start 3 -c 3
    celeryd -n celeryd1.myhost -c 3
    celeryd -n celeryd2.myhost -c 3
    celeryd- n celeryd3.myhost -c 3

    # start 3 named workers
    $ celeryd-multi start image video data -c 3
    celeryd -n image.myhost -c 3
    celeryd -n video.myhost -c 3
    celeryd -n data.myhost -c 3

    # specify custom hostname
    $ celeryd-multi start 2 -n worker.example.com -c 3
    celeryd -n celeryd1.worker.example.com -c 3
    celeryd -n celeryd2.worker.example.com -c 3

    # Additionl options are added to each celeryd',
    # but you can also modify the options for ranges of or single workers

    # 3 workers: Two with 3 processes, and one with 10 processes.
    $ celeryd-multi start 3 -c 3 -c:1 10
    celeryd -n celeryd1.myhost -c 10
    celeryd -n celeryd2.myhost -c 3
    celeryd -n celeryd3.myhost -c 3

    # can also specify options for named workers
    $ celeryd-multi start image video data -c 3 -c:image 10
    celeryd -n image.myhost -c 10
    celeryd -n video.myhost -c 3
    celeryd -n data.myhost -c 3

    # ranges and lists of workers in options is also allowed:
    # (-c:1-3 can also be written as -c:1,2,3)
    $ celeryd-multi start 5 -c 3  -c:1-3 10
    celeryd-multi -n celeryd1.myhost -c 10
    celeryd-multi -n celeryd2.myhost -c 10
    celeryd-multi -n celeryd3.myhost -c 10
    celeryd-multi -n celeryd4.myhost -c 3
    celeryd-multi -n celeryd5.myhost -c 3

    # lists also works with named workers
    $ celeryd-multi start foo bar baz xuzzy -c 3 -c:foo,bar,baz 10
    celeryd-multi -n foo.myhost -c 10
    celeryd-multi -n bar.myhost -c 10
    celeryd-multi -n baz.myhost -c 10
    celeryd-multi -n xuzzy.myhost -c 3
Ask Solem 15 years ago
parent
commit
d786c30283
3 changed files with 153 additions and 1 deletions
  1. 5 0
      bin/celeryd-multi
  2. 145 0
      celery/bin/celeryd_multi.py
  3. 3 1
      setup.py

+ 5 - 0
bin/celeryd-multi

@@ -0,0 +1,5 @@
+#!/usr/bin/env python
+from celery.bin.celeryd_multi import main
+
+if __name__ == "__main__":
+    main()

+ 145 - 0
celery/bin/celeryd_multi.py

@@ -0,0 +1,145 @@
+import sys
+import shlex
+import socket
+
+from celery.utils.compat import defaultdict
+from carrot.utils import rpartition
+
+
+class OptionParser(object):
+
+    def __init__(self, args):
+        self.args = args
+        self.options = {}
+        self.values = []
+        self.parse()
+
+    def parse(self):
+        rargs = list(self.args)
+        pos = 0
+        while pos < len(rargs):
+            arg = rargs[pos]
+            if arg[0] == "-":
+                if arg[1] == "-":
+                    self.process_long_opt(arg[2:])
+                else:
+                    value = None
+                    if rargs[pos + 1][0] != '-':
+                        value = rargs[pos + 1]
+                        pos += 1
+                    self.process_short_opt(arg[1:], value)
+            else:
+                self.values.append(arg)
+            pos += 1
+
+    def process_long_opt(self, arg, value=None):
+        if "=" in arg:
+            arg, value = arg.split("=", 1)
+        self.add_option(arg, value, short=False)
+
+    def process_short_opt(self, arg, value=None):
+        self.add_option(arg, value, short=True)
+
+    def set_option(self, arg, value, short=False):
+        prefix = short and "-" or "--"
+        self.options[prefix + arg] = value
+
+
+class NamespacedOptionParser(OptionParser):
+
+    def __init__(self, args):
+        self.namespaces = defaultdict(lambda: {})
+        super(NamespacedOptionParser, self).__init__(args)
+
+    def add_option(self, name, value, short=False, ns=None):
+        prefix = short and "-" or "--"
+        dest = self.options
+        if ":" in name:
+            name, ns = name.split(":")
+            dest = self.namespaces[ns]
+        dest[prefix + name] = value
+
+    def optmerge(self, ns, defaults=None):
+        if defaults is None:
+            defaults = self.options
+        return dict(defaults, **self.namespaces[ns])
+
+
+def quote(v):
+    return "\\'".join("'" + p + "'" for p in v.split("'"))
+
+
+def format_opt(opt, value):
+    if not value:
+        return opt
+    if opt[0:2] == "--":
+        return "%s=%s" % (opt, quote(value))
+    return "%s %s" % (opt, quote(value))
+
+
+def parse_ns_range(ns, ranges=False):
+    ret = []
+    for space in "," in ns and ns.split(",") or [ns]:
+        if ranges and "-" in space:
+            start, stop = space.split("-")
+            x = map(str, range(int(start), int(stop) + 1))
+            ret.extend(x)
+        else:
+            ret.append(space)
+    return ret
+
+
+def multi_args(p, cmd="celeryd", prefix="", suffix=""):
+    names = p.values
+    options = dict(p.options)
+    ranges = len(names) == 1
+    if ranges:
+        names = map(str, range(1, int(names[0]) + 1))
+        prefix = "celery"
+    cmd = options.pop("--cmd", cmd)
+    hostname = options.pop("--hostname",
+                   options.pop("-n", socket.gethostname()))
+    prefix = options.pop("--prefix", prefix) or ""
+    suffix = options.pop("--suffix", suffix) or "." + hostname
+
+    for ns_name, ns_opts in p.namespaces.items():
+        if "," in ns_name or (ranges and "-" in ns_name):
+            for subns in parse_ns_range(ns_name, ranges):
+                p.namespaces[subns].update(ns_opts)
+        p.namespaces.pop(ns_name)
+
+    cels = []
+    for name in names:
+        this_name = options["-n"] = prefix + name + suffix
+        this_cmd = cmd.replace("%n", this_name)
+        line = this_cmd + " " + " ".join(format_opt(opt, value)
+                for opt, value in p.optmerge(name, options).items())
+        cels.append((this_name, line))
+
+    return cels
+
+
+def names(argv, cmd):
+    p = NamespacedOptionParser(argv)
+    print("\n".join(hostname
+                        for hostname, _ in multi_args(p, cmd)))
+
+def start(argv, cmd):
+    p = NamespacedOptionParser(argv)
+    print("\n".join(worker
+                        for _, worker in multi_args(p, cmd)))
+
+
+COMMANDS = {"start": start, "names": names}
+
+
+def celeryd_multi(argv, cmd="celeryd"):
+    return COMMANDS[argv[0]](argv[1:], cmd)
+
+
+def main():
+    celeryd_multi(sys.argv[1:])
+
+
+if __name__ == "__main__":
+    main()

+ 3 - 1
setup.py

@@ -69,7 +69,8 @@ setup(
     platforms=["any"],
     license="BSD",
     packages=find_packages(exclude=['ez_setup', 'tests', 'tests.*']),
-    scripts=["bin/celeryd", "bin/celerybeat", "bin/camqadm"],
+    scripts=["bin/celeryd", "bin/celerybeat",
+             "bin/camqadm", "bin/celeryd-multi"],
     zip_safe=False,
     setup_requires=["nose", "nose-cover3", "unittest2>=0.4.0", "simplejson"],
     install_requires=install_requires,
@@ -93,6 +94,7 @@ setup(
             'celeryinit = celery.bin.celeryinit:main',
             'celerybeat = celery.bin.celerybeat:main',
             'camqadm = celery.bin.camqadm:main',
+            'celeryd-multi = celery.bin.celeryd_multi:main',
             ],
     },
     long_description=long_description,