# -*- coding: utf-8 -*- """ .. program:: celery multi Examples ======== .. code-block:: console $ # Single worker with explicit name and events enabled. $ celery multi start Leslie -E $ # Pidfiles and logfiles are stored in the current directory $ # by default. Use --pidfile and --logfile argument to change $ # this. The abbreviation %n will be expanded to the current $ # node name. $ celery multi start Leslie -E --pidfile=/var/run/celery/%n.pid --logfile=/var/log/celery/%n%I.log $ # You need to add the same arguments when you restart, $ # as these are not persisted anywhere. $ celery multi restart Leslie -E --pidfile=/var/run/celery/%n.pid --logfile=/var/run/celery/%n%I.log $ # To stop the node, you need to specify the same pidfile. $ celery multi stop Leslie --pidfile=/var/run/celery/%n.pid $ # 3 workers, with 3 processes each $ celery multi start 3 -c 3 celery worker -n celery1@myhost -c 3 celery worker -n celery2@myhost -c 3 celery worker -n celery3@myhost -c 3 $ # start 3 named workers $ celery multi start image video data -c 3 celery worker -n image@myhost -c 3 celery worker -n video@myhost -c 3 celery worker -n data@myhost -c 3 $ # specify custom hostname $ celery multi start 2 --hostname=worker.example.com -c 3 celery worker -n celery1@worker.example.com -c 3 celery worker -n celery2@worker.example.com -c 3 $ # specify fully qualified nodenames $ celery multi start foo@worker.example.com bar@worker.example.com -c 3 $ # fully qualified nodenames but using the current hostname $ celery multi start foo@%h bar@%h $ # Advanced example starting 10 workers in the background: $ # * Three of the workers processes the images and video queue $ # * Two of the workers processes the data queue with loglevel DEBUG $ # * the rest processes the default' queue. $ celery multi start 10 -l INFO -Q:1-3 images,video -Q:4,5 data -Q default -L:4,5 DEBUG $ # You can show the commands necessary to start the workers with $ # the 'show' command: $ celery multi show 10 -l INFO -Q:1-3 images,video -Q:4,5 data -Q default -L:4,5 DEBUG $ # Additional options are added to each celery worker' comamnd, $ # but you can also modify the options for ranges of, or specific workers $ # 3 workers: Two with 3 processes, and one with 10 processes. $ celery multi start 3 -c 3 -c:1 10 celery worker -n celery1@myhost -c 10 celery worker -n celery2@myhost -c 3 celery worker -n celery3@myhost -c 3 $ # can also specify options for named workers $ celery multi start image video data -c 3 -c:image 10 celery worker -n image@myhost -c 10 celery worker -n video@myhost -c 3 celery worker -n data@myhost -c 3 $ # ranges and lists of workers in options is also allowed: $ # (-c:1-3 can also be written as -c:1,2,3) $ celery multi start 5 -c 3 -c:1-3 10 celery worker -n celery1@myhost -c 10 celery worker -n celery2@myhost -c 10 celery worker -n celery3@myhost -c 10 celery worker -n celery4@myhost -c 3 celery worker -n celery5@myhost -c 3 $ # lists also works with named workers $ celery multi start foo bar baz xuzzy -c 3 -c:foo,bar,baz 10 celery worker -n foo@myhost -c 10 celery worker -n bar@myhost -c 10 celery worker -n baz@myhost -c 10 celery worker -n xuzzy@myhost -c 3 """ from __future__ import absolute_import, print_function, unicode_literals import errno import os import shlex import signal import sys from collections import OrderedDict, defaultdict, namedtuple from functools import partial from subprocess import Popen from time import sleep from kombu.utils import cached_property from kombu.utils.encoding import from_utf8 from celery import VERSION_BANNER from celery.five import items from celery.platforms import Pidfile, IS_WINDOWS from celery.utils import term from celery.utils import gethostname, host_format, node_format, nodesplit from celery.utils.text import pluralize __all__ = ['MultiTool'] SIGNAMES = {sig for sig in dir(signal) if sig.startswith('SIG') and '_' not in sig} SIGMAP = {getattr(signal, name): name for name in SIGNAMES} USAGE = """\ usage: {prog_name} start [worker options] {prog_name} stop [-SIG (default: -TERM)] {prog_name} restart [-SIG] [worker options] {prog_name} kill {prog_name} show [worker options] {prog_name} get hostname [-qv] [worker options] {prog_name} names {prog_name} expand template {prog_name} help additional options (must appear after command name): * --nosplash: Don't display program info. * --quiet: Don't show as much output. * --verbose: Show more output. * --no-color: Don't display colors. """ CELERY_EXE = 'celery' multi_args_t = namedtuple( 'multi_args_t', ('name', 'argv', 'expander', 'namespace'), ) def main(): sys.exit(MultiTool().execute_from_commandline(sys.argv)) def celery_exe(*args): return ' '.join((CELERY_EXE,) + args) class MultiTool(object): retcode = 0 # Final exit code. def __init__(self, env=None, fh=None, quiet=False, verbose=False, no_color=False, nosplash=False, stdout=None, stderr=None): """fh is an old alias to stdout.""" self.stdout = self.fh = stdout or fh or sys.stdout self.stderr = stderr or sys.stderr self.env = env self.nosplash = nosplash self.quiet = quiet self.verbose = verbose self.no_color = no_color self.prog_name = 'celery multi' self.commands = {'start': self.start, 'show': self.show, 'stop': self.stop, 'stopwait': self.stopwait, 'stop_verify': self.stopwait, # compat alias 'restart': self.restart, 'kill': self.kill, 'names': self.names, 'expand': self.expand, 'get': self.get, 'help': self.help} def execute_from_commandline(self, argv, cmd='celery worker'): argv = list(argv) # don't modify callers argv. # Reserve the --nosplash|--quiet|-q/--verbose options. if '--nosplash' in argv: self.nosplash = argv.pop(argv.index('--nosplash')) if '--quiet' in argv: self.quiet = argv.pop(argv.index('--quiet')) if '-q' in argv: self.quiet = argv.pop(argv.index('-q')) if '--verbose' in argv: self.verbose = argv.pop(argv.index('--verbose')) if '--no-color' in argv: self.no_color = argv.pop(argv.index('--no-color')) self.prog_name = os.path.basename(argv.pop(0)) if not argv or argv[0][0] == '-': return self.error() try: self.commands[argv[0]](argv[1:], cmd) except KeyError: self.error('Invalid command: {0}'.format(argv[0])) return self.retcode def say(self, m, newline=True, file=None): print(m, file=file or self.stdout, end='\n' if newline else '') def carp(self, m, newline=True, file=None): return self.say(m, newline, file or self.stderr) def names(self, argv, cmd): p = NamespacedOptionParser(argv) self.say('\n'.join( n.name for n in multi_args(p, cmd)), ) def get(self, argv, cmd): wanted = argv[0] p = NamespacedOptionParser(argv[1:]) for node in multi_args(p, cmd): if node.name == wanted: self.say(' '.join(node.argv)) return def show(self, argv, cmd): p = NamespacedOptionParser(argv) self.with_detacher_default_options(p) self.say('\n'.join( ' '.join([sys.executable] + n.argv) for n in multi_args(p, cmd)), ) def start(self, argv, cmd): self.splash() p = NamespacedOptionParser(argv) self.with_detacher_default_options(p) retcodes = [] self.note('> Starting nodes...') for node in multi_args(p, cmd): self.note('\t> {0}: '.format(node.name), newline=False) retcode = self.waitexec(node.argv, path=p.options['--executable']) self.note(retcode and self.FAILED or self.OK) retcodes.append(retcode) self.retcode = int(any(retcodes)) def with_detacher_default_options(self, p): _setdefaultopt(p.options, ['--pidfile', '-p'], '%n.pid') _setdefaultopt(p.options, ['--logfile', '-f'], '%n%I.log') p.options.setdefault( '--cmd', '-m {0}'.format(celery_exe('worker', '--detach')), ) _setdefaultopt(p.options, ['--executable'], sys.executable) def signal_node(self, nodename, pid, sig): try: os.kill(pid, sig) except OSError as exc: if exc.errno != errno.ESRCH: raise self.note('Could not signal {0} ({1}): No such process'.format( nodename, pid)) return False return True def node_alive(self, pid): try: os.kill(pid, 0) except OSError as exc: if exc.errno == errno.ESRCH: return False raise return True def shutdown_nodes(self, nodes, sig=signal.SIGTERM, retry=None, callback=None): if not nodes: return P = set(nodes) def on_down(node): P.discard(node) if callback: callback(*node) self.note(self.colored.blue('> Stopping nodes...')) for node in list(P): if node in P: nodename, _, pid = node self.note('\t> {0}: {1} -> {2}'.format( nodename, SIGMAP[sig][3:], pid)) if not self.signal_node(nodename, pid, sig): on_down(node) def note_waiting(): left = len(P) if left: pids = ', '.join(str(pid) for _, _, pid in P) self.note(self.colored.blue( '> Waiting for {0} {1} -> {2}...'.format( left, pluralize(left, 'node'), pids)), newline=False) if retry: note_waiting() its = 0 while P: for node in P: its += 1 self.note('.', newline=False) nodename, _, pid = node if not self.node_alive(pid): self.note('\n\t> {0}: {1}'.format(nodename, self.OK)) on_down(node) note_waiting() break if P and not its % len(P): sleep(float(retry)) self.note('') def getpids(self, p, cmd, callback=None): _setdefaultopt(p.options, ['--pidfile', '-p'], '%n.pid') nodes = [] for node in multi_args(p, cmd): try: pidfile_template = _getopt( p.namespaces[node.namespace], ['--pidfile', '-p'], ) except KeyError: pidfile_template = _getopt(p.options, ['--pidfile', '-p']) pid = None pidfile = node.expander(pidfile_template) try: pid = Pidfile(pidfile).read_pid() except ValueError: pass if pid: nodes.append((node.name, tuple(node.argv), pid)) else: self.note('> {0.name}: {1}'.format(node, self.DOWN)) if callback: callback(node.name, node.argv, pid) return nodes def kill(self, argv, cmd): self.splash() p = NamespacedOptionParser(argv) for nodename, _, pid in self.getpids(p, cmd): self.note('Killing node {0} ({1})'.format(nodename, pid)) self.signal_node(nodename, pid, signal.SIGKILL) def stop(self, argv, cmd, retry=None, callback=None): self.splash() p = NamespacedOptionParser(argv) return self._stop_nodes(p, cmd, retry=retry, callback=callback) def _stop_nodes(self, p, cmd, retry=None, callback=None): restargs = p.args[len(p.values):] self.shutdown_nodes(self.getpids(p, cmd, callback=callback), sig=findsig(restargs), retry=retry, callback=callback) def restart(self, argv, cmd): self.splash() p = NamespacedOptionParser(argv) self.with_detacher_default_options(p) retvals = [] def on_node_shutdown(nodename, argv, pid): self.note(self.colored.blue( '> Restarting node {0}: '.format(nodename)), newline=False) retval = self.waitexec(argv, path=p.options['--executable']) self.note(retval and self.FAILED or self.OK) retvals.append(retval) self._stop_nodes(p, cmd, retry=2, callback=on_node_shutdown) self.retval = int(any(retvals)) def stopwait(self, argv, cmd): self.splash() p = NamespacedOptionParser(argv) self.with_detacher_default_options(p) return self._stop_nodes(p, cmd, retry=2) stop_verify = stopwait # compat def expand(self, argv, cmd=None): template = argv[0] p = NamespacedOptionParser(argv[1:]) for node in multi_args(p, cmd): self.say(node.expander(template)) def help(self, argv, cmd=None): self.say(__doc__) def usage(self): self.splash() self.say(USAGE.format(prog_name=self.prog_name)) def splash(self): if not self.nosplash: c = self.colored self.note(c.cyan('celery multi v{0}'.format(VERSION_BANNER))) def waitexec(self, argv, path=sys.executable): args = ' '.join([path] + list(argv)) argstr = shlex.split(from_utf8(args), posix=not IS_WINDOWS) pipe = Popen(argstr, env=self.env) self.info(' {0}'.format(' '.join(argstr))) retcode = pipe.wait() if retcode < 0: self.note('* Child was terminated by signal {0}'.format(-retcode)) return -retcode elif retcode > 0: self.note('* Child terminated with errorcode {0}'.format(retcode)) return retcode def error(self, msg=None): if msg: self.carp(msg) self.usage() self.retcode = 1 return 1 def info(self, msg, newline=True): if self.verbose: self.note(msg, newline=newline) def note(self, msg, newline=True): if not self.quiet: self.say(str(msg), newline=newline) @cached_property def colored(self): return term.colored(enabled=not self.no_color) @cached_property def OK(self): return str(self.colored.green('OK')) @cached_property def FAILED(self): return str(self.colored.red('FAILED')) @cached_property def DOWN(self): return str(self.colored.magenta('DOWN')) def _args_for_node(p, name, prefix, suffix, cmd, append, options): name, nodename, expand = _get_nodename( name, prefix, suffix, options) argv = ([expand(cmd)] + [format_opt(opt, expand(value)) for opt, value in items(p.optmerge(name, options))] + [p.passthrough]) if append: argv.append(expand(append)) return multi_args_t(nodename, argv, expand, name) def multi_args(p, cmd='celery worker', append='', prefix='', suffix=''): names = p.values options = dict(p.options) ranges = len(names) == 1 if ranges: try: names, prefix = _get_ranges(names) except ValueError: pass cmd = options.pop('--cmd', cmd) append = options.pop('--append', append) hostname = options.pop('--hostname', options.pop('-n', gethostname())) prefix = options.pop('--prefix', prefix) or '' suffix = options.pop('--suffix', suffix) or hostname suffix = '' if suffix in ('""', "''") else suffix _update_ns_opts(p, names) _update_ns_ranges(p, ranges) return (_args_for_node(p, name, prefix, suffix, cmd, append, options) for name in names) def _get_ranges(names): noderange = int(names[0]) names = [str(n) for n in range(1, noderange + 1)] prefix = 'celery' return names, prefix def _update_ns_opts(p, names): # Numbers in args always refers to the index in the list of names. # (e.g. `start foo bar baz -c:1` where 1 is foo, 2 is bar, and so on). for ns_name, ns_opts in list(items(p.namespaces)): if ns_name.isdigit(): ns_index = int(ns_name) - 1 if ns_index < 0: raise KeyError('Indexes start at 1 got: %r' % (ns_name,)) try: p.namespaces[names[ns_index]].update(ns_opts) except IndexError: raise KeyError('No node at index %r' % (ns_name,)) def _update_ns_ranges(p, ranges): for ns_name, ns_opts in list(items(p.namespaces)): if ',' in ns_name or (ranges and '-' in ns_name): for subns in parse_ns_range(ns_name, ranges): p.namespaces[subns].update(ns_opts) p.namespaces.pop(ns_name) def _get_nodename(name, prefix, suffix, options): hostname = suffix if '@' in name: nodename = options['-n'] = host_format(name) shortname, hostname = nodesplit(nodename) name = shortname else: shortname = '%s%s' % (prefix, name) nodename = options['-n'] = host_format( '{0}@{1}'.format(shortname, hostname), ) expand = partial( node_format, nodename=nodename, N=shortname, d=hostname, h=nodename, i='%i', I='%I', ) return name, nodename, expand class NamespacedOptionParser(object): def __init__(self, args): self.args = args self.options = OrderedDict() self.values = [] self.passthrough = '' self.namespaces = defaultdict(lambda: OrderedDict()) self.parse() def parse(self): rargs = list(self.args) pos = 0 while pos < len(rargs): arg = rargs[pos] if arg == '--': self.passthrough = ' '.join(rargs[pos:]) break elif arg[0] == '-': if arg[1] == '-': self.process_long_opt(arg[2:]) else: value = None if len(rargs) > pos + 1 and rargs[pos + 1][0] != '-': value = rargs[pos + 1] pos += 1 self.process_short_opt(arg[1:], value) else: self.values.append(arg) pos += 1 def process_long_opt(self, arg, value=None): if '=' in arg: arg, value = arg.split('=', 1) self.add_option(arg, value, short=False) def process_short_opt(self, arg, value=None): self.add_option(arg, value, short=True) def optmerge(self, ns, defaults=None): if defaults is None: defaults = self.options return OrderedDict(defaults, **self.namespaces[ns]) def add_option(self, name, value, short=False, ns=None): prefix = short and '-' or '--' dest = self.options if ':' in name: name, ns = name.split(':') dest = self.namespaces[ns] dest[prefix + name] = value def quote(v): return "\\'".join("'" + p + "'" for p in v.split("'")) def format_opt(opt, value): if not value: return opt if opt.startswith('--'): return '{0}={1}'.format(opt, value) return '{0} {1}'.format(opt, value) def parse_ns_range(ns, ranges=False): ret = [] for space in ',' in ns and ns.split(',') or [ns]: if ranges and '-' in space: start, stop = space.split('-') ret.extend( str(n) for n in range(int(start), int(stop) + 1) ) else: ret.append(space) return ret def findsig(args, default=signal.SIGTERM): for arg in reversed(args): if len(arg) == 2 and arg[0] == '-': try: return int(arg[1]) except ValueError: pass if arg[0] == '-': maybe_sig = 'SIG' + arg[1:] if maybe_sig in SIGNAMES: return getattr(signal, maybe_sig) return default def _getopt(d, alt): for opt in alt: try: return d[opt] except KeyError: pass raise KeyError(alt[0]) def _setdefaultopt(d, alt, value): for opt in alt[1:]: try: return d[opt] except KeyError: pass return d.setdefault(alt[0], value) if __name__ == '__main__': # pragma: no cover main()