multi.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456
  1. # -*- coding: utf-8 -*-
  2. """Start multiple worker instances from the command-line.
  3. .. program:: celery multi
  4. Examples
  5. ========
  6. .. code-block:: console
  7. $ # Single worker with explicit name and events enabled.
  8. $ celery multi start Leslie -E
  9. $ # Pidfiles and logfiles are stored in the current directory
  10. $ # by default. Use --pidfile and --logfile argument to change
  11. $ # this. The abbreviation %n will be expanded to the current
  12. $ # node name.
  13. $ celery multi start Leslie -E --pidfile=/var/run/celery/%n.pid
  14. --logfile=/var/log/celery/%n%I.log
  15. $ # You need to add the same arguments when you restart,
  16. $ # as these aren't persisted anywhere.
  17. $ celery multi restart Leslie -E --pidfile=/var/run/celery/%n.pid
  18. --logfile=/var/log/celery/%n%I.log
  19. $ # To stop the node, you need to specify the same pidfile.
  20. $ celery multi stop Leslie --pidfile=/var/run/celery/%n.pid
  21. $ # 3 workers, with 3 processes each
  22. $ celery multi start 3 -c 3
  23. celery worker -n celery1@myhost -c 3
  24. celery worker -n celery2@myhost -c 3
  25. celery worker -n celery3@myhost -c 3
  26. $ # start 3 named workers
  27. $ celery multi start image video data -c 3
  28. celery worker -n image@myhost -c 3
  29. celery worker -n video@myhost -c 3
  30. celery worker -n data@myhost -c 3
  31. $ # specify custom hostname
  32. $ celery multi start 2 --hostname=worker.example.com -c 3
  33. celery worker -n celery1@worker.example.com -c 3
  34. celery worker -n celery2@worker.example.com -c 3
  35. $ # specify fully qualified nodenames
  36. $ celery multi start foo@worker.example.com bar@worker.example.com -c 3
  37. $ # fully qualified nodenames but using the current hostname
  38. $ celery multi start foo@%h bar@%h
  39. $ # Advanced example starting 10 workers in the background:
  40. $ # * Three of the workers process the images and video queues
  41. $ # * Two of the workers process the data queue with loglevel DEBUG
  42. $ # * The rest process the default queue.
  43. $ celery multi start 10 -l INFO -Q:1-3 images,video -Q:4,5 data
  44. -Q default -L:4,5 DEBUG
  45. $ # You can show the commands necessary to start the workers with
  46. $ # the 'show' command:
  47. $ celery multi show 10 -l INFO -Q:1-3 images,video -Q:4,5 data
  48. -Q default -L:4,5 DEBUG
  49. $ # Additional options are added to each celery worker's command,
  50. $ # but you can also modify the options for ranges of, or specific workers
  51. $ # 3 workers: Two with 3 processes, and one with 10 processes.
  52. $ celery multi start 3 -c 3 -c:1 10
  53. celery worker -n celery1@myhost -c 10
  54. celery worker -n celery2@myhost -c 3
  55. celery worker -n celery3@myhost -c 3
  56. $ # can also specify options for named workers
  57. $ celery multi start image video data -c 3 -c:image 10
  58. celery worker -n image@myhost -c 10
  59. celery worker -n video@myhost -c 3
  60. celery worker -n data@myhost -c 3
  61. $ # ranges and lists of workers in options are also allowed:
  62. $ # (-c:1-3 can also be written as -c:1,2,3)
  63. $ celery multi start 5 -c 3 -c:1-3 10
  64. celery worker -n celery1@myhost -c 10
  65. celery worker -n celery2@myhost -c 10
  66. celery worker -n celery3@myhost -c 10
  67. celery worker -n celery4@myhost -c 3
  68. celery worker -n celery5@myhost -c 3
  69. $ # lists also work with named workers
  70. $ celery multi start foo bar baz xuzzy -c 3 -c:foo,bar,baz 10
  71. celery worker -n foo@myhost -c 10
  72. celery worker -n bar@myhost -c 10
  73. celery worker -n baz@myhost -c 10
  74. celery worker -n xuzzy@myhost -c 3
  75. """
  76. from __future__ import absolute_import, print_function, unicode_literals
  77. import os
  78. import signal
  79. import sys
  80. from functools import wraps
  81. from kombu.utils.objects import cached_property
  82. from celery import VERSION_BANNER
  83. from celery.apps.multi import Cluster, MultiParser, NamespacedOptionParser
  84. from celery.platforms import EX_FAILURE, EX_OK, signals
  85. from celery.utils import term
  86. from celery.utils.text import pluralize
# Names exported by ``from <module> import *``.
__all__ = ('MultiTool',)

# Usage text printed by ``TermLogger.usage``; the ``{prog_name}`` placeholder
# is filled in with ``str.format`` at print time.
USAGE = """\
usage: {prog_name} start <node1 node2 nodeN|range> [worker options]
{prog_name} stop <n1 n2 nN|range> [-SIG (default: -TERM)]
{prog_name} restart <n1 n2 nN|range> [-SIG] [worker options]
{prog_name} kill <n1 n2 nN|range>
{prog_name} show <n1 n2 nN|range> [worker options]
{prog_name} get hostname <n1 n2 nN|range> [-qv] [worker options]
{prog_name} names <n1 n2 nN|range>
{prog_name} expand template <n1 n2 nN|range>
{prog_name} help
additional options (must appear after command name):
* --nosplash: Don't display program info.
* --quiet: Don't show as much output.
* --verbose: Show more output.
* --no-color: Don't display colors.
"""
  104. def main():
  105. sys.exit(MultiTool().execute_from_commandline(sys.argv))
  106. def splash(fun):
  107. @wraps(fun)
  108. def _inner(self, *args, **kwargs):
  109. self.splash()
  110. return fun(self, *args, **kwargs)
  111. return _inner
  112. def using_cluster(fun):
  113. @wraps(fun)
  114. def _inner(self, *argv, **kwargs):
  115. return fun(self, self.cluster_from_argv(argv), **kwargs)
  116. return _inner
  117. def using_cluster_and_sig(fun):
  118. @wraps(fun)
  119. def _inner(self, *argv, **kwargs):
  120. p, cluster = self._cluster_from_argv(argv)
  121. sig = self._find_sig_argument(p)
  122. return fun(self, cluster, sig, **kwargs)
  123. return _inner
  124. class TermLogger(object):
  125. splash_text = 'celery multi v{version}'
  126. splash_context = {'version': VERSION_BANNER}
  127. #: Final exit code.
  128. retcode = 0
  129. def setup_terminal(self, stdout, stderr,
  130. nosplash=False, quiet=False, verbose=False,
  131. no_color=False, **kwargs):
  132. self.stdout = stdout or sys.stdout
  133. self.stderr = stderr or sys.stderr
  134. self.nosplash = nosplash
  135. self.quiet = quiet
  136. self.verbose = verbose
  137. self.no_color = no_color
  138. def ok(self, m, newline=True, file=None):
  139. self.say(m, newline=newline, file=file)
  140. return EX_OK
  141. def say(self, m, newline=True, file=None):
  142. print(m, file=file or self.stdout, end='\n' if newline else '')
  143. def carp(self, m, newline=True, file=None):
  144. return self.say(m, newline, file or self.stderr)
  145. def error(self, msg=None):
  146. if msg:
  147. self.carp(msg)
  148. self.usage()
  149. return EX_FAILURE
  150. def info(self, msg, newline=True):
  151. if self.verbose:
  152. self.note(msg, newline=newline)
  153. def note(self, msg, newline=True):
  154. if not self.quiet:
  155. self.say(str(msg), newline=newline)
  156. @splash
  157. def usage(self):
  158. self.say(USAGE.format(prog_name=self.prog_name))
  159. def splash(self):
  160. if not self.nosplash:
  161. self.note(self.colored.cyan(
  162. self.splash_text.format(**self.splash_context)))
  163. @cached_property
  164. def colored(self):
  165. return term.colored(enabled=not self.no_color)
  166. class MultiTool(TermLogger):
  167. """The ``celery multi`` program."""
  168. MultiParser = MultiParser
  169. OptionParser = NamespacedOptionParser
  170. reserved_options = [
  171. ('--nosplash', 'nosplash'),
  172. ('--quiet', 'quiet'),
  173. ('-q', 'quiet'),
  174. ('--verbose', 'verbose'),
  175. ('--no-color', 'no_color'),
  176. ]
  177. def __init__(self, env=None, cmd=None,
  178. fh=None, stdout=None, stderr=None, **kwargs):
  179. # fh is an old alias to stdout.
  180. self.env = env
  181. self.cmd = cmd
  182. self.setup_terminal(stdout or fh, stderr, **kwargs)
  183. self.fh = self.stdout
  184. self.prog_name = 'celery multi'
  185. self.commands = {
  186. 'start': self.start,
  187. 'show': self.show,
  188. 'stop': self.stop,
  189. 'stopwait': self.stopwait,
  190. 'stop_verify': self.stopwait, # compat alias
  191. 'restart': self.restart,
  192. 'kill': self.kill,
  193. 'names': self.names,
  194. 'expand': self.expand,
  195. 'get': self.get,
  196. 'help': self.help,
  197. }
  198. def execute_from_commandline(self, argv, cmd=None):
  199. # Reserve the --nosplash|--quiet|-q/--verbose options.
  200. argv = self._handle_reserved_options(argv)
  201. self.cmd = cmd if cmd is not None else self.cmd
  202. self.prog_name = os.path.basename(argv.pop(0))
  203. if not self.validate_arguments(argv):
  204. return self.error()
  205. return self.call_command(argv[0], argv[1:])
  206. def validate_arguments(self, argv):
  207. return argv and argv[0][0] != '-'
  208. def call_command(self, command, argv):
  209. try:
  210. return self.commands[command](*argv) or EX_OK
  211. except KeyError:
  212. return self.error('Invalid command: {0}'.format(command))
  213. def _handle_reserved_options(self, argv):
  214. argv = list(argv) # don't modify callers argv.
  215. for arg, attr in self.reserved_options:
  216. if arg in argv:
  217. setattr(self, attr, bool(argv.pop(argv.index(arg))))
  218. return argv
  219. @splash
  220. @using_cluster
  221. def start(self, cluster):
  222. self.note('> Starting nodes...')
  223. return int(any(cluster.start()))
  224. @splash
  225. @using_cluster_and_sig
  226. def stop(self, cluster, sig, **kwargs):
  227. return cluster.stop(sig=sig, **kwargs)
  228. @splash
  229. @using_cluster_and_sig
  230. def stopwait(self, cluster, sig, **kwargs):
  231. return cluster.stopwait(sig=sig, **kwargs)
  232. stop_verify = stopwait # compat
  233. @splash
  234. @using_cluster_and_sig
  235. def restart(self, cluster, sig, **kwargs):
  236. return int(any(cluster.restart(sig=sig, **kwargs)))
  237. @using_cluster
  238. def names(self, cluster):
  239. self.say('\n'.join(n.name for n in cluster))
  240. def get(self, wanted, *argv):
  241. try:
  242. node = self.cluster_from_argv(argv).find(wanted)
  243. except KeyError:
  244. return EX_FAILURE
  245. else:
  246. return self.ok(' '.join(node.argv))
  247. @using_cluster
  248. def show(self, cluster):
  249. return self.ok('\n'.join(
  250. ' '.join(node.argv_with_executable)
  251. for node in cluster
  252. ))
  253. @splash
  254. @using_cluster
  255. def kill(self, cluster):
  256. return cluster.kill()
  257. def expand(self, template, *argv):
  258. return self.ok('\n'.join(
  259. node.expander(template)
  260. for node in self.cluster_from_argv(argv)
  261. ))
  262. def help(self, *argv):
  263. self.say(__doc__)
  264. def _find_sig_argument(self, p, default=signal.SIGTERM):
  265. args = p.args[len(p.values):]
  266. for arg in reversed(args):
  267. if len(arg) == 2 and arg[0] == '-':
  268. try:
  269. return int(arg[1])
  270. except ValueError:
  271. pass
  272. if arg[0] == '-':
  273. try:
  274. return signals.signum(arg[1:])
  275. except (AttributeError, TypeError):
  276. pass
  277. return default
  278. def _nodes_from_argv(self, argv, cmd=None):
  279. cmd = cmd if cmd is not None else self.cmd
  280. p = self.OptionParser(argv)
  281. p.parse()
  282. return p, self.MultiParser(cmd=cmd).parse(p)
  283. def cluster_from_argv(self, argv, cmd=None):
  284. _, cluster = self._cluster_from_argv(argv, cmd=cmd)
  285. return cluster
  286. def _cluster_from_argv(self, argv, cmd=None):
  287. p, nodes = self._nodes_from_argv(argv, cmd=cmd)
  288. return p, self.Cluster(list(nodes), cmd=cmd)
  289. def Cluster(self, nodes, cmd=None):
  290. return Cluster(
  291. nodes,
  292. cmd=cmd,
  293. env=self.env,
  294. on_stopping_preamble=self.on_stopping_preamble,
  295. on_send_signal=self.on_send_signal,
  296. on_still_waiting_for=self.on_still_waiting_for,
  297. on_still_waiting_progress=self.on_still_waiting_progress,
  298. on_still_waiting_end=self.on_still_waiting_end,
  299. on_node_start=self.on_node_start,
  300. on_node_restart=self.on_node_restart,
  301. on_node_shutdown_ok=self.on_node_shutdown_ok,
  302. on_node_status=self.on_node_status,
  303. on_node_signal_dead=self.on_node_signal_dead,
  304. on_node_signal=self.on_node_signal,
  305. on_node_down=self.on_node_down,
  306. on_child_spawn=self.on_child_spawn,
  307. on_child_signalled=self.on_child_signalled,
  308. on_child_failure=self.on_child_failure,
  309. )
  310. def on_stopping_preamble(self, nodes):
  311. self.note(self.colored.blue('> Stopping nodes...'))
  312. def on_send_signal(self, node, sig):
  313. self.note('\t> {0.name}: {1} -> {0.pid}'.format(node, sig))
  314. def on_still_waiting_for(self, nodes):
  315. num_left = len(nodes)
  316. if num_left:
  317. self.note(self.colored.blue(
  318. '> Waiting for {0} {1} -> {2}...'.format(
  319. num_left, pluralize(num_left, 'node'),
  320. ', '.join(str(node.pid) for node in nodes)),
  321. ), newline=False)
  322. def on_still_waiting_progress(self, nodes):
  323. self.note('.', newline=False)
  324. def on_still_waiting_end(self):
  325. self.note('')
  326. def on_node_signal_dead(self, node):
  327. self.note(
  328. 'Could not signal {0.name} ({0.pid}): No such process'.format(
  329. node))
  330. def on_node_start(self, node):
  331. self.note('\t> {0.name}: '.format(node), newline=False)
  332. def on_node_restart(self, node):
  333. self.note(self.colored.blue(
  334. '> Restarting node {0.name}: '.format(node)), newline=False)
  335. def on_node_down(self, node):
  336. self.note('> {0.name}: {1.DOWN}'.format(node, self))
  337. def on_node_shutdown_ok(self, node):
  338. self.note('\n\t> {0.name}: {1.OK}'.format(node, self))
  339. def on_node_status(self, node, retval):
  340. self.note(retval and self.FAILED or self.OK)
  341. def on_node_signal(self, node, sig):
  342. self.note('Sending {sig} to node {0.name} ({0.pid})'.format(
  343. node, sig=sig))
  344. def on_child_spawn(self, node, argstr, env):
  345. self.info(' {0}'.format(argstr))
  346. def on_child_signalled(self, node, signum):
  347. self.note('* Child was terminated by signal {0}'.format(signum))
  348. def on_child_failure(self, node, retcode):
  349. self.note('* Child terminated with exit code {0}'.format(retcode))
  350. @cached_property
  351. def OK(self):
  352. return str(self.colored.green('OK'))
  353. @cached_property
  354. def FAILED(self):
  355. return str(self.colored.red('FAILED'))
  356. @cached_property
  357. def DOWN(self):
  358. return str(self.colored.magenta('DOWN'))
# Run the tool when this module is executed directly as a script.
if __name__ == '__main__':  # pragma: no cover
    main()