multi.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652
  1. # -*- coding: utf-8 -*-
  2. """
  3. .. program:: celery multi
  4. Examples
  5. ========
  6. .. code-block:: console
  7. $ # Single worker with explicit name and events enabled.
  8. $ celery multi start Leslie -E
  9. $ # Pidfiles and logfiles are stored in the current directory
  10. $ # by default. Use --pidfile and --logfile argument to change
  11. $ # this. The abbreviation %n will be expanded to the current
  12. $ # node name.
  13. $ celery multi start Leslie -E --pidfile=/var/run/celery/%n.pid
  14. --logfile=/var/log/celery/%n%I.log
  15. $ # You need to add the same arguments when you restart,
  16. $ # as these are not persisted anywhere.
  17. $ celery multi restart Leslie -E --pidfile=/var/run/celery/%n.pid
  18. --logfile=/var/run/celery/%n%I.log
  19. $ # To stop the node, you need to specify the same pidfile.
  20. $ celery multi stop Leslie --pidfile=/var/run/celery/%n.pid
  21. $ # 3 workers, with 3 processes each
  22. $ celery multi start 3 -c 3
  23. celery worker -n celery1@myhost -c 3
  24. celery worker -n celery2@myhost -c 3
  25. celery worker -n celery3@myhost -c 3
  26. $ # start 3 named workers
  27. $ celery multi start image video data -c 3
  28. celery worker -n image@myhost -c 3
  29. celery worker -n video@myhost -c 3
  30. celery worker -n data@myhost -c 3
  31. $ # specify custom hostname
  32. $ celery multi start 2 --hostname=worker.example.com -c 3
  33. celery worker -n celery1@worker.example.com -c 3
  34. celery worker -n celery2@worker.example.com -c 3
  35. $ # specify fully qualified nodenames
  36. $ celery multi start foo@worker.example.com bar@worker.example.com -c 3
  37. $ # fully qualified nodenames but using the current hostname
  38. $ celery multi start foo@%h bar@%h
  39. $ # Advanced example starting 10 workers in the background:
  40. $ # * Three of the workers processes the images and video queue
  41. $ # * Two of the workers processes the data queue with loglevel DEBUG
  42. $ # * the rest processes the default' queue.
  43. $ celery multi start 10 -l INFO -Q:1-3 images,video -Q:4,5 data
  44. -Q default -L:4,5 DEBUG
  45. $ # You can show the commands necessary to start the workers with
  46. $ # the 'show' command:
  47. $ celery multi show 10 -l INFO -Q:1-3 images,video -Q:4,5 data
  48. -Q default -L:4,5 DEBUG
  49. $ # Additional options are added to each celery worker' comamnd,
  50. $ # but you can also modify the options for ranges of, or specific workers
  51. $ # 3 workers: Two with 3 processes, and one with 10 processes.
  52. $ celery multi start 3 -c 3 -c:1 10
  53. celery worker -n celery1@myhost -c 10
  54. celery worker -n celery2@myhost -c 3
  55. celery worker -n celery3@myhost -c 3
  56. $ # can also specify options for named workers
  57. $ celery multi start image video data -c 3 -c:image 10
  58. celery worker -n image@myhost -c 10
  59. celery worker -n video@myhost -c 3
  60. celery worker -n data@myhost -c 3
  61. $ # ranges and lists of workers in options is also allowed:
  62. $ # (-c:1-3 can also be written as -c:1,2,3)
  63. $ celery multi start 5 -c 3 -c:1-3 10
  64. celery worker -n celery1@myhost -c 10
  65. celery worker -n celery2@myhost -c 10
  66. celery worker -n celery3@myhost -c 10
  67. celery worker -n celery4@myhost -c 3
  68. celery worker -n celery5@myhost -c 3
  69. $ # lists also works with named workers
  70. $ celery multi start foo bar baz xuzzy -c 3 -c:foo,bar,baz 10
  71. celery worker -n foo@myhost -c 10
  72. celery worker -n bar@myhost -c 10
  73. celery worker -n baz@myhost -c 10
  74. celery worker -n xuzzy@myhost -c 3
  75. """
  76. from __future__ import absolute_import, print_function, unicode_literals
  77. import errno
  78. import os
  79. import shlex
  80. import signal
  81. import sys
  82. from collections import OrderedDict, defaultdict, namedtuple
  83. from functools import partial
  84. from subprocess import Popen
  85. from time import sleep
  86. from kombu.utils import cached_property
  87. from kombu.utils.encoding import from_utf8
  88. from celery import VERSION_BANNER
  89. from celery.five import items
  90. from celery.platforms import Pidfile, IS_WINDOWS
  91. from celery.utils import term
  92. from celery.utils import gethostname, host_format, node_format, nodesplit
  93. from celery.utils.text import pluralize
  94. __all__ = ['MultiTool']
  95. SIGNAMES = {sig for sig in dir(signal)
  96. if sig.startswith('SIG') and '_' not in sig}
  97. SIGMAP = {getattr(signal, name): name for name in SIGNAMES}
  98. USAGE = """\
  99. usage: {prog_name} start <node1 node2 nodeN|range> [worker options]
  100. {prog_name} stop <n1 n2 nN|range> [-SIG (default: -TERM)]
  101. {prog_name} restart <n1 n2 nN|range> [-SIG] [worker options]
  102. {prog_name} kill <n1 n2 nN|range>
  103. {prog_name} show <n1 n2 nN|range> [worker options]
  104. {prog_name} get hostname <n1 n2 nN|range> [-qv] [worker options]
  105. {prog_name} names <n1 n2 nN|range>
  106. {prog_name} expand template <n1 n2 nN|range>
  107. {prog_name} help
  108. additional options (must appear after command name):
  109. * --nosplash: Don't display program info.
  110. * --quiet: Don't show as much output.
  111. * --verbose: Show more output.
  112. * --no-color: Don't display colors.
  113. """
  114. CELERY_EXE = 'celery'
  115. multi_args_t = namedtuple(
  116. 'multi_args_t', ('name', 'argv', 'expander', 'namespace'),
  117. )
  118. def main():
  119. sys.exit(MultiTool().execute_from_commandline(sys.argv))
  120. def celery_exe(*args):
  121. return ' '.join((CELERY_EXE,) + args)
  122. class MultiTool(object):
  123. retcode = 0 # Final exit code.
  124. def __init__(self, env=None, fh=None, quiet=False, verbose=False,
  125. no_color=False, nosplash=False, stdout=None, stderr=None):
  126. """fh is an old alias to stdout."""
  127. self.stdout = self.fh = stdout or fh or sys.stdout
  128. self.stderr = stderr or sys.stderr
  129. self.env = env
  130. self.nosplash = nosplash
  131. self.quiet = quiet
  132. self.verbose = verbose
  133. self.no_color = no_color
  134. self.prog_name = 'celery multi'
  135. self.commands = {'start': self.start,
  136. 'show': self.show,
  137. 'stop': self.stop,
  138. 'stopwait': self.stopwait,
  139. 'stop_verify': self.stopwait, # compat alias
  140. 'restart': self.restart,
  141. 'kill': self.kill,
  142. 'names': self.names,
  143. 'expand': self.expand,
  144. 'get': self.get,
  145. 'help': self.help}
  146. def execute_from_commandline(self, argv, cmd='celery worker'):
  147. argv = list(argv) # don't modify callers argv.
  148. # Reserve the --nosplash|--quiet|-q/--verbose options.
  149. if '--nosplash' in argv:
  150. self.nosplash = argv.pop(argv.index('--nosplash'))
  151. if '--quiet' in argv:
  152. self.quiet = argv.pop(argv.index('--quiet'))
  153. if '-q' in argv:
  154. self.quiet = argv.pop(argv.index('-q'))
  155. if '--verbose' in argv:
  156. self.verbose = argv.pop(argv.index('--verbose'))
  157. if '--no-color' in argv:
  158. self.no_color = argv.pop(argv.index('--no-color'))
  159. self.prog_name = os.path.basename(argv.pop(0))
  160. if not argv or argv[0][0] == '-':
  161. return self.error()
  162. try:
  163. self.commands[argv[0]](argv[1:], cmd)
  164. except KeyError:
  165. self.error('Invalid command: {0}'.format(argv[0]))
  166. return self.retcode
  167. def say(self, m, newline=True, file=None):
  168. print(m, file=file or self.stdout, end='\n' if newline else '')
  169. def carp(self, m, newline=True, file=None):
  170. return self.say(m, newline, file or self.stderr)
  171. def names(self, argv, cmd):
  172. p = NamespacedOptionParser(argv)
  173. self.say('\n'.join(
  174. n.name for n in multi_args(p, cmd)),
  175. )
  176. def get(self, argv, cmd):
  177. wanted = argv[0]
  178. p = NamespacedOptionParser(argv[1:])
  179. for node in multi_args(p, cmd):
  180. if node.name == wanted:
  181. self.say(' '.join(node.argv))
  182. return
  183. def show(self, argv, cmd):
  184. p = NamespacedOptionParser(argv)
  185. self.with_detacher_default_options(p)
  186. self.say('\n'.join(
  187. ' '.join([sys.executable] + n.argv) for n in multi_args(p, cmd)),
  188. )
  189. def start(self, argv, cmd):
  190. self.splash()
  191. p = NamespacedOptionParser(argv)
  192. self.with_detacher_default_options(p)
  193. retcodes = []
  194. self.note('> Starting nodes...')
  195. for node in multi_args(p, cmd):
  196. self.note('\t> {0}: '.format(node.name), newline=False)
  197. retcode = self.waitexec(node.argv, path=p.options['--executable'])
  198. self.note(retcode and self.FAILED or self.OK)
  199. retcodes.append(retcode)
  200. self.retcode = int(any(retcodes))
  201. def with_detacher_default_options(self, p):
  202. _setdefaultopt(p.options, ['--pidfile', '-p'], '%n.pid')
  203. _setdefaultopt(p.options, ['--logfile', '-f'], '%n%I.log')
  204. p.options.setdefault(
  205. '--cmd',
  206. '-m {0}'.format(celery_exe('worker', '--detach')),
  207. )
  208. _setdefaultopt(p.options, ['--executable'], sys.executable)
  209. def signal_node(self, nodename, pid, sig):
  210. try:
  211. os.kill(pid, sig)
  212. except OSError as exc:
  213. if exc.errno != errno.ESRCH:
  214. raise
  215. self.note('Could not signal {0} ({1}): No such process'.format(
  216. nodename, pid))
  217. return False
  218. return True
  219. def node_alive(self, pid):
  220. try:
  221. os.kill(pid, 0)
  222. except OSError as exc:
  223. if exc.errno == errno.ESRCH:
  224. return False
  225. raise
  226. return True
  227. def shutdown_nodes(self, nodes, sig=signal.SIGTERM, retry=None,
  228. callback=None):
  229. if not nodes:
  230. return
  231. P = set(nodes)
  232. def on_down(node):
  233. P.discard(node)
  234. if callback:
  235. callback(*node)
  236. self.note(self.colored.blue('> Stopping nodes...'))
  237. for node in list(P):
  238. if node in P:
  239. nodename, _, pid = node
  240. self.note('\t> {0}: {1} -> {2}'.format(
  241. nodename, SIGMAP[sig][3:], pid))
  242. if not self.signal_node(nodename, pid, sig):
  243. on_down(node)
  244. def note_waiting():
  245. left = len(P)
  246. if left:
  247. pids = ', '.join(str(pid) for _, _, pid in P)
  248. self.note(self.colored.blue(
  249. '> Waiting for {0} {1} -> {2}...'.format(
  250. left, pluralize(left, 'node'), pids)), newline=False)
  251. if retry:
  252. note_waiting()
  253. its = 0
  254. while P:
  255. for node in P:
  256. its += 1
  257. self.note('.', newline=False)
  258. nodename, _, pid = node
  259. if not self.node_alive(pid):
  260. self.note('\n\t> {0}: {1}'.format(nodename, self.OK))
  261. on_down(node)
  262. note_waiting()
  263. break
  264. if P and not its % len(P):
  265. sleep(float(retry))
  266. self.note('')
  267. def getpids(self, p, cmd, callback=None):
  268. _setdefaultopt(p.options, ['--pidfile', '-p'], '%n.pid')
  269. nodes = []
  270. for node in multi_args(p, cmd):
  271. try:
  272. pidfile_template = _getopt(
  273. p.namespaces[node.namespace], ['--pidfile', '-p'],
  274. )
  275. except KeyError:
  276. pidfile_template = _getopt(p.options, ['--pidfile', '-p'])
  277. pid = None
  278. pidfile = node.expander(pidfile_template)
  279. try:
  280. pid = Pidfile(pidfile).read_pid()
  281. except ValueError:
  282. pass
  283. if pid:
  284. nodes.append((node.name, tuple(node.argv), pid))
  285. else:
  286. self.note('> {0.name}: {1}'.format(node, self.DOWN))
  287. if callback:
  288. callback(node.name, node.argv, pid)
  289. return nodes
  290. def kill(self, argv, cmd):
  291. self.splash()
  292. p = NamespacedOptionParser(argv)
  293. for nodename, _, pid in self.getpids(p, cmd):
  294. self.note('Killing node {0} ({1})'.format(nodename, pid))
  295. self.signal_node(nodename, pid, signal.SIGKILL)
  296. def stop(self, argv, cmd, retry=None, callback=None):
  297. self.splash()
  298. p = NamespacedOptionParser(argv)
  299. return self._stop_nodes(p, cmd, retry=retry, callback=callback)
  300. def _stop_nodes(self, p, cmd, retry=None, callback=None):
  301. restargs = p.args[len(p.values):]
  302. self.shutdown_nodes(self.getpids(p, cmd, callback=callback),
  303. sig=findsig(restargs),
  304. retry=retry,
  305. callback=callback)
  306. def restart(self, argv, cmd):
  307. self.splash()
  308. p = NamespacedOptionParser(argv)
  309. self.with_detacher_default_options(p)
  310. retvals = []
  311. def on_node_shutdown(nodename, argv, pid):
  312. self.note(self.colored.blue(
  313. '> Restarting node {0}: '.format(nodename)), newline=False)
  314. retval = self.waitexec(argv, path=p.options['--executable'])
  315. self.note(retval and self.FAILED or self.OK)
  316. retvals.append(retval)
  317. self._stop_nodes(p, cmd, retry=2, callback=on_node_shutdown)
  318. self.retval = int(any(retvals))
  319. def stopwait(self, argv, cmd):
  320. self.splash()
  321. p = NamespacedOptionParser(argv)
  322. self.with_detacher_default_options(p)
  323. return self._stop_nodes(p, cmd, retry=2)
  324. stop_verify = stopwait # compat
  325. def expand(self, argv, cmd=None):
  326. template = argv[0]
  327. p = NamespacedOptionParser(argv[1:])
  328. for node in multi_args(p, cmd):
  329. self.say(node.expander(template))
  330. def help(self, argv, cmd=None):
  331. self.say(__doc__)
  332. def usage(self):
  333. self.splash()
  334. self.say(USAGE.format(prog_name=self.prog_name))
  335. def splash(self):
  336. if not self.nosplash:
  337. c = self.colored
  338. self.note(c.cyan('celery multi v{0}'.format(VERSION_BANNER)))
  339. def waitexec(self, argv, path=sys.executable):
  340. args = ' '.join([path] + list(argv))
  341. argstr = shlex.split(from_utf8(args), posix=not IS_WINDOWS)
  342. pipe = Popen(argstr, env=self.env)
  343. self.info(' {0}'.format(' '.join(argstr)))
  344. retcode = pipe.wait()
  345. if retcode < 0:
  346. self.note('* Child was terminated by signal {0}'.format(-retcode))
  347. return -retcode
  348. elif retcode > 0:
  349. self.note('* Child terminated with errorcode {0}'.format(retcode))
  350. return retcode
  351. def error(self, msg=None):
  352. if msg:
  353. self.carp(msg)
  354. self.usage()
  355. self.retcode = 1
  356. return 1
  357. def info(self, msg, newline=True):
  358. if self.verbose:
  359. self.note(msg, newline=newline)
  360. def note(self, msg, newline=True):
  361. if not self.quiet:
  362. self.say(str(msg), newline=newline)
  363. @cached_property
  364. def colored(self):
  365. return term.colored(enabled=not self.no_color)
  366. @cached_property
  367. def OK(self):
  368. return str(self.colored.green('OK'))
  369. @cached_property
  370. def FAILED(self):
  371. return str(self.colored.red('FAILED'))
  372. @cached_property
  373. def DOWN(self):
  374. return str(self.colored.magenta('DOWN'))
  375. def _args_for_node(p, name, prefix, suffix, cmd, append, options):
  376. name, nodename, expand = _get_nodename(
  377. name, prefix, suffix, options)
  378. argv = ([expand(cmd)] +
  379. [format_opt(opt, expand(value))
  380. for opt, value in items(p.optmerge(name, options))] +
  381. [p.passthrough])
  382. if append:
  383. argv.append(expand(append))
  384. return multi_args_t(nodename, argv, expand, name)
  385. def multi_args(p, cmd='celery worker', append='', prefix='', suffix=''):
  386. names = p.values
  387. options = dict(p.options)
  388. ranges = len(names) == 1
  389. if ranges:
  390. try:
  391. names, prefix = _get_ranges(names)
  392. except ValueError:
  393. pass
  394. cmd = options.pop('--cmd', cmd)
  395. append = options.pop('--append', append)
  396. hostname = options.pop('--hostname',
  397. options.pop('-n', gethostname()))
  398. prefix = options.pop('--prefix', prefix) or ''
  399. suffix = options.pop('--suffix', suffix) or hostname
  400. suffix = '' if suffix in ('""', "''") else suffix
  401. _update_ns_opts(p, names)
  402. _update_ns_ranges(p, ranges)
  403. return (_args_for_node(p, name, prefix, suffix, cmd, append, options)
  404. for name in names)
  405. def _get_ranges(names):
  406. noderange = int(names[0])
  407. names = [str(n) for n in range(1, noderange + 1)]
  408. prefix = 'celery'
  409. return names, prefix
  410. def _update_ns_opts(p, names):
  411. # Numbers in args always refers to the index in the list of names.
  412. # (e.g. `start foo bar baz -c:1` where 1 is foo, 2 is bar, and so on).
  413. for ns_name, ns_opts in list(items(p.namespaces)):
  414. if ns_name.isdigit():
  415. ns_index = int(ns_name) - 1
  416. if ns_index < 0:
  417. raise KeyError('Indexes start at 1 got: %r' % (ns_name,))
  418. try:
  419. p.namespaces[names[ns_index]].update(ns_opts)
  420. except IndexError:
  421. raise KeyError('No node at index %r' % (ns_name,))
  422. def _update_ns_ranges(p, ranges):
  423. for ns_name, ns_opts in list(items(p.namespaces)):
  424. if ',' in ns_name or (ranges and '-' in ns_name):
  425. for subns in parse_ns_range(ns_name, ranges):
  426. p.namespaces[subns].update(ns_opts)
  427. p.namespaces.pop(ns_name)
  428. def _get_nodename(name, prefix, suffix, options):
  429. hostname = suffix
  430. if '@' in name:
  431. nodename = options['-n'] = host_format(name)
  432. shortname, hostname = nodesplit(nodename)
  433. name = shortname
  434. else:
  435. shortname = '%s%s' % (prefix, name)
  436. nodename = options['-n'] = host_format(
  437. '{0}@{1}'.format(shortname, hostname),
  438. )
  439. expand = partial(
  440. node_format, nodename=nodename, N=shortname, d=hostname,
  441. h=nodename, i='%i', I='%I',
  442. )
  443. return name, nodename, expand
  444. class NamespacedOptionParser(object):
  445. def __init__(self, args):
  446. self.args = args
  447. self.options = OrderedDict()
  448. self.values = []
  449. self.passthrough = ''
  450. self.namespaces = defaultdict(lambda: OrderedDict())
  451. self.parse()
  452. def parse(self):
  453. rargs = list(self.args)
  454. pos = 0
  455. while pos < len(rargs):
  456. arg = rargs[pos]
  457. if arg == '--':
  458. self.passthrough = ' '.join(rargs[pos:])
  459. break
  460. elif arg[0] == '-':
  461. if arg[1] == '-':
  462. self.process_long_opt(arg[2:])
  463. else:
  464. value = None
  465. if len(rargs) > pos + 1 and rargs[pos + 1][0] != '-':
  466. value = rargs[pos + 1]
  467. pos += 1
  468. self.process_short_opt(arg[1:], value)
  469. else:
  470. self.values.append(arg)
  471. pos += 1
  472. def process_long_opt(self, arg, value=None):
  473. if '=' in arg:
  474. arg, value = arg.split('=', 1)
  475. self.add_option(arg, value, short=False)
  476. def process_short_opt(self, arg, value=None):
  477. self.add_option(arg, value, short=True)
  478. def optmerge(self, ns, defaults=None):
  479. if defaults is None:
  480. defaults = self.options
  481. return OrderedDict(defaults, **self.namespaces[ns])
  482. def add_option(self, name, value, short=False, ns=None):
  483. prefix = short and '-' or '--'
  484. dest = self.options
  485. if ':' in name:
  486. name, ns = name.split(':')
  487. dest = self.namespaces[ns]
  488. dest[prefix + name] = value
  489. def quote(v):
  490. return "\\'".join("'" + p + "'" for p in v.split("'"))
  491. def format_opt(opt, value):
  492. if not value:
  493. return opt
  494. if opt.startswith('--'):
  495. return '{0}={1}'.format(opt, value)
  496. return '{0} {1}'.format(opt, value)
  497. def parse_ns_range(ns, ranges=False):
  498. ret = []
  499. for space in ',' in ns and ns.split(',') or [ns]:
  500. if ranges and '-' in space:
  501. start, stop = space.split('-')
  502. ret.extend(
  503. str(n) for n in range(int(start), int(stop) + 1)
  504. )
  505. else:
  506. ret.append(space)
  507. return ret
  508. def findsig(args, default=signal.SIGTERM):
  509. for arg in reversed(args):
  510. if len(arg) == 2 and arg[0] == '-':
  511. try:
  512. return int(arg[1])
  513. except ValueError:
  514. pass
  515. if arg[0] == '-':
  516. maybe_sig = 'SIG' + arg[1:]
  517. if maybe_sig in SIGNAMES:
  518. return getattr(signal, maybe_sig)
  519. return default
  520. def _getopt(d, alt):
  521. for opt in alt:
  522. try:
  523. return d[opt]
  524. except KeyError:
  525. pass
  526. raise KeyError(alt[0])
  527. def _setdefaultopt(d, alt, value):
  528. for opt in alt[1:]:
  529. try:
  530. return d[opt]
  531. except KeyError:
  532. pass
  533. return d.setdefault(alt[0], value)
  534. if __name__ == '__main__': # pragma: no cover
  535. main()