celeryd_multi.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519
  1. """
  2. Examples
  3. ========
  4. ::
  5. # Single worker with explicit name and events enabled.
  6. $ celeryd-multi start Leslie -E
  7. # Pidfiles and logfiles are stored in the current directory
  8. # by default. Use --pidfile and --logfile argument to change
  9. # this. The abbreviation %n will be expanded to the current
  10. # node name.
  11. $ celeryd-multi start Leslie -E --pidfile=/var/run/celery/%n.pid
  12. --logfile=/var/log/celery/%n.log
  13. # You need to add the same arguments when you restart,
  14. # as these are not persisted anywhere.
  15. $ celeryd-multi restart Leslie -E --pidfile=/var/run/celery/%n.pid
  16. --logfile=/var/run/celery/%n.log
  17. # To stop the node, you need to specify the same pidfile.
  18. $ celeryd-multi stop Leslie --pidfile=/var/run/celery/%n.pid
  19. # 3 workers, with 3 processes each
  20. $ celeryd-multi start 3 -c 3
  21. celeryd -n celeryd1.myhost -c 3
  22. celeryd -n celeryd2.myhost -c 3
  23. celeryd- n celeryd3.myhost -c 3
  24. # start 3 named workers
  25. $ celeryd-multi start image video data -c 3
  26. celeryd -n image.myhost -c 3
  27. celeryd -n video.myhost -c 3
  28. celeryd -n data.myhost -c 3
  29. # specify custom hostname
  30. $ celeryd-multi start 2 -n worker.example.com -c 3
  31. celeryd -n celeryd1.worker.example.com -c 3
  32. celeryd -n celeryd2.worker.example.com -c 3
  33. # Advanced example starting 10 workers in the background:
  34. # * Three of the workers processes the images and video queue
  35. # * Two of the workers processes the data queue with loglevel DEBUG
  36. # * the rest processes the default' queue.
  37. $ celeryd-multi start 10 -l INFO -Q:1-3 images,video -Q:4,5:data
  38. -Q default -L:4,5 DEBUG
  39. # You can show the commands necessary to start the workers with
  40. # the "show" command:
  41. $ celeryd-multi show 10 -l INFO -Q:1-3 images,video -Q:4,5:data
  42. -Q default -L:4,5 DEBUG
  43. # Additional options are added to each celeryd',
  44. # but you can also modify the options for ranges of, or specific workers
  45. # 3 workers: Two with 3 processes, and one with 10 processes.
  46. $ celeryd-multi start 3 -c 3 -c:1 10
  47. celeryd -n celeryd1.myhost -c 10
  48. celeryd -n celeryd2.myhost -c 3
  49. celeryd -n celeryd3.myhost -c 3
  50. # can also specify options for named workers
  51. $ celeryd-multi start image video data -c 3 -c:image 10
  52. celeryd -n image.myhost -c 10
  53. celeryd -n video.myhost -c 3
  54. celeryd -n data.myhost -c 3
  55. # ranges and lists of workers in options is also allowed:
  56. # (-c:1-3 can also be written as -c:1,2,3)
  57. $ celeryd-multi start 5 -c 3 -c:1-3 10
  58. celeryd -n celeryd1.myhost -c 10
  59. celeryd -n celeryd2.myhost -c 10
  60. celeryd -n celeryd3.myhost -c 10
  61. celeryd -n celeryd4.myhost -c 3
  62. celeryd -n celeryd5.myhost -c 3
  63. # lists also works with named workers
  64. $ celeryd-multi start foo bar baz xuzzy -c 3 -c:foo,bar,baz 10
  65. celeryd -n foo.myhost -c 10
  66. celeryd -n bar.myhost -c 10
  67. celeryd -n baz.myhost -c 10
  68. celeryd -n xuzzy.myhost -c 3
  69. """
  70. import errno
  71. import os
  72. import shlex
  73. import signal
  74. import socket
  75. import sys
  76. from subprocess import Popen
  77. from time import sleep
  78. from celery import __version__
  79. from celery.utils import term
  80. from celery.utils.compat import any, defaultdict
  81. SIGNAMES = set(sig for sig in dir(signal)
  82. if sig.startswith("SIG") and "_" not in sig)
  83. SIGMAP = dict((getattr(signal, name), name) for name in SIGNAMES)
  84. USAGE = """\
  85. usage: %(prog_name)s start <node1 node2 nodeN|range> [celeryd options]
  86. %(prog_name)s stop <n1 n2 nN|range> [-SIG (default: -TERM)]
  87. %(prog_name)s restart <n1 n2 nN|range> [-SIG] [celeryd options]
  88. %(prog_name)s kill <n1 n2 nN|range>
  89. %(prog_name)s show <n1 n2 nN|range> [celeryd options]
  90. %(prog_name)s get hostname <n1 n2 nN|range> [-qv] [celeryd options]
  91. %(prog_name)s names <n1 n2 nN|range>
  92. %(prog_name)s expand template <n1 n2 nN|range>
  93. %(prog_name)s help
  94. additional options (must appear after command name):
  95. * --quiet: Don't show as much output.
  96. * --verbose: Show more output.
  97. * --no-color: Don't display colors.
  98. """
  99. def main():
  100. sys.exit(MultiTool().execute_from_commandline(sys.argv))
  101. class MultiTool(object):
  102. retcode = 0 # Final exit code.
  103. def __init__(self):
  104. self.commands = {"start": self.start,
  105. "show": self.show,
  106. "stop": self.stop,
  107. "restart": self.restart,
  108. "kill": self.kill,
  109. "names": self.names,
  110. "expand": self.expand,
  111. "get": self.get,
  112. "help": self.help}
  113. def execute_from_commandline(self, argv, cmd="celeryd"):
  114. argv = list(argv) # don't modify callers argv.
  115. # Reserve the --quiet|-q/--verbose options.
  116. self.quiet = False
  117. self.verbose = False
  118. self.no_color = False
  119. if "--quiet" in argv:
  120. self.quiet = argv.pop(argv.index("--quiet"))
  121. if "-q" in argv:
  122. self.quiet = argv.pop(argv.index("-q"))
  123. if "--verbose" in argv:
  124. self.verbose = argv.pop(argv.index("--verbose"))
  125. if "--no-color" in argv:
  126. self.no_color = argv.pop(argv.index("--no-color"))
  127. self.colored = term.colored(enabled=not self.no_color)
  128. self.OK = str(self.colored.green("OK"))
  129. self.FAILED = str(self.colored.red("FAILED"))
  130. self.DOWN = str(self.colored.magenta("DOWN"))
  131. self.prog_name = os.path.basename(argv.pop(0))
  132. if len(argv) == 0 or argv[0][0] == "-":
  133. return self.error()
  134. try:
  135. self.commands[argv[0]](argv[1:], cmd)
  136. except KeyError:
  137. self.error("Invalid command: %s" % argv[0])
  138. return self.retcode
  139. def names(self, argv, cmd):
  140. p = NamespacedOptionParser(argv)
  141. print("\n".join(hostname
  142. for hostname, _, _ in multi_args(p, cmd)))
  143. def get(self, argv, cmd):
  144. wanted = argv[0]
  145. p = NamespacedOptionParser(argv[1:])
  146. for name, worker, _ in multi_args(p, cmd):
  147. if name == wanted:
  148. print(" ".join(worker))
  149. return
  150. def show(self, argv, cmd):
  151. p = NamespacedOptionParser(argv)
  152. self.note("> Starting nodes...")
  153. print("\n".join(" ".join(worker)
  154. for _, worker, _ in multi_args(p, cmd)))
  155. def start(self, argv, cmd):
  156. self.splash()
  157. p = NamespacedOptionParser(argv)
  158. self.with_detacher_default_options(p)
  159. retcodes = []
  160. self.note("> Starting nodes...")
  161. for nodename, argv, _ in multi_args(p, cmd):
  162. self.note("\t> %s: " % (nodename, ), newline=False)
  163. retcode = self.waitexec(argv)
  164. self.note(retcode and self.FAILED or self.OK)
  165. retcodes.append(retcode)
  166. self.retcode = int(any(retcodes))
  167. def with_detacher_default_options(self, p):
  168. p.options.setdefault("--pidfile", "celeryd@%n.pid")
  169. p.options.setdefault("--logfile", "celeryd@%n.log")
  170. p.options.setdefault("--cmd", "-m celery.bin.celeryd_detach")
  171. def signal_node(self, nodename, pid, sig):
  172. try:
  173. os.kill(pid, sig)
  174. except OSError, exc:
  175. if exc.errno != errno.ESRCH:
  176. raise
  177. self.note("Could not signal %s (%s): No such process" % (
  178. nodename, pid))
  179. return False
  180. return True
  181. def node_alive(self, pid):
  182. try:
  183. os.kill(pid, 0)
  184. except OSError, exc:
  185. if exc.errno == errno.ESRCH:
  186. return False
  187. raise
  188. return True
  189. def shutdown_nodes(self, nodes, sig=signal.SIGTERM, retry=None,
  190. callback=None):
  191. if not nodes:
  192. return
  193. P = set(nodes)
  194. def on_down(node):
  195. P.discard(node)
  196. if callback:
  197. callback(*node)
  198. self.note(self.colored.blue("> Stopping nodes..."))
  199. for node in list(P):
  200. if node in P:
  201. nodename, _, pid = node
  202. self.note("\t> %s: %s -> %s" % (nodename,
  203. SIGMAP[sig][3:],
  204. pid))
  205. if not self.signal_node(nodename, pid, sig):
  206. on_down(node)
  207. def note_waiting():
  208. left = len(P)
  209. if left:
  210. self.note(self.colored.blue("> Waiting for %s %s..." % (
  211. left, left > 1 and "nodes" or "node")), newline=False)
  212. if retry:
  213. note_waiting()
  214. its = 0
  215. while P:
  216. for node in P:
  217. its += 1
  218. self.note(".", newline=False)
  219. nodename, _, pid = node
  220. if not self.node_alive(pid):
  221. self.note("\n\t> %s: %s" % (nodename, self.OK))
  222. on_down(node)
  223. note_waiting()
  224. break
  225. if P and not its % len(P):
  226. sleep(float(retry))
  227. self.note("")
  228. def getpids(self, p, cmd, callback=None):
  229. from celery import platforms
  230. pidfile_template = p.options.setdefault("--pidfile", "celeryd@%n.pid")
  231. nodes = []
  232. for nodename, argv, expander in multi_args(p, cmd):
  233. pidfile = expander(pidfile_template)
  234. try:
  235. pid = platforms.PIDFile(pidfile).read_pid()
  236. except ValueError:
  237. pass
  238. if pid:
  239. nodes.append((nodename, tuple(argv), pid))
  240. else:
  241. self.note("> %s: %s" % (nodename, self.DOWN))
  242. if callback:
  243. callback(nodename, argv, pid)
  244. return nodes
  245. def kill(self, argv, cmd):
  246. self.splash()
  247. p = NamespacedOptionParser(argv)
  248. for nodename, _, pid in self.getpids(p, cmd):
  249. self.note("Killing node %s (%s)" % (nodename, pid))
  250. self.signal_node(nodename, pid, signal.SIGKILL)
  251. def stop(self, argv, cmd):
  252. self.splash()
  253. p = NamespacedOptionParser(argv)
  254. return self._stop_nodes(p, cmd)
  255. def _stop_nodes(self, p, cmd, retry=None, callback=None):
  256. restargs = p.args[len(p.values):]
  257. self.shutdown_nodes(self.getpids(p, cmd, callback=callback),
  258. sig=findsig(restargs),
  259. retry=retry,
  260. callback=callback)
  261. def restart(self, argv, cmd):
  262. self.splash()
  263. p = NamespacedOptionParser(argv)
  264. self.with_detacher_default_options(p)
  265. retvals = []
  266. def on_node_shutdown(nodename, argv, pid):
  267. self.note(self.colored.blue(
  268. "> Restarting node %s: " % nodename), newline=False)
  269. retval = self.waitexec(argv)
  270. self.note(retval and self.FAILED or self.OK)
  271. retvals.append(retval)
  272. self._stop_nodes(p, cmd, retry=2, callback=on_node_shutdown)
  273. self.retval = int(any(retvals))
  274. def expand(self, argv, cmd=None):
  275. template = argv[0]
  276. p = NamespacedOptionParser(argv[1:])
  277. for _, _, expander in multi_args(p, cmd):
  278. print(expander(template))
  279. def help(self, argv, cmd=None):
  280. say(__doc__)
  281. def usage(self):
  282. self.splash()
  283. say(USAGE % {"prog_name": self.prog_name})
  284. def splash(self):
  285. c = self.colored
  286. self.note(c.cyan("celeryd-multi v%s" % __version__))
  287. def waitexec(self, argv, path=sys.executable):
  288. argstr = shlex.split(" ".join([path] + list(argv)))
  289. pipe = Popen(argstr)
  290. self.info(" %s" % " ".join(argstr))
  291. retcode = pipe.wait()
  292. if retcode < 0:
  293. self.note("* Child was terminated by signal %s" % (-retcode, ))
  294. return -retcode
  295. elif retcode > 0:
  296. self.note("* Child terminated with failure code %s" % (retcode, ))
  297. return retcode
  298. def error(self, msg=None):
  299. if msg:
  300. say(msg)
  301. self.usage()
  302. self.retcode = 1
  303. return 1
  304. def info(self, msg, newline=True):
  305. if self.verbose:
  306. self.note(msg, newline=newline)
  307. def note(self, msg, newline=True):
  308. if not self.quiet:
  309. say(str(msg), newline=newline)
  310. def multi_args(p, cmd="celeryd", append="", prefix="", suffix=""):
  311. names = p.values
  312. options = dict(p.options)
  313. ranges = len(names) == 1
  314. if ranges:
  315. try:
  316. noderange = int(names[0])
  317. except ValueError:
  318. pass
  319. else:
  320. names = map(str, range(1, noderange + 1))
  321. prefix = "celery"
  322. cmd = options.pop("--cmd", cmd)
  323. append = options.pop("--append", append)
  324. hostname = options.pop("--hostname",
  325. options.pop("-n", socket.gethostname()))
  326. prefix = options.pop("--prefix", prefix) or ""
  327. suffix = options.pop("--suffix", suffix) or "." + hostname
  328. for ns_name, ns_opts in p.namespaces.items():
  329. if "," in ns_name or (ranges and "-" in ns_name):
  330. for subns in parse_ns_range(ns_name, ranges):
  331. p.namespaces[subns].update(ns_opts)
  332. p.namespaces.pop(ns_name)
  333. for name in names:
  334. this_name = options["-n"] = prefix + name + suffix
  335. expand = abbreviations({"%h": this_name,
  336. "%n": name})
  337. argv = ([expand(cmd)] +
  338. [format_opt(opt, expand(value))
  339. for opt, value in p.optmerge(name, options).items()])
  340. if append:
  341. argv.append(expand(append))
  342. yield this_name, argv, expand
  343. class NamespacedOptionParser(object):
  344. def __init__(self, args):
  345. self.args = args
  346. self.options = {}
  347. self.values = []
  348. self.namespaces = defaultdict(lambda: {})
  349. self.parse()
  350. def parse(self):
  351. rargs = list(self.args)
  352. pos = 0
  353. while pos < len(rargs):
  354. arg = rargs[pos]
  355. if arg[0] == "-":
  356. if arg[1] == "-":
  357. self.process_long_opt(arg[2:])
  358. else:
  359. value = None
  360. if len(rargs) > pos + 1 and rargs[pos + 1][0] != '-':
  361. value = rargs[pos + 1]
  362. pos += 1
  363. self.process_short_opt(arg[1:], value)
  364. else:
  365. self.values.append(arg)
  366. pos += 1
  367. def process_long_opt(self, arg, value=None):
  368. if "=" in arg:
  369. arg, value = arg.split("=", 1)
  370. self.add_option(arg, value, short=False)
  371. def process_short_opt(self, arg, value=None):
  372. self.add_option(arg, value, short=True)
  373. def optmerge(self, ns, defaults=None):
  374. if defaults is None:
  375. defaults = self.options
  376. return dict(defaults, **self.namespaces[ns])
  377. def add_option(self, name, value, short=False, ns=None):
  378. prefix = short and "-" or "--"
  379. dest = self.options
  380. if ":" in name:
  381. name, ns = name.split(":")
  382. dest = self.namespaces[ns]
  383. dest[prefix + name] = value
  384. def quote(v):
  385. return "\\'".join("'" + p + "'" for p in v.split("'"))
  386. def format_opt(opt, value):
  387. if not value:
  388. return opt
  389. if opt[0:2] == "--":
  390. return "%s=%s" % (opt, value)
  391. return "%s %s" % (opt, value)
  392. def parse_ns_range(ns, ranges=False):
  393. ret = []
  394. for space in "," in ns and ns.split(",") or [ns]:
  395. if ranges and "-" in space:
  396. start, stop = space.split("-")
  397. x = map(str, range(int(start), int(stop) + 1))
  398. ret.extend(x)
  399. else:
  400. ret.append(space)
  401. return ret
  402. def abbreviations(map):
  403. def expand(S):
  404. ret = S
  405. if S is not None:
  406. for short, long in map.items():
  407. ret = ret.replace(short, long)
  408. return ret
  409. return expand
  410. def say(m, newline=True):
  411. sys.stderr.write(newline and "%s\n" % (m, ) or m)
  412. def findsig(args, default=signal.SIGTERM):
  413. for arg in reversed(args):
  414. if len(arg) == 2 and arg[0] == "-" and arg[1].isdigit():
  415. try:
  416. return int(arg[1])
  417. except ValueError:
  418. pass
  419. if arg[0] == "-":
  420. maybe_sig = "SIG" + arg[1:]
  421. if maybe_sig in SIGNAMES:
  422. return getattr(signal, maybe_sig)
  423. return default
  424. if __name__ == "__main__": # pragma: no cover
  425. main()