multi.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583
  1. # -*- coding: utf-8 -*-
  2. """
  3. .. program:: celery multi
  4. Examples
  5. ========
  6. .. code-block:: bash
  7. # Single worker with explicit name and events enabled.
  8. $ celery multi start Leslie -E
  9. # Pidfiles and logfiles are stored in the current directory
  10. # by default. Use --pidfile and --logfile argument to change
  11. # this. The abbreviation %N will be expanded to the current
  12. # node name.
  13. $ celery multi start Leslie -E --pidfile=/var/run/celery/%N.pid
  14. --logfile=/var/log/celery/%N.log
  15. # You need to add the same arguments when you restart,
  16. # as these are not persisted anywhere.
  17. $ celery multi restart Leslie -E --pidfile=/var/run/celery/%N.pid
  18. --logfile=/var/log/celery/%N.log
  19. # To stop the node, you need to specify the same pidfile.
  20. $ celery multi stop Leslie --pidfile=/var/run/celery/%N.pid
  21. # 3 workers, with 3 processes each
  22. $ celery multi start 3 -c 3
  23. celery worker -n celery1@myhost -c 3
  24. celery worker -n celery2@myhost -c 3
  25. celery worker -n celery3@myhost -c 3
  26. # start 3 named workers
  27. $ celery multi start image video data -c 3
  28. celery worker -n image@myhost -c 3
  29. celery worker -n video@myhost -c 3
  30. celery worker -n data@myhost -c 3
  31. # specify custom hostname
  32. $ celery multi start 2 --hostname=worker.example.com -c 3
  33. celery worker -n celery1@worker.example.com -c 3
  34. celery worker -n celery2@worker.example.com -c 3
  35. # specify fully qualified nodenames
  36. $ celery multi start foo@worker.example.com bar@worker.example.com -c 3
  37. # Advanced example starting 10 workers in the background:
  38. # * Three of the workers process the images and video queues
  39. # * Two of the workers process the data queue with loglevel DEBUG
  40. # * the rest process the 'default' queue.
  41. $ celery multi start 10 -l INFO -Q:1-3 images,video -Q:4,5 data
  42. -Q default -L:4,5 DEBUG
  43. # You can show the commands necessary to start the workers with
  44. # the 'show' command:
  45. $ celery multi show 10 -l INFO -Q:1-3 images,video -Q:4,5 data
  46. -Q default -L:4,5 DEBUG
  47. # Additional options are added to each celery worker's command,
  48. # but you can also modify the options for ranges of, or specific workers
  49. # 3 workers: Two with 3 processes, and one with 10 processes.
  50. $ celery multi start 3 -c 3 -c:1 10
  51. celery worker -n celery1@myhost -c 10
  52. celery worker -n celery2@myhost -c 3
  53. celery worker -n celery3@myhost -c 3
  54. # can also specify options for named workers
  55. $ celery multi start image video data -c 3 -c:image 10
  56. celery worker -n image@myhost -c 10
  57. celery worker -n video@myhost -c 3
  58. celery worker -n data@myhost -c 3
  59. # ranges and lists of workers in options is also allowed:
  60. # (-c:1-3 can also be written as -c:1,2,3)
  61. $ celery multi start 5 -c 3 -c:1-3 10
  62. celery worker -n celery1@myhost -c 10
  63. celery worker -n celery2@myhost -c 10
  64. celery worker -n celery3@myhost -c 10
  65. celery worker -n celery4@myhost -c 3
  66. celery worker -n celery5@myhost -c 3
  67. # lists also works with named workers
  68. $ celery multi start foo bar baz xuzzy -c 3 -c:foo,bar,baz 10
  69. celery worker -n foo@myhost -c 10
  70. celery worker -n bar@myhost -c 10
  71. celery worker -n baz@myhost -c 10
  72. celery worker -n xuzzy@myhost -c 3
  73. """
  74. from __future__ import absolute_import, print_function
  75. import errno
  76. import os
  77. import shlex
  78. import signal
  79. import socket
  80. import sys
  81. from collections import defaultdict
  82. from subprocess import Popen
  83. from time import sleep
  84. from kombu.utils import cached_property
  85. from kombu.utils.compat import OrderedDict
  86. from kombu.utils.encoding import from_utf8
  87. from celery import VERSION_BANNER
  88. from celery.five import items
  89. from celery.platforms import Pidfile, IS_WINDOWS
  90. from celery.utils import term, nodesplit
  91. from celery.utils.text import pluralize
  92. SIGNAMES = set(sig for sig in dir(signal)
  93. if sig.startswith('SIG') and '_' not in sig)
  94. SIGMAP = dict((getattr(signal, name), name) for name in SIGNAMES)
# Usage text printed by MultiTool.usage(); the ``{prog_name}`` placeholder
# is filled in with str.format().
USAGE = """\
usage: {prog_name} start <node1 node2 nodeN|range> [worker options]
{prog_name} stop <n1 n2 nN|range> [-SIG (default: -TERM)]
{prog_name} restart <n1 n2 nN|range> [-SIG] [worker options]
{prog_name} kill <n1 n2 nN|range>
{prog_name} show <n1 n2 nN|range> [worker options]
{prog_name} get hostname <n1 n2 nN|range> [-qv] [worker options]
{prog_name} names <n1 n2 nN|range>
{prog_name} expand template <n1 n2 nN|range>
{prog_name} help
additional options (must appear after command name):
* --nosplash: Don't display program info.
* --quiet: Don't show as much output.
* --verbose: Show more output.
* --no-color: Don't display colors.
"""
  111. def main():
  112. sys.exit(MultiTool().execute_from_commandline(sys.argv))
  113. class MultiTool(object):
  114. retcode = 0 # Final exit code.
  115. def __init__(self, env=None, fh=None, quiet=False, verbose=False,
  116. no_color=False, nosplash=False):
  117. self.fh = fh or sys.stderr
  118. self.env = env
  119. self.nosplash = nosplash
  120. self.quiet = quiet
  121. self.verbose = verbose
  122. self.no_color = no_color
  123. self.prog_name = 'celery multi'
  124. self.commands = {'start': self.start,
  125. 'show': self.show,
  126. 'stop': self.stop,
  127. 'stopwait': self.stopwait,
  128. 'stop_verify': self.stopwait, # compat alias
  129. 'restart': self.restart,
  130. 'kill': self.kill,
  131. 'names': self.names,
  132. 'expand': self.expand,
  133. 'get': self.get,
  134. 'help': self.help}
  135. def execute_from_commandline(self, argv, cmd='celery worker'):
  136. argv = list(argv) # don't modify callers argv.
  137. # Reserve the --nosplash|--quiet|-q/--verbose options.
  138. if '--nosplash' in argv:
  139. self.nosplash = argv.pop(argv.index('--nosplash'))
  140. if '--quiet' in argv:
  141. self.quiet = argv.pop(argv.index('--quiet'))
  142. if '-q' in argv:
  143. self.quiet = argv.pop(argv.index('-q'))
  144. if '--verbose' in argv:
  145. self.verbose = argv.pop(argv.index('--verbose'))
  146. if '--no-color' in argv:
  147. self.no_color = argv.pop(argv.index('--no-color'))
  148. self.prog_name = os.path.basename(argv.pop(0))
  149. if not argv or argv[0][0] == '-':
  150. return self.error()
  151. try:
  152. self.commands[argv[0]](argv[1:], cmd)
  153. except KeyError:
  154. self.error('Invalid command: {0}'.format(argv[0]))
  155. return self.retcode
  156. def say(self, m, newline=True):
  157. print(m, file=self.fh, end='\n' if newline else '')
  158. def names(self, argv, cmd):
  159. p = NamespacedOptionParser(argv)
  160. self.say('\n'.join(
  161. hostname for hostname, _, _ in multi_args(p, cmd)),
  162. )
  163. def get(self, argv, cmd):
  164. wanted = argv[0]
  165. p = NamespacedOptionParser(argv[1:])
  166. for name, worker, _ in multi_args(p, cmd):
  167. if name == wanted:
  168. self.say(' '.join(worker))
  169. return
  170. def show(self, argv, cmd):
  171. p = NamespacedOptionParser(argv)
  172. self.note('> Starting nodes...')
  173. self.say('\n'.join(
  174. ' '.join(worker) for _, worker, _ in multi_args(p, cmd)),
  175. )
  176. def start(self, argv, cmd):
  177. self.splash()
  178. p = NamespacedOptionParser(argv)
  179. self.with_detacher_default_options(p)
  180. retcodes = []
  181. self.note('> Starting nodes...')
  182. for nodename, argv, _ in multi_args(p, cmd):
  183. self.note('\t> {0}: '.format(nodename), newline=False)
  184. retcode = self.waitexec(argv)
  185. self.note(retcode and self.FAILED or self.OK)
  186. retcodes.append(retcode)
  187. self.retcode = int(any(retcodes))
  188. def with_detacher_default_options(self, p):
  189. p.options.setdefault('--pidfile', '%N.pid')
  190. p.options.setdefault('--logfile', '%N.log')
  191. p.options.setdefault('--cmd', '-m celery worker --detach')
  192. def signal_node(self, nodename, pid, sig):
  193. try:
  194. os.kill(pid, sig)
  195. except OSError as exc:
  196. if exc.errno != errno.ESRCH:
  197. raise
  198. self.note('Could not signal {0} ({1}): No such process'.format(
  199. nodename, pid))
  200. return False
  201. return True
  202. def node_alive(self, pid):
  203. try:
  204. os.kill(pid, 0)
  205. except OSError as exc:
  206. if exc.errno == errno.ESRCH:
  207. return False
  208. raise
  209. return True
  210. def shutdown_nodes(self, nodes, sig=signal.SIGTERM, retry=None,
  211. callback=None):
  212. if not nodes:
  213. return
  214. P = set(nodes)
  215. def on_down(node):
  216. P.discard(node)
  217. if callback:
  218. callback(*node)
  219. self.note(self.colored.blue('> Stopping nodes...'))
  220. for node in list(P):
  221. if node in P:
  222. nodename, _, pid = node
  223. self.note('\t> {0}: {1} -> {2}'.format(
  224. nodename, SIGMAP[sig][3:], pid))
  225. if not self.signal_node(nodename, pid, sig):
  226. on_down(node)
  227. def note_waiting():
  228. left = len(P)
  229. if left:
  230. pids = ', '.join(str(pid) for _, _, pid in P)
  231. self.note(self.colored.blue(
  232. '> Waiting for {0} {1} -> {2}...'.format(
  233. left, pluralize(left, 'node'), pids)), newline=False)
  234. if retry:
  235. note_waiting()
  236. its = 0
  237. while P:
  238. for node in P:
  239. its += 1
  240. self.note('.', newline=False)
  241. nodename, _, pid = node
  242. if not self.node_alive(pid):
  243. self.note('\n\t> {0}: {1}'.format(nodename, self.OK))
  244. on_down(node)
  245. note_waiting()
  246. break
  247. if P and not its % len(P):
  248. sleep(float(retry))
  249. self.note('')
  250. def getpids(self, p, cmd, callback=None):
  251. pidfile_template = p.options.setdefault('--pidfile', '%N.pid')
  252. nodes = []
  253. for nodename, argv, expander in multi_args(p, cmd):
  254. pid = None
  255. pidfile = expander(pidfile_template)
  256. try:
  257. pid = Pidfile(pidfile).read_pid()
  258. except ValueError:
  259. pass
  260. if pid:
  261. nodes.append((nodename, tuple(argv), pid))
  262. else:
  263. self.note('> {0}: {1}'.format(nodename, self.DOWN))
  264. if callback:
  265. callback(nodename, argv, pid)
  266. return nodes
  267. def kill(self, argv, cmd):
  268. self.splash()
  269. p = NamespacedOptionParser(argv)
  270. for nodename, _, pid in self.getpids(p, cmd):
  271. self.note('Killing node {0} ({1})'.format(nodename, pid))
  272. self.signal_node(nodename, pid, signal.SIGKILL)
  273. def stop(self, argv, cmd, retry=None, callback=None):
  274. self.splash()
  275. p = NamespacedOptionParser(argv)
  276. return self._stop_nodes(p, cmd, retry=retry, callback=callback)
  277. def _stop_nodes(self, p, cmd, retry=None, callback=None):
  278. restargs = p.args[len(p.values):]
  279. self.shutdown_nodes(self.getpids(p, cmd, callback=callback),
  280. sig=findsig(restargs),
  281. retry=retry,
  282. callback=callback)
  283. def restart(self, argv, cmd):
  284. self.splash()
  285. p = NamespacedOptionParser(argv)
  286. self.with_detacher_default_options(p)
  287. retvals = []
  288. def on_node_shutdown(nodename, argv, pid):
  289. self.note(self.colored.blue(
  290. '> Restarting node {0}: '.format(nodename)), newline=False)
  291. retval = self.waitexec(argv)
  292. self.note(retval and self.FAILED or self.OK)
  293. retvals.append(retval)
  294. self._stop_nodes(p, cmd, retry=2, callback=on_node_shutdown)
  295. self.retval = int(any(retvals))
  296. def stopwait(self, argv, cmd):
  297. self.splash()
  298. p = NamespacedOptionParser(argv)
  299. self.with_detacher_default_options(p)
  300. return self._stop_nodes(p, cmd, retry=2)
  301. stop_verify = stopwait # compat
  302. def expand(self, argv, cmd=None):
  303. template = argv[0]
  304. p = NamespacedOptionParser(argv[1:])
  305. for _, _, expander in multi_args(p, cmd):
  306. self.say(expander(template))
  307. def help(self, argv, cmd=None):
  308. self.say(__doc__)
  309. def usage(self):
  310. self.splash()
  311. self.say(USAGE.format(prog_name=self.prog_name))
  312. def splash(self):
  313. if not self.nosplash:
  314. c = self.colored
  315. self.note(c.cyan('celery multi v{0}'.format(VERSION_BANNER)))
  316. def waitexec(self, argv, path=sys.executable):
  317. args = ' '.join([path] + list(argv))
  318. argstr = shlex.split(from_utf8(args), posix=not IS_WINDOWS)
  319. pipe = Popen(argstr, env=self.env)
  320. self.info(' {0}'.format(' '.join(argstr)))
  321. retcode = pipe.wait()
  322. if retcode < 0:
  323. self.note('* Child was terminated by signal {0}'.format(-retcode))
  324. return -retcode
  325. elif retcode > 0:
  326. self.note('* Child terminated with errorcode {0}'.format(retcode))
  327. return retcode
  328. def error(self, msg=None):
  329. if msg:
  330. self.say(msg)
  331. self.usage()
  332. self.retcode = 1
  333. return 1
  334. def info(self, msg, newline=True):
  335. if self.verbose:
  336. self.note(msg, newline=newline)
  337. def note(self, msg, newline=True):
  338. if not self.quiet:
  339. self.say(str(msg), newline=newline)
  340. @cached_property
  341. def colored(self):
  342. return term.colored(enabled=not self.no_color)
  343. @cached_property
  344. def OK(self):
  345. return str(self.colored.green('OK'))
  346. @cached_property
  347. def FAILED(self):
  348. return str(self.colored.red('FAILED'))
  349. @cached_property
  350. def DOWN(self):
  351. return str(self.colored.magenta('DOWN'))
  352. def multi_args(p, cmd='celery worker', append='', prefix='', suffix=''):
  353. names = p.values
  354. options = dict(p.options)
  355. passthrough = p.passthrough
  356. ranges = len(names) == 1
  357. if ranges:
  358. try:
  359. noderange = int(names[0])
  360. except ValueError:
  361. pass
  362. else:
  363. names = [str(n) for n in range(1, noderange + 1)]
  364. prefix = 'celery'
  365. cmd = options.pop('--cmd', cmd)
  366. append = options.pop('--append', append)
  367. hostname = options.pop('--hostname',
  368. options.pop('-n', socket.gethostname()))
  369. prefix = options.pop('--prefix', prefix) or ''
  370. suffix = options.pop('--suffix', suffix) or hostname
  371. if suffix in ('""', "''"):
  372. suffix = ''
  373. for ns_name, ns_opts in list(items(p.namespaces)):
  374. if ',' in ns_name or (ranges and '-' in ns_name):
  375. for subns in parse_ns_range(ns_name, ranges):
  376. p.namespaces[subns].update(ns_opts)
  377. p.namespaces.pop(ns_name)
  378. for name in names:
  379. this_suffix = suffix
  380. if '@' in name:
  381. this_name = options['-n'] = name
  382. nodename, this_suffix = nodesplit(name)
  383. name = nodename
  384. else:
  385. nodename = '%s%s' % (prefix, name)
  386. this_name = options['-n'] = '%s@%s' % (nodename, this_suffix)
  387. expand = abbreviations({'%h': this_name,
  388. '%n': name,
  389. '%N': nodename,
  390. '%d': this_suffix})
  391. argv = ([expand(cmd)] +
  392. [format_opt(opt, expand(value))
  393. for opt, value in items(p.optmerge(name, options))] +
  394. [passthrough])
  395. if append:
  396. argv.append(expand(append))
  397. yield this_name, argv, expand
  398. class NamespacedOptionParser(object):
  399. def __init__(self, args):
  400. self.args = args
  401. self.options = OrderedDict()
  402. self.values = []
  403. self.passthrough = ''
  404. self.namespaces = defaultdict(lambda: OrderedDict())
  405. self.parse()
  406. def parse(self):
  407. rargs = list(self.args)
  408. pos = 0
  409. while pos < len(rargs):
  410. arg = rargs[pos]
  411. if arg == '--':
  412. self.passthrough = ' '.join(rargs[pos:])
  413. break
  414. elif arg[0] == '-':
  415. if arg[1] == '-':
  416. self.process_long_opt(arg[2:])
  417. else:
  418. value = None
  419. if len(rargs) > pos + 1 and rargs[pos + 1][0] != '-':
  420. value = rargs[pos + 1]
  421. pos += 1
  422. self.process_short_opt(arg[1:], value)
  423. else:
  424. self.values.append(arg)
  425. pos += 1
  426. def process_long_opt(self, arg, value=None):
  427. if '=' in arg:
  428. arg, value = arg.split('=', 1)
  429. self.add_option(arg, value, short=False)
  430. def process_short_opt(self, arg, value=None):
  431. self.add_option(arg, value, short=True)
  432. def optmerge(self, ns, defaults=None):
  433. if defaults is None:
  434. defaults = self.options
  435. return OrderedDict(defaults, **self.namespaces[ns])
  436. def add_option(self, name, value, short=False, ns=None):
  437. prefix = short and '-' or '--'
  438. dest = self.options
  439. if ':' in name:
  440. name, ns = name.split(':')
  441. dest = self.namespaces[ns]
  442. dest[prefix + name] = value
  443. def quote(v):
  444. return "\\'".join("'" + p + "'" for p in v.split("'"))
  445. def format_opt(opt, value):
  446. if not value:
  447. return opt
  448. if opt.startswith('--'):
  449. return '{0}={1}'.format(opt, value)
  450. return '{0} {1}'.format(opt, value)
  451. def parse_ns_range(ns, ranges=False):
  452. ret = []
  453. for space in ',' in ns and ns.split(',') or [ns]:
  454. if ranges and '-' in space:
  455. start, stop = space.split('-')
  456. ret.extend(
  457. str(n) for n in range(int(start), int(stop) + 1)
  458. )
  459. else:
  460. ret.append(space)
  461. return ret
  462. def abbreviations(mapping):
  463. def expand(S):
  464. ret = S
  465. if S is not None:
  466. for short_opt, long_opt in items(mapping):
  467. ret = ret.replace(short_opt, long_opt)
  468. return ret
  469. return expand
  470. def findsig(args, default=signal.SIGTERM):
  471. for arg in reversed(args):
  472. if len(arg) == 2 and arg[0] == '-':
  473. try:
  474. return int(arg[1])
  475. except ValueError:
  476. pass
  477. if arg[0] == '-':
  478. maybe_sig = 'SIG' + arg[1:]
  479. if maybe_sig in SIGNAMES:
  480. return getattr(signal, maybe_sig)
  481. return default
# Run the command-line entry point when this module is executed directly.
if __name__ == '__main__':  # pragma: no cover
    main()