celeryd_multi.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567
  1. # -*- coding: utf-8 -*-
  2. """
  3. .. program:: celeryd-multi
  4. Examples
  5. ========
  6. .. code-block:: bash
  7. # Single worker with explicit name and events enabled.
  8. $ celeryd-multi start Leslie -E
  9. # Pidfiles and logfiles are stored in the current directory
  10. # by default. Use --pidfile and --logfile argument to change
  11. # this. The abbreviation %n will be expanded to the current
  12. # node name.
  13. $ celeryd-multi start Leslie -E --pidfile=/var/run/celery/%n.pid
  14. --logfile=/var/log/celery/%n.log
  15. # You need to add the same arguments when you restart,
  16. # as these are not persisted anywhere.
  17. $ celeryd-multi restart Leslie -E --pidfile=/var/run/celery/%n.pid
  18. --logfile=/var/log/celery/%n.log
  19. # To stop the node, you need to specify the same pidfile.
  20. $ celeryd-multi stop Leslie --pidfile=/var/run/celery/%n.pid
  21. # 3 workers, with 3 processes each
  22. $ celeryd-multi start 3 -c 3
  23. celeryd -n celery1.myhost -c 3
  24. celeryd -n celery2.myhost -c 3
  25. celeryd -n celery3.myhost -c 3
  26. # start 3 named workers
  27. $ celeryd-multi start image video data -c 3
  28. celeryd -n image.myhost -c 3
  29. celeryd -n video.myhost -c 3
  30. celeryd -n data.myhost -c 3
  31. # specify custom hostname
  32. $ celeryd-multi start 2 -n worker.example.com -c 3
  33. celeryd -n celery1.worker.example.com -c 3
  34. celeryd -n celery2.worker.example.com -c 3
  35. # Advanced example starting 10 workers in the background:
  36. # * Three of the workers process the images and video queue
  37. # * Two of the workers process the data queue with loglevel DEBUG
  38. # * the rest process the 'default' queue.
  39. $ celeryd-multi start 10 -l INFO -Q:1-3 images,video -Q:4,5 data
  40. -Q default -L:4,5 DEBUG
  41. # You can show the commands necessary to start the workers with
  42. # the 'show' command:
  43. $ celeryd-multi show 10 -l INFO -Q:1-3 images,video -Q:4,5 data
  44. -Q default -L:4,5 DEBUG
  45. # Additional options are added to each celeryd,
  46. # but you can also modify the options for ranges of, or specific workers
  47. # 3 workers: Two with 3 processes, and one with 10 processes.
  48. $ celeryd-multi start 3 -c 3 -c:1 10
  49. celeryd -n celery1.myhost -c 10
  50. celeryd -n celery2.myhost -c 3
  51. celeryd -n celery3.myhost -c 3
  52. # You can also specify options for named workers
  53. $ celeryd-multi start image video data -c 3 -c:image 10
  54. celeryd -n image.myhost -c 10
  55. celeryd -n video.myhost -c 3
  56. celeryd -n data.myhost -c 3
  57. # ranges and lists of workers in options are also allowed:
  58. # (-c:1-3 can also be written as -c:1,2,3)
  59. $ celeryd-multi start 5 -c 3 -c:1-3 10
  60. celeryd -n celery1.myhost -c 10
  61. celeryd -n celery2.myhost -c 10
  62. celeryd -n celery3.myhost -c 10
  63. celeryd -n celery4.myhost -c 3
  64. celeryd -n celery5.myhost -c 3
  65. # lists also work with named workers
  66. $ celeryd-multi start foo bar baz xuzzy -c 3 -c:foo,bar,baz 10
  67. celeryd -n foo.myhost -c 10
  68. celeryd -n bar.myhost -c 10
  69. celeryd -n baz.myhost -c 10
  70. celeryd -n xuzzy.myhost -c 3
  71. """
  72. from __future__ import absolute_import
  73. import errno
  74. import os
  75. import signal
  76. import socket
  77. import sys
  78. from collections import defaultdict
  79. from subprocess import Popen
  80. from time import sleep
  81. from kombu.utils import cached_property
  82. from kombu.utils.encoding import from_utf8
  83. from celery import VERSION_BANNER
  84. from celery.platforms import Pidfile, shellsplit
  85. from celery.utils import term
  86. from celery.utils.text import pluralize
  87. SIGNAMES = set(sig for sig in dir(signal)
  88. if sig.startswith('SIG') and '_' not in sig)
  89. SIGMAP = dict((getattr(signal, name), name) for name in SIGNAMES)
  90. USAGE = """\
  91. usage: %(prog_name)s start <node1 node2 nodeN|range> [celeryd options]
  92. %(prog_name)s stop <n1 n2 nN|range> [-SIG (default: -TERM)]
  93. %(prog_name)s restart <n1 n2 nN|range> [-SIG] [celeryd options]
  94. %(prog_name)s kill <n1 n2 nN|range>
  95. %(prog_name)s show <n1 n2 nN|range> [celeryd options]
  96. %(prog_name)s get hostname <n1 n2 nN|range> [-qv] [celeryd options]
  97. %(prog_name)s names <n1 n2 nN|range>
  98. %(prog_name)s expand template <n1 n2 nN|range>
  99. %(prog_name)s help
  100. additional options (must appear after command name):
  101. * --nosplash: Don't display program info.
  102. * --quiet: Don't show as much output.
  103. * --verbose: Show more output.
  104. * --no-color: Don't display colors.
  105. """
  106. def main():
  107. sys.exit(MultiTool().execute_from_commandline(sys.argv))
  108. class MultiTool(object):
  109. retcode = 0 # Final exit code.
  110. def __init__(self, env=None, fh=None, quiet=False, verbose=False,
  111. no_color=False, nosplash=False):
  112. self.fh = fh or sys.stderr
  113. self.env = env
  114. self.nosplash = nosplash
  115. self.quiet = quiet
  116. self.verbose = verbose
  117. self.no_color = no_color
  118. self.prog_name = 'celeryd-multi'
  119. self.commands = {'start': self.start,
  120. 'show': self.show,
  121. 'stop': self.stop,
  122. 'stopwait': self.stopwait,
  123. 'stop_verify': self.stopwait, # compat alias
  124. 'restart': self.restart,
  125. 'kill': self.kill,
  126. 'names': self.names,
  127. 'expand': self.expand,
  128. 'get': self.get,
  129. 'help': self.help}
  130. def execute_from_commandline(self, argv, cmd='celeryd'):
  131. argv = list(argv) # don't modify callers argv.
  132. # Reserve the --nosplash|--quiet|-q/--verbose options.
  133. if '--nosplash' in argv:
  134. self.nosplash = argv.pop(argv.index('--nosplash'))
  135. if '--quiet' in argv:
  136. self.quiet = argv.pop(argv.index('--quiet'))
  137. if '-q' in argv:
  138. self.quiet = argv.pop(argv.index('-q'))
  139. if '--verbose' in argv:
  140. self.verbose = argv.pop(argv.index('--verbose'))
  141. if '--no-color' in argv:
  142. self.no_color = argv.pop(argv.index('--no-color'))
  143. self.prog_name = os.path.basename(argv.pop(0))
  144. if not argv or argv[0][0] == '-':
  145. return self.error()
  146. try:
  147. self.commands[argv[0]](argv[1:], cmd)
  148. except KeyError:
  149. self.error('Invalid command: %s' % argv[0])
  150. return self.retcode
  151. def say(self, m, newline=True):
  152. self.fh.write('%s%s' % (m, '\n' if newline else ''))
  153. def names(self, argv, cmd):
  154. p = NamespacedOptionParser(argv)
  155. self.say('\n'.join(
  156. hostname for hostname, _, _ in multi_args(p, cmd)),
  157. )
  158. def get(self, argv, cmd):
  159. wanted = argv[0]
  160. p = NamespacedOptionParser(argv[1:])
  161. for name, worker, _ in multi_args(p, cmd):
  162. if name == wanted:
  163. self.say(' '.join(worker))
  164. return
  165. def show(self, argv, cmd):
  166. p = NamespacedOptionParser(argv)
  167. self.note('> Starting nodes...')
  168. self.say('\n'.join(
  169. ' '.join(worker) for _, worker, _ in multi_args(p, cmd)),
  170. )
  171. def start(self, argv, cmd):
  172. self.splash()
  173. p = NamespacedOptionParser(argv)
  174. self.with_detacher_default_options(p)
  175. retcodes = []
  176. self.note('> Starting nodes...')
  177. for nodename, argv, _ in multi_args(p, cmd):
  178. self.note('\t> %s: ' % (nodename, ), newline=False)
  179. retcode = self.waitexec(argv)
  180. self.note(retcode and self.FAILED or self.OK)
  181. retcodes.append(retcode)
  182. self.retcode = int(any(retcodes))
  183. def with_detacher_default_options(self, p):
  184. p.options.setdefault('--pidfile', 'celeryd@%n.pid')
  185. p.options.setdefault('--logfile', 'celeryd@%n.log')
  186. p.options.setdefault('--cmd', '-m celery.bin.celeryd_detach')
  187. def signal_node(self, nodename, pid, sig):
  188. try:
  189. os.kill(pid, sig)
  190. except OSError, exc:
  191. if exc.errno != errno.ESRCH:
  192. raise
  193. self.note('Could not signal %s (%s): No such process' % (
  194. nodename, pid))
  195. return False
  196. return True
  197. def node_alive(self, pid):
  198. try:
  199. os.kill(pid, 0)
  200. except OSError, exc:
  201. if exc.errno == errno.ESRCH:
  202. return False
  203. raise
  204. return True
  205. def shutdown_nodes(self, nodes, sig=signal.SIGTERM, retry=None,
  206. callback=None):
  207. if not nodes:
  208. return
  209. P = set(nodes)
  210. def on_down(node):
  211. P.discard(node)
  212. if callback:
  213. callback(*node)
  214. self.note(self.colored.blue('> Stopping nodes...'))
  215. for node in list(P):
  216. if node in P:
  217. nodename, _, pid = node
  218. self.note('\t> %s: %s -> %s' % (nodename,
  219. SIGMAP[sig][3:],
  220. pid))
  221. if not self.signal_node(nodename, pid, sig):
  222. on_down(node)
  223. def note_waiting():
  224. left = len(P)
  225. if left:
  226. pids = ', '.join(str(pid) for _, _, pid in P)
  227. self.note(self.colored.blue('> Waiting for %s %s -> %s...' % (
  228. left, pluralize(left, 'node'), pids)), newline=False)
  229. if retry:
  230. note_waiting()
  231. its = 0
  232. while P:
  233. for node in P:
  234. its += 1
  235. self.note('.', newline=False)
  236. nodename, _, pid = node
  237. if not self.node_alive(pid):
  238. self.note('\n\t> %s: %s' % (nodename, self.OK))
  239. on_down(node)
  240. note_waiting()
  241. break
  242. if P and not its % len(P):
  243. sleep(float(retry))
  244. self.note('')
  245. def getpids(self, p, cmd, callback=None):
  246. pidfile_template = p.options.setdefault('--pidfile', 'celeryd@%n.pid')
  247. nodes = []
  248. for nodename, argv, expander in multi_args(p, cmd):
  249. pid = None
  250. pidfile = expander(pidfile_template)
  251. try:
  252. pid = Pidfile(pidfile).read_pid()
  253. except ValueError:
  254. pass
  255. if pid:
  256. nodes.append((nodename, tuple(argv), pid))
  257. else:
  258. self.note('> %s: %s' % (nodename, self.DOWN))
  259. if callback:
  260. callback(nodename, argv, pid)
  261. return nodes
  262. def kill(self, argv, cmd):
  263. self.splash()
  264. p = NamespacedOptionParser(argv)
  265. for nodename, _, pid in self.getpids(p, cmd):
  266. self.note('Killing node %s (%s)' % (nodename, pid))
  267. self.signal_node(nodename, pid, signal.SIGKILL)
  268. def stop(self, argv, cmd, retry=None, callback=None):
  269. self.splash()
  270. p = NamespacedOptionParser(argv)
  271. return self._stop_nodes(p, cmd, retry=retry, callback=callback)
  272. def _stop_nodes(self, p, cmd, retry=None, callback=None):
  273. restargs = p.args[len(p.values):]
  274. self.shutdown_nodes(self.getpids(p, cmd, callback=callback),
  275. sig=findsig(restargs),
  276. retry=retry,
  277. callback=callback)
  278. def restart(self, argv, cmd):
  279. self.splash()
  280. p = NamespacedOptionParser(argv)
  281. self.with_detacher_default_options(p)
  282. retvals = []
  283. def on_node_shutdown(nodename, argv, pid):
  284. self.note(self.colored.blue(
  285. '> Restarting node %s: ' % nodename), newline=False)
  286. retval = self.waitexec(argv)
  287. self.note(retval and self.FAILED or self.OK)
  288. retvals.append(retval)
  289. self._stop_nodes(p, cmd, retry=2, callback=on_node_shutdown)
  290. self.retval = int(any(retvals))
  291. def stopwait(self, argv, cmd):
  292. self.splash()
  293. p = NamespacedOptionParser(argv)
  294. self.with_detacher_default_options(p)
  295. return self._stop_nodes(p, cmd, retry=2)
  296. stop_verify = stopwait # compat
  297. def expand(self, argv, cmd=None):
  298. template = argv[0]
  299. p = NamespacedOptionParser(argv[1:])
  300. for _, _, expander in multi_args(p, cmd):
  301. self.say(expander(template))
  302. def help(self, argv, cmd=None):
  303. self.say(__doc__)
  304. def usage(self):
  305. self.splash()
  306. self.say(USAGE % {'prog_name': self.prog_name})
  307. def splash(self):
  308. if not self.nosplash:
  309. c = self.colored
  310. self.note(c.cyan('celeryd-multi v%s' % VERSION_BANNER))
  311. def waitexec(self, argv, path=sys.executable):
  312. args = ' '.join([path] + list(argv))
  313. argstr = shellsplit(from_utf8(args))
  314. pipe = Popen(argstr, env=self.env)
  315. self.info(' %s' % ' '.join(argstr))
  316. retcode = pipe.wait()
  317. if retcode < 0:
  318. self.note('* Child was terminated by signal %s' % (-retcode, ))
  319. return -retcode
  320. elif retcode > 0:
  321. self.note('* Child terminated with failure code %s' % (retcode, ))
  322. return retcode
  323. def error(self, msg=None):
  324. if msg:
  325. self.say(msg)
  326. self.usage()
  327. self.retcode = 1
  328. return 1
  329. def info(self, msg, newline=True):
  330. if self.verbose:
  331. self.note(msg, newline=newline)
  332. def note(self, msg, newline=True):
  333. if not self.quiet:
  334. self.say(str(msg), newline=newline)
  335. @cached_property
  336. def colored(self):
  337. return term.colored(enabled=not self.no_color)
  338. @cached_property
  339. def OK(self):
  340. return str(self.colored.green('OK'))
  341. @cached_property
  342. def FAILED(self):
  343. return str(self.colored.red('FAILED'))
  344. @cached_property
  345. def DOWN(self):
  346. return str(self.colored.magenta('DOWN'))
  347. def multi_args(p, cmd='celeryd', append='', prefix='', suffix=''):
  348. names = p.values
  349. options = dict(p.options)
  350. passthrough = p.passthrough
  351. ranges = len(names) == 1
  352. if ranges:
  353. try:
  354. noderange = int(names[0])
  355. except ValueError:
  356. pass
  357. else:
  358. names = [str(v) for v in range(1, noderange + 1)]
  359. prefix = 'celery'
  360. cmd = options.pop('--cmd', cmd)
  361. append = options.pop('--append', append)
  362. hostname = options.pop('--hostname',
  363. options.pop('-n', socket.gethostname()))
  364. prefix = options.pop('--prefix', prefix) or ''
  365. suffix = options.pop('--suffix', suffix) or '.' + hostname
  366. if suffix in ('""', "''"):
  367. suffix = ''
  368. for ns_name, ns_opts in p.namespaces.items():
  369. if ',' in ns_name or (ranges and '-' in ns_name):
  370. for subns in parse_ns_range(ns_name, ranges):
  371. p.namespaces[subns].update(ns_opts)
  372. p.namespaces.pop(ns_name)
  373. for name in names:
  374. this_name = options['-n'] = prefix + name + suffix
  375. expand = abbreviations({'%h': this_name,
  376. '%n': name})
  377. argv = ([expand(cmd)] +
  378. [format_opt(opt, expand(value))
  379. for opt, value in p.optmerge(name, options).items()] +
  380. [passthrough])
  381. if append:
  382. argv.append(expand(append))
  383. yield this_name, argv, expand
  384. class NamespacedOptionParser(object):
  385. def __init__(self, args):
  386. self.args = args
  387. self.options = {}
  388. self.values = []
  389. self.passthrough = ''
  390. self.namespaces = defaultdict(lambda: {})
  391. self.parse()
  392. def parse(self):
  393. rargs = list(self.args)
  394. pos = 0
  395. while pos < len(rargs):
  396. arg = rargs[pos]
  397. if arg == '--':
  398. self.passthrough = ' '.join(rargs[pos:])
  399. break
  400. elif arg[0] == '-':
  401. if arg[1] == '-':
  402. self.process_long_opt(arg[2:])
  403. else:
  404. value = None
  405. if len(rargs) > pos + 1 and rargs[pos + 1][0] != '-':
  406. value = rargs[pos + 1]
  407. pos += 1
  408. self.process_short_opt(arg[1:], value)
  409. else:
  410. self.values.append(arg)
  411. pos += 1
  412. def process_long_opt(self, arg, value=None):
  413. if '=' in arg:
  414. arg, value = arg.split('=', 1)
  415. self.add_option(arg, value, short=False)
  416. def process_short_opt(self, arg, value=None):
  417. self.add_option(arg, value, short=True)
  418. def optmerge(self, ns, defaults=None):
  419. if defaults is None:
  420. defaults = self.options
  421. return dict(defaults, **self.namespaces[ns])
  422. def add_option(self, name, value, short=False, ns=None):
  423. prefix = short and '-' or '--'
  424. dest = self.options
  425. if ':' in name:
  426. name, ns = name.split(':')
  427. dest = self.namespaces[ns]
  428. dest[prefix + name] = value
  429. def quote(v):
  430. return "\\'".join("'" + p + "'" for p in v.split("'"))
  431. def format_opt(opt, value):
  432. if not value:
  433. return opt
  434. if opt.startswith('--'):
  435. return '%s=%s' % (opt, value)
  436. return '%s %s' % (opt, value)
  437. def parse_ns_range(ns, ranges=False):
  438. ret = []
  439. for space in ',' in ns and ns.split(',') or [ns]:
  440. if ranges and '-' in space:
  441. start, stop = space.split('-')
  442. x = [str(v) for v in range(int(start), int(stop) + 1)]
  443. ret.extend(x)
  444. else:
  445. ret.append(space)
  446. return ret
  447. def abbreviations(mapping):
  448. def expand(S):
  449. ret = S
  450. if S is not None:
  451. for short, long in mapping.items():
  452. ret = ret.replace(short, long)
  453. return ret
  454. return expand
  455. def findsig(args, default=signal.SIGTERM):
  456. for arg in reversed(args):
  457. if len(arg) == 2 and arg[0] == '-':
  458. try:
  459. return int(arg[1])
  460. except ValueError:
  461. pass
  462. if arg[0] == '-':
  463. maybe_sig = 'SIG' + arg[1:]
  464. if maybe_sig in SIGNAMES:
  465. return getattr(signal, maybe_sig)
  466. return default
  467. if __name__ == '__main__': # pragma: no cover
  468. main()