celeryd_multi.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547
  1. # -*- coding: utf-8 -*-
  2. """
  3. Examples
  4. ========
  5. ::
  6. # Single worker with explicit name and events enabled.
  7. $ celeryd-multi start Leslie -E
  8. # Pidfiles and logfiles are stored in the current directory
  9. # by default. Use the --pidfile and --logfile arguments to change
  10. # this. The abbreviation %n will be expanded to the current
  11. # node name.
  12. $ celeryd-multi start Leslie -E --pidfile=/var/run/celery/%n.pid
  13. --logfile=/var/log/celery/%n.log
  14. # You need to add the same arguments when you restart,
  15. # as these are not persisted anywhere.
  16. $ celeryd-multi restart Leslie -E --pidfile=/var/run/celery/%n.pid
  17. --logfile=/var/log/celery/%n.log
  18. # To stop the node, you need to specify the same pidfile.
  19. $ celeryd-multi stop Leslie --pidfile=/var/run/celery/%n.pid
  20. # 3 workers, with 3 processes each
  21. $ celeryd-multi start 3 -c 3
  22. celeryd -n celery1.myhost -c 3
  23. celeryd -n celery2.myhost -c 3
  24. celeryd -n celery3.myhost -c 3
  25. # start 3 named workers
  26. $ celeryd-multi start image video data -c 3
  27. celeryd -n image.myhost -c 3
  28. celeryd -n video.myhost -c 3
  29. celeryd -n data.myhost -c 3
  30. # specify custom hostname
  31. $ celeryd-multi start 2 -n worker.example.com -c 3
  32. celeryd -n celery1.worker.example.com -c 3
  33. celeryd -n celery2.worker.example.com -c 3
  34. # Advanced example starting 10 workers in the background:
  35. # * Three of the workers process the images and video queues
  36. # * Two of the workers process the data queue with loglevel DEBUG
  37. # * the rest process the 'default' queue.
  38. $ celeryd-multi start 10 -l INFO -Q:1-3 images,video -Q:4,5 data
  39. -Q default -L:4,5 DEBUG
  40. # You can show the commands necessary to start the workers with
  41. # the "show" command:
  42. $ celeryd-multi show 10 -l INFO -Q:1-3 images,video -Q:4,5 data
  43. -Q default -L:4,5 DEBUG
  44. # Additional options are added to each celeryd,
  45. # but you can also modify the options for ranges of workers, or for specific workers.
  46. # 3 workers: Two with 3 processes, and one with 10 processes.
  47. $ celeryd-multi start 3 -c 3 -c:1 10
  48. celeryd -n celery1.myhost -c 10
  49. celeryd -n celery2.myhost -c 3
  50. celeryd -n celery3.myhost -c 3
  51. # You can also specify options for named workers
  52. $ celeryd-multi start image video data -c 3 -c:image 10
  53. celeryd -n image.myhost -c 10
  54. celeryd -n video.myhost -c 3
  55. celeryd -n data.myhost -c 3
  56. # Ranges and lists of workers in options are also allowed:
  57. # (-c:1-3 can also be written as -c:1,2,3)
  58. $ celeryd-multi start 5 -c 3 -c:1-3 10
  59. celeryd -n celery1.myhost -c 10
  60. celeryd -n celery2.myhost -c 10
  61. celeryd -n celery3.myhost -c 10
  62. celeryd -n celery4.myhost -c 3
  63. celeryd -n celery5.myhost -c 3
  64. # Lists also work with named workers
  65. $ celeryd-multi start foo bar baz xuzzy -c 3 -c:foo,bar,baz 10
  66. celeryd -n foo.myhost -c 10
  67. celeryd -n bar.myhost -c 10
  68. celeryd -n baz.myhost -c 10
  69. celeryd -n xuzzy.myhost -c 3
  70. """
  71. from __future__ import absolute_import
  72. import errno
  73. import os
  74. import signal
  75. import socket
  76. import sys
  77. from collections import defaultdict
  78. from subprocess import Popen
  79. from time import sleep
  80. from kombu.utils.encoding import from_utf8
  81. from celery import __version__
  82. from celery.platforms import PIDFile, shellsplit
  83. from celery.utils import term
  84. from celery.utils.text import pluralize
  85. SIGNAMES = set(sig for sig in dir(signal)
  86. if sig.startswith("SIG") and "_" not in sig)
  87. SIGMAP = dict((getattr(signal, name), name) for name in SIGNAMES)
  88. USAGE = """\
  89. usage: %(prog_name)s start <node1 node2 nodeN|range> [celeryd options]
  90. %(prog_name)s stop <n1 n2 nN|range> [-SIG (default: -TERM)]
  91. %(prog_name)s restart <n1 n2 nN|range> [-SIG] [celeryd options]
  92. %(prog_name)s kill <n1 n2 nN|range>
  93. %(prog_name)s show <n1 n2 nN|range> [celeryd options]
  94. %(prog_name)s get hostname <n1 n2 nN|range> [-qv] [celeryd options]
  95. %(prog_name)s names <n1 n2 nN|range>
  96. %(prog_name)s expand template <n1 n2 nN|range>
  97. %(prog_name)s help
  98. additional options (must appear after command name):
  99. * --nosplash: Don't display program info.
  100. * --quiet: Don't show as much output.
  101. * --verbose: Show more output.
  102. * --no-color: Don't display colors.
  103. """
  104. def main():
  105. sys.exit(MultiTool().execute_from_commandline(sys.argv))
  106. class MultiTool(object):
  107. retcode = 0 # Final exit code.
  108. def __init__(self, env=None, fh=None):
  109. self.fh = fh or sys.stderr
  110. self.env = env
  111. self.commands = {"start": self.start,
  112. "show": self.show,
  113. "stop": self.stop,
  114. "stop_verify": self.stop_verify,
  115. "restart": self.restart,
  116. "kill": self.kill,
  117. "names": self.names,
  118. "expand": self.expand,
  119. "get": self.get,
  120. "help": self.help}
  121. def execute_from_commandline(self, argv, cmd="celeryd"):
  122. argv = list(argv) # don't modify callers argv.
  123. # Reserve the --nosplash|--quiet|-q/--verbose options.
  124. self.nosplash = False
  125. self.quiet = False
  126. self.verbose = False
  127. self.no_color = False
  128. if "--nosplash" in argv:
  129. self.nosplash = argv.pop(argv.index("--nosplash"))
  130. if "--quiet" in argv:
  131. self.quiet = argv.pop(argv.index("--quiet"))
  132. if "-q" in argv:
  133. self.quiet = argv.pop(argv.index("-q"))
  134. if "--verbose" in argv:
  135. self.verbose = argv.pop(argv.index("--verbose"))
  136. if "--no-color" in argv:
  137. self.no_color = argv.pop(argv.index("--no-color"))
  138. self.colored = term.colored(enabled=not self.no_color)
  139. self.OK = str(self.colored.green("OK"))
  140. self.FAILED = str(self.colored.red("FAILED"))
  141. self.DOWN = str(self.colored.magenta("DOWN"))
  142. self.prog_name = os.path.basename(argv.pop(0))
  143. if len(argv) == 0 or argv[0][0] == "-":
  144. return self.error()
  145. try:
  146. self.commands[argv[0]](argv[1:], cmd)
  147. except KeyError:
  148. self.error("Invalid command: %s" % argv[0])
  149. return self.retcode
  150. def say(self, m, newline=True):
  151. self.fh.write("%s\n" % m if m else m)
  152. def names(self, argv, cmd):
  153. p = NamespacedOptionParser(argv)
  154. self.say("\n".join(hostname
  155. for hostname, _, _ in multi_args(p, cmd)))
  156. def get(self, argv, cmd):
  157. wanted = argv[0]
  158. p = NamespacedOptionParser(argv[1:])
  159. for name, worker, _ in multi_args(p, cmd):
  160. if name == wanted:
  161. self.say(" ".join(worker))
  162. return
  163. def show(self, argv, cmd):
  164. p = NamespacedOptionParser(argv)
  165. self.note("> Starting nodes...")
  166. self.say("\n".join(" ".join(worker)
  167. for _, worker, _ in multi_args(p, cmd)))
  168. def start(self, argv, cmd):
  169. self.splash()
  170. p = NamespacedOptionParser(argv)
  171. self.with_detacher_default_options(p)
  172. retcodes = []
  173. self.note("> Starting nodes...")
  174. for nodename, argv, _ in multi_args(p, cmd):
  175. self.note("\t> %s: " % (nodename, ), newline=False)
  176. retcode = self.waitexec(argv)
  177. self.note(retcode and self.FAILED or self.OK)
  178. retcodes.append(retcode)
  179. self.retcode = int(any(retcodes))
  180. def with_detacher_default_options(self, p):
  181. p.options.setdefault("--pidfile", "celeryd@%n.pid")
  182. p.options.setdefault("--logfile", "celeryd@%n.log")
  183. p.options.setdefault("--cmd", "-m celery.bin.celeryd_detach")
  184. def signal_node(self, nodename, pid, sig):
  185. try:
  186. os.kill(pid, sig)
  187. except OSError, exc:
  188. if exc.errno != errno.ESRCH:
  189. raise
  190. self.note("Could not signal %s (%s): No such process" % (
  191. nodename, pid))
  192. return False
  193. return True
  194. def node_alive(self, pid):
  195. try:
  196. os.kill(pid, 0)
  197. except OSError, exc:
  198. if exc.errno == errno.ESRCH:
  199. return False
  200. raise
  201. return True
  202. def shutdown_nodes(self, nodes, sig=signal.SIGTERM, retry=None,
  203. callback=None):
  204. if not nodes:
  205. return
  206. P = set(nodes)
  207. def on_down(node):
  208. P.discard(node)
  209. if callback:
  210. callback(*node)
  211. self.note(self.colored.blue("> Stopping nodes..."))
  212. for node in list(P):
  213. if node in P:
  214. nodename, _, pid = node
  215. self.note("\t> %s: %s -> %s" % (nodename,
  216. SIGMAP[sig][3:],
  217. pid))
  218. if not self.signal_node(nodename, pid, sig):
  219. on_down(node)
  220. def note_waiting():
  221. left = len(P)
  222. if left:
  223. self.note(self.colored.blue("> Waiting for %s %s..." % (
  224. left, pluralize(left, "node"))), newline=False)
  225. if retry:
  226. note_waiting()
  227. its = 0
  228. while P:
  229. for node in P:
  230. its += 1
  231. self.note(".", newline=False)
  232. nodename, _, pid = node
  233. if not self.node_alive(pid):
  234. self.note("\n\t> %s: %s" % (nodename, self.OK))
  235. on_down(node)
  236. note_waiting()
  237. break
  238. if P and not its % len(P):
  239. sleep(float(retry))
  240. self.note("")
  241. def getpids(self, p, cmd, callback=None):
  242. pidfile_template = p.options.setdefault("--pidfile", "celeryd@%n.pid")
  243. nodes = []
  244. for nodename, argv, expander in multi_args(p, cmd):
  245. pid = None
  246. pidfile = expander(pidfile_template)
  247. try:
  248. pid = PIDFile(pidfile).read_pid()
  249. except ValueError:
  250. pass
  251. if pid:
  252. nodes.append((nodename, tuple(argv), pid))
  253. else:
  254. self.note("> %s: %s" % (nodename, self.DOWN))
  255. if callback:
  256. callback(nodename, argv, pid)
  257. return nodes
  258. def kill(self, argv, cmd):
  259. self.splash()
  260. p = NamespacedOptionParser(argv)
  261. for nodename, _, pid in self.getpids(p, cmd):
  262. self.note("Killing node %s (%s)" % (nodename, pid))
  263. self.signal_node(nodename, pid, signal.SIGKILL)
  264. def stop(self, argv, cmd):
  265. self.splash()
  266. p = NamespacedOptionParser(argv)
  267. return self._stop_nodes(p, cmd)
  268. def _stop_nodes(self, p, cmd, retry=None, callback=None):
  269. restargs = p.args[len(p.values):]
  270. self.shutdown_nodes(self.getpids(p, cmd, callback=callback),
  271. sig=findsig(restargs),
  272. retry=retry,
  273. callback=callback)
  274. def restart(self, argv, cmd):
  275. self.splash()
  276. p = NamespacedOptionParser(argv)
  277. self.with_detacher_default_options(p)
  278. retvals = []
  279. def on_node_shutdown(nodename, argv, pid):
  280. self.note(self.colored.blue(
  281. "> Restarting node %s: " % nodename), newline=False)
  282. retval = self.waitexec(argv)
  283. self.note(retval and self.FAILED or self.OK)
  284. retvals.append(retval)
  285. self._stop_nodes(p, cmd, retry=2, callback=on_node_shutdown)
  286. self.retval = int(any(retvals))
  287. def stop_verify(self, argv, cmd):
  288. self.splash()
  289. p = NamespacedOptionParser(argv)
  290. self.with_detacher_default_options(p)
  291. return self._stop_nodes(p, cmd, retry=2)
  292. def expand(self, argv, cmd=None):
  293. template = argv[0]
  294. p = NamespacedOptionParser(argv[1:])
  295. for _, _, expander in multi_args(p, cmd):
  296. self.say(expander(template))
  297. def help(self, argv, cmd=None):
  298. self.say(__doc__)
  299. def usage(self):
  300. self.splash()
  301. self.say(USAGE % {"prog_name": self.prog_name})
  302. def splash(self):
  303. if not self.nosplash:
  304. c = self.colored
  305. self.note(c.cyan("celeryd-multi v%s" % __version__))
  306. def waitexec(self, argv, path=sys.executable):
  307. args = " ".join([path] + list(argv))
  308. argstr = shellsplit(from_utf8(args))
  309. pipe = Popen(argstr, env=self.env)
  310. self.info(" %s" % " ".join(argstr))
  311. retcode = pipe.wait()
  312. if retcode < 0:
  313. self.note("* Child was terminated by signal %s" % (-retcode, ))
  314. return -retcode
  315. elif retcode > 0:
  316. self.note("* Child terminated with failure code %s" % (retcode, ))
  317. return retcode
  318. def error(self, msg=None):
  319. if msg:
  320. self.say(msg)
  321. self.usage()
  322. self.retcode = 1
  323. return 1
  324. def info(self, msg, newline=True):
  325. if self.verbose:
  326. self.note(msg, newline=newline)
  327. def note(self, msg, newline=True):
  328. if not self.quiet:
  329. self.say(str(msg), newline=newline)
  330. def multi_args(p, cmd="celeryd", append="", prefix="", suffix=""):
  331. names = p.values
  332. options = dict(p.options)
  333. passthrough = p.passthrough
  334. ranges = len(names) == 1
  335. if ranges:
  336. try:
  337. noderange = int(names[0])
  338. except ValueError:
  339. pass
  340. else:
  341. names = map(str, range(1, noderange + 1))
  342. prefix = "celery"
  343. cmd = options.pop("--cmd", cmd)
  344. append = options.pop("--append", append)
  345. hostname = options.pop("--hostname",
  346. options.pop("-n", socket.gethostname()))
  347. prefix = options.pop("--prefix", prefix) or ""
  348. suffix = options.pop("--suffix", suffix) or "." + hostname
  349. if suffix in ('""', "''"):
  350. suffix = ""
  351. for ns_name, ns_opts in p.namespaces.items():
  352. if "," in ns_name or (ranges and "-" in ns_name):
  353. for subns in parse_ns_range(ns_name, ranges):
  354. p.namespaces[subns].update(ns_opts)
  355. p.namespaces.pop(ns_name)
  356. for name in names:
  357. this_name = options["-n"] = prefix + name + suffix
  358. expand = abbreviations({"%h": this_name,
  359. "%n": name})
  360. argv = ([expand(cmd)] +
  361. [format_opt(opt, expand(value))
  362. for opt, value in p.optmerge(name, options).items()] +
  363. [passthrough])
  364. if append:
  365. argv.append(expand(append))
  366. yield this_name, argv, expand
  367. class NamespacedOptionParser(object):
  368. def __init__(self, args):
  369. self.args = args
  370. self.options = {}
  371. self.values = []
  372. self.passthrough = ""
  373. self.namespaces = defaultdict(lambda: {})
  374. self.parse()
  375. def parse(self):
  376. rargs = list(self.args)
  377. pos = 0
  378. while pos < len(rargs):
  379. arg = rargs[pos]
  380. if arg == "--":
  381. self.passthrough = " ".join(rargs[pos:])
  382. break
  383. elif arg[0] == "-":
  384. if arg[1] == "-":
  385. self.process_long_opt(arg[2:])
  386. else:
  387. value = None
  388. if len(rargs) > pos + 1 and rargs[pos + 1][0] != '-':
  389. value = rargs[pos + 1]
  390. pos += 1
  391. self.process_short_opt(arg[1:], value)
  392. else:
  393. self.values.append(arg)
  394. pos += 1
  395. def process_long_opt(self, arg, value=None):
  396. if "=" in arg:
  397. arg, value = arg.split("=", 1)
  398. self.add_option(arg, value, short=False)
  399. def process_short_opt(self, arg, value=None):
  400. self.add_option(arg, value, short=True)
  401. def optmerge(self, ns, defaults=None):
  402. if defaults is None:
  403. defaults = self.options
  404. return dict(defaults, **self.namespaces[ns])
  405. def add_option(self, name, value, short=False, ns=None):
  406. prefix = short and "-" or "--"
  407. dest = self.options
  408. if ":" in name:
  409. name, ns = name.split(":")
  410. dest = self.namespaces[ns]
  411. dest[prefix + name] = value
  412. def quote(v):
  413. return "\\'".join("'" + p + "'" for p in v.split("'"))
  414. def format_opt(opt, value):
  415. if not value:
  416. return opt
  417. if opt[0:2] == "--":
  418. return "%s=%s" % (opt, value)
  419. return "%s %s" % (opt, value)
  420. def parse_ns_range(ns, ranges=False):
  421. ret = []
  422. for space in "," in ns and ns.split(",") or [ns]:
  423. if ranges and "-" in space:
  424. start, stop = space.split("-")
  425. x = map(str, range(int(start), int(stop) + 1))
  426. ret.extend(x)
  427. else:
  428. ret.append(space)
  429. return ret
  430. def abbreviations(map):
  431. def expand(S):
  432. ret = S
  433. if S is not None:
  434. for short, long in map.items():
  435. ret = ret.replace(short, long)
  436. return ret
  437. return expand
  438. def findsig(args, default=signal.SIGTERM):
  439. for arg in reversed(args):
  440. if len(arg) == 2 and arg[0] == "-" and arg[1].isdigit():
  441. try:
  442. return int(arg[1])
  443. except ValueError:
  444. pass
  445. if arg[0] == "-":
  446. maybe_sig = "SIG" + arg[1:]
  447. if maybe_sig in SIGNAMES:
  448. return getattr(signal, maybe_sig)
  449. return default
if __name__ == "__main__":  # pragma: no cover
    # Allow running the module directly as a script; main() exits the
    # process with the command's status code.
    main()