celeryd_multi.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495
  1. """
  2. Some examples:
  3. # Advanced example starting 10 workers in the background:
  4. # * Three of the workers process the images and video queues
  5. # * Two of the workers process the data queue with loglevel DEBUG
  6. # * The rest process the default queue.
  7. $ celeryd-multi start 10 -l INFO -Q:1-3 images,video -Q:4,5 data
  8. -Q default -L:4,5 DEBUG
  9. # You can show the commands necessary to start the workers with
  10. # the "show" command:
  11. $ celeryd-multi show 10 -l INFO -Q:1-3 images,video -Q:4,5 data
  12. -Q default -L:4,5 DEBUG
  13. # 3 workers, with 3 processes each
  14. $ celeryd-multi start 3 -c 3
  15. celeryd -n celeryd1.myhost -c 3
  16. celeryd -n celeryd2.myhost -c 3
  17. celeryd -n celeryd3.myhost -c 3
  18. # start 3 named workers
  19. $ celeryd-multi start image video data -c 3
  20. celeryd -n image.myhost -c 3
  21. celeryd -n video.myhost -c 3
  22. celeryd -n data.myhost -c 3
  23. # specify custom hostname
  24. $ celeryd-multi start 2 -n worker.example.com -c 3
  25. celeryd -n celeryd1.worker.example.com -c 3
  26. celeryd -n celeryd2.worker.example.com -c 3
  27. # Additional options are added to each celeryd,
  28. # but you can also modify the options for ranges of workers or for single workers
  29. # 3 workers: Two with 3 processes, and one with 10 processes.
  30. $ celeryd-multi start 3 -c 3 -c:1 10
  31. celeryd -n celeryd1.myhost -c 10
  32. celeryd -n celeryd2.myhost -c 3
  33. celeryd -n celeryd3.myhost -c 3
  34. # can also specify options for named workers
  35. $ celeryd-multi start image video data -c 3 -c:image 10
  36. celeryd -n image.myhost -c 10
  37. celeryd -n video.myhost -c 3
  38. celeryd -n data.myhost -c 3
  39. # ranges and lists of workers in options are also allowed:
  40. # (-c:1-3 can also be written as -c:1,2,3)
  41. $ celeryd-multi start 5 -c 3 -c:1-3 10
  42. celeryd -n celeryd1.myhost -c 10
  43. celeryd -n celeryd2.myhost -c 10
  44. celeryd -n celeryd3.myhost -c 10
  45. celeryd -n celeryd4.myhost -c 3
  46. celeryd -n celeryd5.myhost -c 3
  47. # lists also work with named workers
  48. $ celeryd-multi start foo bar baz xuzzy -c 3 -c:foo,bar,baz 10
  49. celeryd -n foo.myhost -c 10
  50. celeryd -n bar.myhost -c 10
  51. celeryd -n baz.myhost -c 10
  52. celeryd -n xuzzy.myhost -c 3
  53. """
  54. import errno
  55. import os
  56. import shlex
  57. import signal
  58. import socket
  59. import sys
  60. from subprocess import Popen
  61. from time import sleep
  62. from celery import __version__
  63. from celery.utils import term
  64. from celery.utils.compat import defaultdict
  65. SIGNAMES = set(sig for sig in dir(signal)
  66. if sig.startswith("SIG") and "_" not in sig)
  67. SIGMAP = dict((getattr(signal, name), name) for name in SIGNAMES)
# Usage text printed by ``MultiTool.usage``; ``%(prog_name)s`` is replaced
# with the basename of the program as it was invoked.
USAGE = """\
usage: %(prog_name)s start <node1 node2 nodeN|range> [celeryd options]
%(prog_name)s stop <n1 n2 nN|range> [-SIG (default: -TERM)]
%(prog_name)s restart <n1 n2 nN|range> [-SIG] [celeryd options]
%(prog_name)s kill <n1 n2 nN|range>
%(prog_name)s show <n1 n2 nN|range> [celeryd options]
%(prog_name)s get hostname <n1 n2 nN|range> [-qv] [celeryd options]
%(prog_name)s names <n1 n2 nN|range>
%(prog_name)s expand template <n1 n2 nN|range>
%(prog_name)s help
additional options (must appear after command name):
* --quiet: Don't show as much output.
* --verbose: Show more output.
* --no-color: Don't display colors.
"""
  83. def main():
  84. sys.exit(MultiTool().execute_from_commandline(sys.argv))
  85. class MultiTool(object):
  86. retcode = 0 # Final exit code.
  87. def __init__(self):
  88. self.commands = {"start": self.start,
  89. "show": self.show,
  90. "stop": self.stop,
  91. "restart": self.restart,
  92. "kill": self.kill,
  93. "names": self.names,
  94. "expand": self.expand,
  95. "get": self.get,
  96. "help": self.help}
  97. def execute_from_commandline(self, argv, cmd="celeryd"):
  98. argv = list(argv) # don't modify callers argv.
  99. # Reserve the --quiet|-q/--verbose options.
  100. self.quiet = False
  101. self.verbose = False
  102. self.no_color = False
  103. if "--quiet" in argv:
  104. self.quiet = argv.pop(argv.index("--quiet"))
  105. if "-q" in argv:
  106. self.quiet = argv.pop(argv.index("-q"))
  107. if "--verbose" in argv:
  108. self.verbose = argv.pop(argv.index("--verbose"))
  109. if "--no-color" in argv:
  110. self.no_color = argv.pop(argv.index("--no-color"))
  111. self.colored = term.colored(enabled=not self.no_color)
  112. self.OK = str(self.colored.green("OK"))
  113. self.FAILED = str(self.colored.red("FAILED"))
  114. self.DOWN = str(self.colored.magenta("DOWN"))
  115. self.prog_name = os.path.basename(argv.pop(0))
  116. if len(argv) == 0 or argv[0][0] == "-":
  117. return self.error()
  118. try:
  119. self.commands[argv[0]](argv[1:], cmd)
  120. except KeyError:
  121. self.error("Invalid command: %s" % argv[0])
  122. return self.retcode
  123. def names(self, argv, cmd):
  124. p = NamespacedOptionParser(argv)
  125. print("\n".join(hostname
  126. for hostname, _, _ in multi_args(p, cmd)))
  127. def get(self, argv, cmd):
  128. wanted = argv[0]
  129. p = NamespacedOptionParser(argv[1:])
  130. for name, worker, _ in multi_args(p, cmd):
  131. print("NAME: %s WANTED: %s" % (name, wanted))
  132. if name == wanted:
  133. print(" ".join(worker))
  134. return
  135. def show(self, argv, cmd):
  136. p = NamespacedOptionParser(argv)
  137. self.note("> Starting nodes...")
  138. print("\n".join(" ".join(worker)
  139. for _, worker, _ in multi_args(p, cmd)))
  140. def start(self, argv, cmd):
  141. self.splash()
  142. p = NamespacedOptionParser(argv)
  143. self.with_detacher_default_options(p)
  144. retcodes = []
  145. self.note("> Starting nodes...")
  146. for nodename, argv, _ in multi_args(p, cmd):
  147. self.note("\t> %s: " % (nodename, ), newline=False)
  148. retcode = self.waitexec(argv)
  149. self.note(retcode and self.FAILED or self.OK)
  150. retcodes.append(retcode)
  151. self.retcode = int(any(retcodes))
  152. def with_detacher_default_options(self, p):
  153. p.options.setdefault("--pidfile", "celeryd@%n.pid")
  154. p.options.setdefault("--logfile", "celeryd@%n.log")
  155. p.options.setdefault("--cmd", "-m celery.bin.celeryd_detach")
  156. def signal_node(self, nodename, pid, sig):
  157. try:
  158. os.kill(pid, sig)
  159. except OSError, exc:
  160. if exc.errno != errno.ESRCH:
  161. raise
  162. self.note("Could not signal %s (%s): No such process" % (
  163. nodename, pid))
  164. return False
  165. return True
  166. def node_alive(self, pid):
  167. try:
  168. os.kill(pid, 0)
  169. except OSError, exc:
  170. if exc.errno == errno.ESRCH:
  171. return False
  172. raise
  173. return True
  174. def shutdown_nodes(self, nodes, sig=signal.SIGTERM, retry=None,
  175. callback=None):
  176. if not nodes:
  177. return
  178. P = set(nodes)
  179. def on_down(node):
  180. P.discard(node)
  181. if callback:
  182. callback(*node)
  183. self.note(self.colored.blue("> Stopping nodes..."))
  184. for node in P:
  185. nodename, _, pid = node
  186. self.note("\t> %s: %s -> %s" % (nodename, SIGMAP[sig][3:], pid))
  187. if not self.signal_node(nodename, pid, sig):
  188. on_down(node)
  189. def note_waiting():
  190. left = len(P)
  191. if left:
  192. self.note(self.colored.blue("> Waiting for %s %s..." % (
  193. left, left > 1 and "nodes" or "node")), newline=False)
  194. if retry:
  195. note_waiting()
  196. its = 0
  197. while P:
  198. for node in P:
  199. its += 1
  200. self.note(".", newline=False)
  201. nodename, _, pid = node
  202. if not self.node_alive(pid):
  203. self.note("\n\t> %s: %s" % (nodename, self.OK))
  204. on_down(node)
  205. note_waiting()
  206. break
  207. if P and not its % len(P):
  208. sleep(float(retry))
  209. self.note("")
  210. def getpids(self, p, cmd, callback=None):
  211. from celery import platforms
  212. pidfile_template = p.options.setdefault("--pidfile", "celeryd@%n.pid")
  213. nodes = []
  214. for nodename, argv, expander in multi_args(p, cmd):
  215. pidfile = expander(pidfile_template)
  216. try:
  217. pid = platforms.PIDFile(pidfile).read_pid()
  218. except ValueError:
  219. pass
  220. if pid:
  221. nodes.append((nodename, tuple(argv), pid))
  222. else:
  223. self.note("> %s: %s" % (nodename, self.DOWN))
  224. if callback:
  225. callback(nodename, argv, pid)
  226. return nodes
  227. def kill(self, argv, cmd):
  228. self.splash()
  229. p = NamespacedOptionParser(argv)
  230. for nodename, _, pid in self.getpids(p, cmd):
  231. self.note("Killing node %s (%s)" % (nodename, pid))
  232. self.signal_node(nodename, pid, signal.SIGKILL)
  233. def stop(self, argv, cmd):
  234. self.splash()
  235. p = NamespacedOptionParser(argv)
  236. return self._stop_nodes(p, cmd)
  237. def _stop_nodes(self, p, cmd, retry=None, callback=None):
  238. restargs = p.args[len(p.values):]
  239. self.shutdown_nodes(self.getpids(p, cmd, callback=callback),
  240. sig=findsig(restargs),
  241. retry=retry,
  242. callback=callback)
  243. def restart(self, argv, cmd):
  244. self.splash()
  245. p = NamespacedOptionParser(argv)
  246. self.with_detacher_default_options(p)
  247. retvals = []
  248. def on_node_shutdown(nodename, argv, pid):
  249. self.note(self.colored.blue(
  250. "> Restarting node %s: " % nodename), newline=False)
  251. retval = self.waitexec(argv)
  252. self.note(retval and self.FAILED or self.OK)
  253. retvals.append(retval)
  254. self._stop_nodes(p, cmd, retry=2, callback=on_node_shutdown)
  255. self.retval = int(any(retvals))
  256. def expand(self, argv, cmd=None):
  257. template = argv[0]
  258. p = NamespacedOptionParser(argv[1:])
  259. for _, _, expander in multi_args(p, cmd):
  260. print(expander(template))
  261. def help(self, argv, cmd=None):
  262. say(__doc__)
  263. def usage(self):
  264. self.splash()
  265. say(USAGE % {"prog_name": self.prog_name})
  266. def splash(self):
  267. c = self.colored
  268. self.note(c.cyan("celeryd-multi v%s" % __version__))
  269. def waitexec(self, argv, path=sys.executable):
  270. argstr = shlex.split(" ".join([path] + list(argv)))
  271. pipe = Popen(argstr)
  272. self.info(" %s" % " ".join(argstr))
  273. retcode = pipe.wait()
  274. if retcode < 0:
  275. self.note("* Child was terminated by signal %s" % (-retcode, ))
  276. return -retcode
  277. elif retcode > 0:
  278. self.note("* Child terminated with failure code %s" % (retcode, ))
  279. return retcode
  280. def error(self, msg=None):
  281. if msg:
  282. say(msg)
  283. self.usage()
  284. self.retcode = 1
  285. return 1
  286. def info(self, msg, newline=True):
  287. if self.verbose:
  288. self.note(msg, newline=newline)
  289. def note(self, msg, newline=True):
  290. if not self.quiet:
  291. say(str(msg), newline=newline)
  292. def multi_args(p, cmd="celeryd", append="", prefix="", suffix=""):
  293. names = p.values
  294. options = dict(p.options)
  295. ranges = len(names) == 1
  296. if ranges:
  297. try:
  298. noderange = int(names[0])
  299. except ValueError:
  300. pass
  301. else:
  302. names = map(str, range(1, int(names[0]) + 1))
  303. prefix = "celery"
  304. cmd = options.pop("--cmd", cmd)
  305. append = options.pop("--append", append)
  306. hostname = options.pop("--hostname",
  307. options.pop("-n", socket.gethostname()))
  308. prefix = options.pop("--prefix", prefix) or ""
  309. suffix = options.pop("--suffix", suffix) or "." + hostname
  310. for ns_name, ns_opts in p.namespaces.items():
  311. if "," in ns_name or (ranges and "-" in ns_name):
  312. for subns in parse_ns_range(ns_name, ranges):
  313. p.namespaces[subns].update(ns_opts)
  314. p.namespaces.pop(ns_name)
  315. for name in names:
  316. this_name = options["-n"] = prefix + name + suffix
  317. expand = abbreviations({"%h": this_name,
  318. "%n": name})
  319. argv = ([expand(cmd)] +
  320. [format_opt(opt, expand(value))
  321. for opt, value in p.optmerge(name, options).items()])
  322. if append:
  323. argv.append(expand(append))
  324. yield this_name, argv, expand
  325. class NamespacedOptionParser(object):
  326. def __init__(self, args):
  327. self.args = args
  328. self.options = {}
  329. self.values = []
  330. self.namespaces = defaultdict(lambda: {})
  331. self.parse()
  332. def parse(self):
  333. rargs = list(self.args)
  334. pos = 0
  335. while pos < len(rargs):
  336. arg = rargs[pos]
  337. if arg[0] == "-":
  338. if arg[1] == "-":
  339. self.process_long_opt(arg[2:])
  340. else:
  341. value = None
  342. if len(rargs) > pos + 1 and rargs[pos + 1][0] != '-':
  343. value = rargs[pos + 1]
  344. pos += 1
  345. self.process_short_opt(arg[1:], value)
  346. else:
  347. self.values.append(arg)
  348. pos += 1
  349. def process_long_opt(self, arg, value=None):
  350. if "=" in arg:
  351. arg, value = arg.split("=", 1)
  352. self.add_option(arg, value, short=False)
  353. def process_short_opt(self, arg, value=None):
  354. self.add_option(arg, value, short=True)
  355. def optmerge(self, ns, defaults=None):
  356. if defaults is None:
  357. defaults = self.options
  358. return dict(defaults, **self.namespaces[ns])
  359. def add_option(self, name, value, short=False, ns=None):
  360. prefix = short and "-" or "--"
  361. dest = self.options
  362. if ":" in name:
  363. name, ns = name.split(":")
  364. dest = self.namespaces[ns]
  365. dest[prefix + name] = value
  366. def quote(v):
  367. return "\\'".join("'" + p + "'" for p in v.split("'"))
  368. def format_opt(opt, value):
  369. if not value:
  370. return opt
  371. if opt[0:2] == "--":
  372. return "%s=%s" % (opt, value)
  373. return "%s %s" % (opt, value)
  374. def parse_ns_range(ns, ranges=False):
  375. ret = []
  376. for space in "," in ns and ns.split(",") or [ns]:
  377. if ranges and "-" in space:
  378. start, stop = space.split("-")
  379. x = map(str, range(int(start), int(stop) + 1))
  380. ret.extend(x)
  381. else:
  382. ret.append(space)
  383. return ret
  384. def abbreviations(map):
  385. def expand(S):
  386. ret = S
  387. if S is not None:
  388. for short, long in map.items():
  389. ret = ret.replace(short, long)
  390. return ret
  391. return expand
  392. def say(m, newline=True):
  393. sys.stderr.write(newline and "%s\n" % (m, ) or m)
  394. def findsig(args, default=signal.SIGTERM):
  395. for arg in reversed(args):
  396. if len(arg) == 2 and arg[0] == "-" and arg[1].isdigit():
  397. try:
  398. return int(arg[1])
  399. except ValueError:
  400. pass
  401. if arg[0] == "-":
  402. maybe_sig = "SIG" + arg[1:]
  403. if maybe_sig in SIGNAMES:
  404. return getattr(signal, maybe_sig)
  405. return default
# Run the command-line tool when this file is executed as a script.
if __name__ == "__main__":      # pragma: no cover
    main()