celeryd_multi.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561
  1. # -*- coding: utf-8 -*-
  2. """
  3. .. program:: celeryd-multi
  4. Examples
  5. ========
  6. ::
  7. # Single worker with explicit name and events enabled.
  8. $ celeryd-multi start Leslie -E
  9. # Pidfiles and logfiles are stored in the current directory
  10. # by default. Use --pidfile and --logfile argument to change
  11. # this. The abbreviation %n will be expanded to the current
  12. # node name.
  13. $ celeryd-multi start Leslie -E --pidfile=/var/run/celery/%n.pid
  14. --logfile=/var/log/celery/%n.log
  15. # You need to add the same arguments when you restart,
  16. # as these are not persisted anywhere.
  17. $ celeryd-multi restart Leslie -E --pidfile=/var/run/celery/%n.pid
  18. --logfile=/var/log/celery/%n.log
  19. # To stop the node, you need to specify the same pidfile.
  20. $ celeryd-multi stop Leslie --pidfile=/var/run/celery/%n.pid
  21. # 3 workers, with 3 processes each
  22. $ celeryd-multi start 3 -c 3
  23. celeryd -n celeryd1.myhost -c 3
  24. celeryd -n celeryd2.myhost -c 3
  25. celeryd -n celeryd3.myhost -c 3
  26. # start 3 named workers
  27. $ celeryd-multi start image video data -c 3
  28. celeryd -n image.myhost -c 3
  29. celeryd -n video.myhost -c 3
  30. celeryd -n data.myhost -c 3
  31. # specify custom hostname
  32. $ celeryd-multi start 2 -n worker.example.com -c 3
  33. celeryd -n celeryd1.worker.example.com -c 3
  34. celeryd -n celeryd2.worker.example.com -c 3
  35. # Advanced example starting 10 workers in the background:
  36. # * Three of the workers process the images and video queues
  37. # * Two of the workers process the data queue with loglevel DEBUG
  38. # * the rest process the 'default' queue.
  39. $ celeryd-multi start 10 -l INFO -Q:1-3 images,video -Q:4,5 data
  40. -Q default -L:4,5 DEBUG
  41. # You can show the commands necessary to start the workers with
  42. # the 'show' command:
  43. $ celeryd-multi show 10 -l INFO -Q:1-3 images,video -Q:4,5 data
  44. -Q default -L:4,5 DEBUG
  45. # Additional options are added to each celeryd,
  46. # but you can also modify the options for ranges of, or specific workers
  47. # 3 workers: Two with 3 processes, and one with 10 processes.
  48. $ celeryd-multi start 3 -c 3 -c:1 10
  49. celeryd -n celeryd1.myhost -c 10
  50. celeryd -n celeryd2.myhost -c 3
  51. celeryd -n celeryd3.myhost -c 3
  52. # can also specify options for named workers
  53. $ celeryd-multi start image video data -c 3 -c:image 10
  54. celeryd -n image.myhost -c 10
  55. celeryd -n video.myhost -c 3
  56. celeryd -n data.myhost -c 3
  57. # ranges and lists of workers in options are also allowed:
  58. # (-c:1-3 can also be written as -c:1,2,3)
  59. $ celeryd-multi start 5 -c 3 -c:1-3 10
  60. celeryd -n celeryd1.myhost -c 10
  61. celeryd -n celeryd2.myhost -c 10
  62. celeryd -n celeryd3.myhost -c 10
  63. celeryd -n celeryd4.myhost -c 3
  64. celeryd -n celeryd5.myhost -c 3
  65. # lists also work with named workers
  66. $ celeryd-multi start foo bar baz xuzzy -c 3 -c:foo,bar,baz 10
  67. celeryd -n foo.myhost -c 10
  68. celeryd -n bar.myhost -c 10
  69. celeryd -n baz.myhost -c 10
  70. celeryd -n xuzzy.myhost -c 3
  71. """
  72. from __future__ import absolute_import, print_function
  73. import errno
  74. import os
  75. import signal
  76. import socket
  77. import sys
  78. from collections import defaultdict
  79. from subprocess import Popen
  80. from time import sleep
  81. from kombu.utils import cached_property
  82. from kombu.utils.encoding import from_utf8
  83. from celery import VERSION_BANNER
  84. from celery.platforms import PIDFile, shellsplit
  85. from celery.utils import term
  86. from celery.utils.text import pluralize
  87. SIGNAMES = set(sig for sig in dir(signal)
  88. if sig.startswith('SIG') and '_' not in sig)
  89. SIGMAP = dict((getattr(signal, name), name) for name in SIGNAMES)
# Usage text shown by ``MultiTool.usage``; ``{prog_name}`` is filled in
# with ``str.format`` using the basename the program was invoked as.
USAGE = """\
usage: {prog_name} start <node1 node2 nodeN|range> [celeryd options]
{prog_name} stop <n1 n2 nN|range> [-SIG (default: -TERM)]
{prog_name} restart <n1 n2 nN|range> [-SIG] [celeryd options]
{prog_name} kill <n1 n2 nN|range>
{prog_name} show <n1 n2 nN|range> [celeryd options]
{prog_name} get hostname <n1 n2 nN|range> [-qv] [celeryd options]
{prog_name} names <n1 n2 nN|range>
{prog_name} expand template <n1 n2 nN|range>
{prog_name} help
additional options (must appear after command name):
* --nosplash: Don't display program info.
* --quiet: Don't show as much output.
* --verbose: Show more output.
* --no-color: Don't display colors.
"""
  106. def main():
  107. sys.exit(MultiTool().execute_from_commandline(sys.argv))
  108. class MultiTool(object):
  109. retcode = 0 # Final exit code.
  110. def __init__(self, env=None, fh=None, quiet=False, verbose=False,
  111. no_color=False, nosplash=False):
  112. self.fh = fh or sys.stderr
  113. self.env = env
  114. self.nosplash = nosplash
  115. self.quiet = quiet
  116. self.verbose = verbose
  117. self.no_color = no_color
  118. self.prog_name = 'celeryd-multi'
  119. self.commands = {'start': self.start,
  120. 'show': self.show,
  121. 'stop': self.stop,
  122. 'stop_verify': self.stop_verify,
  123. 'restart': self.restart,
  124. 'kill': self.kill,
  125. 'names': self.names,
  126. 'expand': self.expand,
  127. 'get': self.get,
  128. 'help': self.help}
  129. def execute_from_commandline(self, argv, cmd='celeryd'):
  130. argv = list(argv) # don't modify callers argv.
  131. # Reserve the --nosplash|--quiet|-q/--verbose options.
  132. if '--nosplash' in argv:
  133. self.nosplash = argv.pop(argv.index('--nosplash'))
  134. if '--quiet' in argv:
  135. self.quiet = argv.pop(argv.index('--quiet'))
  136. if '-q' in argv:
  137. self.quiet = argv.pop(argv.index('-q'))
  138. if '--verbose' in argv:
  139. self.verbose = argv.pop(argv.index('--verbose'))
  140. if '--no-color' in argv:
  141. self.no_color = argv.pop(argv.index('--no-color'))
  142. self.prog_name = os.path.basename(argv.pop(0))
  143. if not argv or argv[0][0] == '-':
  144. return self.error()
  145. try:
  146. self.commands[argv[0]](argv[1:], cmd)
  147. except KeyError:
  148. self.error('Invalid command: {0}'.format(argv[0]))
  149. return self.retcode
  150. def say(self, m, newline=True):
  151. print(m, file=self.fh, end='\n' if newline else '')
  152. def names(self, argv, cmd):
  153. p = NamespacedOptionParser(argv)
  154. self.say('\n'.join(hostname
  155. for hostname, _, _ in multi_args(p, cmd)))
  156. def get(self, argv, cmd):
  157. wanted = argv[0]
  158. p = NamespacedOptionParser(argv[1:])
  159. for name, worker, _ in multi_args(p, cmd):
  160. if name == wanted:
  161. self.say(' '.join(worker))
  162. return
  163. def show(self, argv, cmd):
  164. p = NamespacedOptionParser(argv)
  165. self.note('> Starting nodes...')
  166. self.say('\n'.join(' '.join(worker)
  167. for _, worker, _ in multi_args(p, cmd)))
  168. def start(self, argv, cmd):
  169. self.splash()
  170. p = NamespacedOptionParser(argv)
  171. self.with_detacher_default_options(p)
  172. retcodes = []
  173. self.note('> Starting nodes...')
  174. for nodename, argv, _ in multi_args(p, cmd):
  175. self.note('\t> {0}: '.format(nodename), newline=False)
  176. retcode = self.waitexec(argv)
  177. self.note(retcode and self.FAILED or self.OK)
  178. retcodes.append(retcode)
  179. self.retcode = int(any(retcodes))
  180. def with_detacher_default_options(self, p):
  181. p.options.setdefault('--pidfile', 'celeryd@%n.pid')
  182. p.options.setdefault('--logfile', 'celeryd@%n.log')
  183. p.options.setdefault('--cmd', '-m celery.bin.celeryd_detach')
  184. def signal_node(self, nodename, pid, sig):
  185. try:
  186. os.kill(pid, sig)
  187. except OSError as exc:
  188. if exc.errno != errno.ESRCH:
  189. raise
  190. self.note('Could not signal {0} ({1}): No such process'.format(
  191. nodename, pid))
  192. return False
  193. return True
  194. def node_alive(self, pid):
  195. try:
  196. os.kill(pid, 0)
  197. except OSError as exc:
  198. if exc.errno == errno.ESRCH:
  199. return False
  200. raise
  201. return True
  202. def shutdown_nodes(self, nodes, sig=signal.SIGTERM, retry=None,
  203. callback=None):
  204. if not nodes:
  205. return
  206. P = set(nodes)
  207. def on_down(node):
  208. P.discard(node)
  209. if callback:
  210. callback(*node)
  211. self.note(self.colored.blue('> Stopping nodes...'))
  212. for node in list(P):
  213. if node in P:
  214. nodename, _, pid = node
  215. self.note('\t> {0}: {1} -> {2}'.format(
  216. nodename, SIGMAP[sig][3:], pid))
  217. if not self.signal_node(nodename, pid, sig):
  218. on_down(node)
  219. def note_waiting():
  220. left = len(P)
  221. if left:
  222. self.note(self.colored.blue('> Waiting for {0} {1}...'.format(
  223. left, pluralize(left, 'node'))), newline=False)
  224. if retry:
  225. note_waiting()
  226. its = 0
  227. while P:
  228. for node in P:
  229. its += 1
  230. self.note('.', newline=False)
  231. nodename, _, pid = node
  232. if not self.node_alive(pid):
  233. self.note('\n\t> {0}: {1}'.format(nodename, self.OK))
  234. on_down(node)
  235. note_waiting()
  236. break
  237. if P and not its % len(P):
  238. sleep(float(retry))
  239. self.note('')
  240. def getpids(self, p, cmd, callback=None):
  241. pidfile_template = p.options.setdefault('--pidfile', 'celeryd@%n.pid')
  242. nodes = []
  243. for nodename, argv, expander in multi_args(p, cmd):
  244. pid = None
  245. pidfile = expander(pidfile_template)
  246. try:
  247. pid = PIDFile(pidfile).read_pid()
  248. except ValueError:
  249. pass
  250. if pid:
  251. nodes.append((nodename, tuple(argv), pid))
  252. else:
  253. self.note('> {0}: {1}'.format(nodename, self.DOWN))
  254. if callback:
  255. callback(nodename, argv, pid)
  256. return nodes
  257. def kill(self, argv, cmd):
  258. self.splash()
  259. p = NamespacedOptionParser(argv)
  260. for nodename, _, pid in self.getpids(p, cmd):
  261. self.note('Killing node {0} ({1})'.format(nodename, pid))
  262. self.signal_node(nodename, pid, signal.SIGKILL)
  263. def stop(self, argv, cmd, retry=None, callback=None):
  264. self.splash()
  265. p = NamespacedOptionParser(argv)
  266. return self._stop_nodes(p, cmd, retry=retry, callback=callback)
  267. def _stop_nodes(self, p, cmd, retry=None, callback=None):
  268. restargs = p.args[len(p.values):]
  269. self.shutdown_nodes(self.getpids(p, cmd, callback=callback),
  270. sig=findsig(restargs),
  271. retry=retry,
  272. callback=callback)
  273. def restart(self, argv, cmd):
  274. self.splash()
  275. p = NamespacedOptionParser(argv)
  276. self.with_detacher_default_options(p)
  277. retvals = []
  278. def on_node_shutdown(nodename, argv, pid):
  279. self.note(self.colored.blue(
  280. '> Restarting node {0}: '.format(nodename)), newline=False)
  281. retval = self.waitexec(argv)
  282. self.note(retval and self.FAILED or self.OK)
  283. retvals.append(retval)
  284. self._stop_nodes(p, cmd, retry=2, callback=on_node_shutdown)
  285. self.retval = int(any(retvals))
  286. def stop_verify(self, argv, cmd):
  287. self.splash()
  288. p = NamespacedOptionParser(argv)
  289. self.with_detacher_default_options(p)
  290. return self._stop_nodes(p, cmd, retry=2)
  291. def expand(self, argv, cmd=None):
  292. template = argv[0]
  293. p = NamespacedOptionParser(argv[1:])
  294. for _, _, expander in multi_args(p, cmd):
  295. self.say(expander(template))
  296. def help(self, argv, cmd=None):
  297. self.say(__doc__)
  298. def usage(self):
  299. self.splash()
  300. self.say(USAGE.format(prog_name=self.prog_name))
  301. def splash(self):
  302. if not self.nosplash:
  303. c = self.colored
  304. self.note(c.cyan('celeryd-multi v{0}'.format(VERSION_BANNER)))
  305. def waitexec(self, argv, path=sys.executable):
  306. args = ' '.join([path] + list(argv))
  307. argstr = shellsplit(from_utf8(args))
  308. pipe = Popen(argstr, env=self.env)
  309. self.info(' {0}'.format(' '.join(argstr)))
  310. retcode = pipe.wait()
  311. if retcode < 0:
  312. self.note('* Child was terminated by signal {0}'.format(-retcode))
  313. return -retcode
  314. elif retcode > 0:
  315. self.note('* Child terminated with errorcode {0}'.format(retcode))
  316. return retcode
  317. def error(self, msg=None):
  318. if msg:
  319. self.say(msg)
  320. self.usage()
  321. self.retcode = 1
  322. return 1
  323. def info(self, msg, newline=True):
  324. if self.verbose:
  325. self.note(msg, newline=newline)
  326. def note(self, msg, newline=True):
  327. if not self.quiet:
  328. self.say(str(msg), newline=newline)
  329. @cached_property
  330. def colored(self):
  331. return term.colored(enabled=not self.no_color)
  332. @cached_property
  333. def OK(self):
  334. return str(self.colored.green('OK'))
  335. @cached_property
  336. def FAILED(self):
  337. return str(self.colored.red('FAILED'))
  338. @cached_property
  339. def DOWN(self):
  340. return str(self.colored.magenta('DOWN'))
  341. def multi_args(p, cmd='celeryd', append='', prefix='', suffix=''):
  342. names = p.values
  343. options = dict(p.options)
  344. passthrough = p.passthrough
  345. ranges = len(names) == 1
  346. if ranges:
  347. try:
  348. noderange = int(names[0])
  349. except ValueError:
  350. pass
  351. else:
  352. names = map(str, range(1, noderange + 1))
  353. prefix = 'celery'
  354. cmd = options.pop('--cmd', cmd)
  355. append = options.pop('--append', append)
  356. hostname = options.pop('--hostname',
  357. options.pop('-n', socket.gethostname()))
  358. prefix = options.pop('--prefix', prefix) or ''
  359. suffix = options.pop('--suffix', suffix) or '.' + hostname
  360. if suffix in ('""', "''"):
  361. suffix = ''
  362. for ns_name, ns_opts in p.namespaces.items():
  363. if ',' in ns_name or (ranges and '-' in ns_name):
  364. for subns in parse_ns_range(ns_name, ranges):
  365. p.namespaces[subns].update(ns_opts)
  366. p.namespaces.pop(ns_name)
  367. for name in names:
  368. this_name = options['-n'] = prefix + name + suffix
  369. expand = abbreviations({'%h': this_name,
  370. '%n': name})
  371. argv = ([expand(cmd)] +
  372. [format_opt(opt, expand(value))
  373. for opt, value in p.optmerge(name, options).items()] +
  374. [passthrough])
  375. if append:
  376. argv.append(expand(append))
  377. yield this_name, argv, expand
  378. class NamespacedOptionParser(object):
  379. def __init__(self, args):
  380. self.args = args
  381. self.options = {}
  382. self.values = []
  383. self.passthrough = ''
  384. self.namespaces = defaultdict(lambda: {})
  385. self.parse()
  386. def parse(self):
  387. rargs = list(self.args)
  388. pos = 0
  389. while pos < len(rargs):
  390. arg = rargs[pos]
  391. if arg == '--':
  392. self.passthrough = ' '.join(rargs[pos:])
  393. break
  394. elif arg[0] == '-':
  395. if arg[1] == '-':
  396. self.process_long_opt(arg[2:])
  397. else:
  398. value = None
  399. if len(rargs) > pos + 1 and rargs[pos + 1][0] != '-':
  400. value = rargs[pos + 1]
  401. pos += 1
  402. self.process_short_opt(arg[1:], value)
  403. else:
  404. self.values.append(arg)
  405. pos += 1
  406. def process_long_opt(self, arg, value=None):
  407. if '=' in arg:
  408. arg, value = arg.split('=', 1)
  409. self.add_option(arg, value, short=False)
  410. def process_short_opt(self, arg, value=None):
  411. self.add_option(arg, value, short=True)
  412. def optmerge(self, ns, defaults=None):
  413. if defaults is None:
  414. defaults = self.options
  415. return dict(defaults, **self.namespaces[ns])
  416. def add_option(self, name, value, short=False, ns=None):
  417. prefix = short and '-' or '--'
  418. dest = self.options
  419. if ':' in name:
  420. name, ns = name.split(':')
  421. dest = self.namespaces[ns]
  422. dest[prefix + name] = value
  423. def quote(v):
  424. return "\\'".join("'" + p + "'" for p in v.split("'"))
  425. def format_opt(opt, value):
  426. if not value:
  427. return opt
  428. if opt.startswith('--'):
  429. return '{0}={1}'.format(opt, value)
  430. return '{0} {1}'.format(opt, value)
  431. def parse_ns_range(ns, ranges=False):
  432. ret = []
  433. for space in ',' in ns and ns.split(',') or [ns]:
  434. if ranges and '-' in space:
  435. start, stop = space.split('-')
  436. x = map(str, range(int(start), int(stop) + 1))
  437. ret.extend(x)
  438. else:
  439. ret.append(space)
  440. return ret
  441. def abbreviations(map):
  442. def expand(S):
  443. ret = S
  444. if S is not None:
  445. for short, long in map.items():
  446. ret = ret.replace(short, long)
  447. return ret
  448. return expand
  449. def findsig(args, default=signal.SIGTERM):
  450. for arg in reversed(args):
  451. if len(arg) == 2 and arg[0] == '-':
  452. try:
  453. return int(arg[1])
  454. except ValueError:
  455. pass
  456. if arg[0] == '-':
  457. maybe_sig = 'SIG' + arg[1:]
  458. if maybe_sig in SIGNAMES:
  459. return getattr(signal, maybe_sig)
  460. return default
# Run the command-line tool only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':  # pragma: no cover
    main()