celeryd_multi.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. import sys
  2. import socket
  3. from celery.utils.compat import defaultdict
  4. EXAMPLES = """
  5. Some examples:
  6. # Advanced example with 10 workers:
  7. # * Three of the workers processes the images and video queue
  8. # * Two of the workers processes the data queue with loglevel DEBUG
  9. # * the rest processes the default' queue.
  10. $ celeryd-multi start 10 -l INFO -Q:1-3 images,video -Q:4,5:data
  11. -Q default -L:4,5 DEBUG
  12. # get commands to start 10 workers, with 3 processes each
  13. $ celeryd-multi start 3 -c 3
  14. celeryd -n celeryd1.myhost -c 3
  15. celeryd -n celeryd2.myhost -c 3
  16. celeryd- n celeryd3.myhost -c 3
  17. # start 3 named workers
  18. $ celeryd-multi start image video data -c 3
  19. celeryd -n image.myhost -c 3
  20. celeryd -n video.myhost -c 3
  21. celeryd -n data.myhost -c 3
  22. # specify custom hostname
  23. $ celeryd-multi start 2 -n worker.example.com -c 3
  24. celeryd -n celeryd1.worker.example.com -c 3
  25. celeryd -n celeryd2.worker.example.com -c 3
  26. # Additionl options are added to each celeryd',
  27. # but you can also modify the options for ranges of or single workers
  28. # 3 workers: Two with 3 processes, and one with 10 processes.
  29. $ celeryd-multi start 3 -c 3 -c:1 10
  30. celeryd -n celeryd1.myhost -c 10
  31. celeryd -n celeryd2.myhost -c 3
  32. celeryd -n celeryd3.myhost -c 3
  33. # can also specify options for named workers
  34. $ celeryd-multi start image video data -c 3 -c:image 10
  35. celeryd -n image.myhost -c 10
  36. celeryd -n video.myhost -c 3
  37. celeryd -n data.myhost -c 3
  38. # ranges and lists of workers in options is also allowed:
  39. # (-c:1-3 can also be written as -c:1,2,3)
  40. $ celeryd-multi start 5 -c 3 -c:1-3 10
  41. celeryd -n celeryd1.myhost -c 10
  42. celeryd -n celeryd2.myhost -c 10
  43. celeryd -n celeryd3.myhost -c 10
  44. celeryd -n celeryd4.myhost -c 3
  45. celeryd -n celeryd5.myhost -c 3
  46. # lists also works with named workers
  47. $ celeryd-multi start foo bar baz xuzzy -c 3 -c:foo,bar,baz 10
  48. celeryd -n foo.myhost -c 10
  49. celeryd -n bar.myhost -c 10
  50. celeryd -n baz.myhost -c 10
  51. celeryd -n xuzzy.myhost -c 3
  52. """
  53. class NamespacedOptionParser(object):
  54. def __init__(self, args):
  55. self.args = args
  56. self.options = {}
  57. self.values = []
  58. self.namespaces = defaultdict(lambda: {})
  59. self.parse()
  60. def parse(self):
  61. rargs = list(self.args)
  62. pos = 0
  63. while pos < len(rargs):
  64. arg = rargs[pos]
  65. if arg[0] == "-":
  66. if arg[1] == "-":
  67. self.process_long_opt(arg[2:])
  68. else:
  69. value = None
  70. if len(rargs) > pos + 1 and rargs[pos + 1][0] != '-':
  71. value = rargs[pos + 1]
  72. pos += 1
  73. self.process_short_opt(arg[1:], value)
  74. else:
  75. self.values.append(arg)
  76. pos += 1
  77. def process_long_opt(self, arg, value=None):
  78. if "=" in arg:
  79. arg, value = arg.split("=", 1)
  80. self.add_option(arg, value, short=False)
  81. def process_short_opt(self, arg, value=None):
  82. self.add_option(arg, value, short=True)
  83. def optmerge(self, ns, defaults=None):
  84. if defaults is None:
  85. defaults = self.options
  86. return dict(defaults, **self.namespaces[ns])
  87. def add_option(self, name, value, short=False, ns=None):
  88. prefix = short and "-" or "--"
  89. dest = self.options
  90. if ":" in name:
  91. name, ns = name.split(":")
  92. dest = self.namespaces[ns]
  93. dest[prefix + name] = value
  94. def quote(v):
  95. return "\\'".join("'" + p + "'" for p in v.split("'"))
  96. def format_opt(opt, value):
  97. if not value:
  98. return opt
  99. if opt[0:2] == "--":
  100. return "%s=%s" % (opt, value)
  101. return "%s %s" % (opt, value)
  102. def parse_ns_range(ns, ranges=False):
  103. ret = []
  104. for space in "," in ns and ns.split(",") or [ns]:
  105. if ranges and "-" in space:
  106. start, stop = space.split("-")
  107. x = map(str, range(int(start), int(stop) + 1))
  108. ret.extend(x)
  109. else:
  110. ret.append(space)
  111. return ret
  112. def abbreviations(map):
  113. def expand(S):
  114. ret = S
  115. if S is not None:
  116. for short, long in map.items():
  117. ret = ret.replace(short, long)
  118. return ret
  119. return expand
  120. def multi_args(p, cmd="celeryd", append="", prefix="", suffix=""):
  121. names = p.values
  122. options = dict(p.options)
  123. ranges = len(names) == 1
  124. if ranges:
  125. names = map(str, range(1, int(names[0]) + 1))
  126. prefix = "celery"
  127. cmd = options.pop("--cmd", cmd)
  128. append = options.pop("--append", append)
  129. hostname = options.pop("--hostname",
  130. options.pop("-n", socket.gethostname()))
  131. prefix = options.pop("--prefix", prefix) or ""
  132. suffix = options.pop("--suffix", suffix) or "." + hostname
  133. for ns_name, ns_opts in p.namespaces.items():
  134. if "," in ns_name or (ranges and "-" in ns_name):
  135. for subns in parse_ns_range(ns_name, ranges):
  136. p.namespaces[subns].update(ns_opts)
  137. p.namespaces.pop(ns_name)
  138. for name in names:
  139. this_name = options["-n"] = prefix + name + suffix
  140. expand = abbreviations({"%h": this_name,
  141. "%n": name})
  142. line = expand(cmd) + " " + " ".join(
  143. format_opt(opt, expand(value))
  144. for opt, value in p.optmerge(name, options).items())
  145. if append:
  146. line += " %s" % expand(append)
  147. yield this_name, line, expand
  148. def say(m):
  149. sys.stderr.write("%s\n" % (m, ))
  150. class MultiTool(object):
  151. def __init__(self):
  152. self.commands = {"start": self.start,
  153. "names": self.names,
  154. "expand": self.expand,
  155. "get": self.get,
  156. "help": self.help}
  157. def __call__(self, argv, cmd="celeryd"):
  158. if len(argv) == 0:
  159. self.usage()
  160. sys.exit(0)
  161. try:
  162. return self.commands[argv[0]](argv[1:], cmd)
  163. except KeyError:
  164. say("Invalid command: %s" % argv[0])
  165. self.usage()
  166. sys.exit(1)
  167. def names(self, argv, cmd):
  168. p = NamespacedOptionParser(argv)
  169. print("\n".join(hostname
  170. for hostname, _, _ in multi_args(p, cmd)))
  171. def get(self, argv, cmd):
  172. wanted = argv[0]
  173. p = NamespacedOptionParser(argv[1:])
  174. for name, worker, _ in multi_args(p, cmd):
  175. if name == wanted:
  176. print(worker)
  177. return
  178. def start(self, argv, cmd):
  179. p = NamespacedOptionParser(argv)
  180. print("\n".join(worker
  181. for _, worker, _ in multi_args(p, cmd)))
  182. def expand(self, argv, cmd=None):
  183. template = argv[0]
  184. p = NamespacedOptionParser(argv[1:])
  185. for _, _, expander in multi_args(p, cmd):
  186. print(expander(template))
  187. def help(self, argv, cmd=None):
  188. say(EXAMPLES)
  189. def usage(self):
  190. say("Please use one of the following commands: %s" % (
  191. ", ".join(self.commands.keys())))
  192. def main():
  193. MultiTool()(sys.argv[1:])
  194. if __name__ == "__main__": # pragma: no cover
  195. main()