celeryd_multi.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. import sys
  2. import shlex
  3. import socket
  4. from celery.utils.compat import defaultdict
  5. from carrot.utils import rpartition
  6. class OptionParser(object):
  7. def __init__(self, args):
  8. self.args = args
  9. self.options = {}
  10. self.values = []
  11. self.parse()
  12. def parse(self):
  13. rargs = list(self.args)
  14. pos = 0
  15. while pos < len(rargs):
  16. arg = rargs[pos]
  17. if arg[0] == "-":
  18. if arg[1] == "-":
  19. self.process_long_opt(arg[2:])
  20. else:
  21. value = None
  22. if rargs[pos + 1][0] != '-':
  23. value = rargs[pos + 1]
  24. pos += 1
  25. self.process_short_opt(arg[1:], value)
  26. else:
  27. self.values.append(arg)
  28. pos += 1
  29. def process_long_opt(self, arg, value=None):
  30. if "=" in arg:
  31. arg, value = arg.split("=", 1)
  32. self.add_option(arg, value, short=False)
  33. def process_short_opt(self, arg, value=None):
  34. self.add_option(arg, value, short=True)
  35. def set_option(self, arg, value, short=False):
  36. prefix = short and "-" or "--"
  37. self.options[prefix + arg] = value
  38. class NamespacedOptionParser(OptionParser):
  39. def __init__(self, args):
  40. self.namespaces = defaultdict(lambda: {})
  41. super(NamespacedOptionParser, self).__init__(args)
  42. def add_option(self, name, value, short=False, ns=None):
  43. prefix = short and "-" or "--"
  44. dest = self.options
  45. if ":" in name:
  46. name, ns = name.split(":")
  47. dest = self.namespaces[ns]
  48. dest[prefix + name] = value
  49. def optmerge(self, ns, defaults=None):
  50. if defaults is None:
  51. defaults = self.options
  52. return dict(defaults, **self.namespaces[ns])
  53. def quote(v):
  54. return "\\'".join("'" + p + "'" for p in v.split("'"))
  55. def format_opt(opt, value):
  56. if not value:
  57. return opt
  58. if opt[0:2] == "--":
  59. return "%s=%s" % (opt, value)
  60. return "%s %s" % (opt, value)
  61. def parse_ns_range(ns, ranges=False):
  62. ret = []
  63. for space in "," in ns and ns.split(",") or [ns]:
  64. if ranges and "-" in space:
  65. start, stop = space.split("-")
  66. x = map(str, range(int(start), int(stop) + 1))
  67. ret.extend(x)
  68. else:
  69. ret.append(space)
  70. return ret
  71. def abbreviations(map):
  72. def expand(S):
  73. ret = S
  74. for short, long in map.items():
  75. ret = ret.replace(short, long)
  76. return ret
  77. return expand
  78. def multi_args(p, cmd="celeryd", append="", prefix="", suffix=""):
  79. names = p.values
  80. options = dict(p.options)
  81. ranges = len(names) == 1
  82. if ranges:
  83. names = map(str, range(1, int(names[0]) + 1))
  84. prefix = "celery"
  85. cmd = options.pop("--cmd", cmd)
  86. append = options.pop("--append", append)
  87. hostname = options.pop("--hostname",
  88. options.pop("-n", socket.gethostname()))
  89. prefix = options.pop("--prefix", prefix) or ""
  90. suffix = options.pop("--suffix", suffix) or "." + hostname
  91. for ns_name, ns_opts in p.namespaces.items():
  92. if "," in ns_name or (ranges and "-" in ns_name):
  93. for subns in parse_ns_range(ns_name, ranges):
  94. p.namespaces[subns].update(ns_opts)
  95. p.namespaces.pop(ns_name)
  96. for name in names:
  97. this_name = options["-n"] = prefix + name + suffix
  98. expand = abbreviations({"%h": this_name,
  99. "%n": name})
  100. line = expand(cmd) + " " + " ".join(
  101. format_opt(opt, expand(value))
  102. for opt, value in p.optmerge(name, options).items()) + \
  103. " " + expand(append)
  104. yield this_name, line, expand
  105. def names(argv, cmd):
  106. p = NamespacedOptionParser(argv)
  107. print("\n".join(hostname
  108. for hostname, _, _ in multi_args(p, cmd)))
  109. def get(argv, cmd):
  110. wanted = argv[0]
  111. p = NamespacedOptionParser(argv[1:])
  112. for name, worker, _ in multi_args(p, cmd):
  113. if name == wanted:
  114. print(worker)
  115. return
  116. def start(argv, cmd):
  117. p = NamespacedOptionParser(argv)
  118. print("\n".join(worker
  119. for _, worker, _ in multi_args(p, cmd)))
  120. def expand(argv, cmd=None):
  121. template = argv[0]
  122. p = NamespacedOptionParser(argv[1:])
  123. for _, _, expander in multi_args(p, cmd):
  124. print(expander(template))
  125. def help(argv, cmd=None):
  126. print("""Some examples:
  127. # Advanced example with 10 workers:
  128. # * Three of the workers processes the images and video queue
  129. # * Two of the workers processes the data queue with loglevel DEBUG
  130. # * the rest processes the default' queue.
  131. $ celeryd-multi start 10 -l INFO -Q:1-3 images,video -Q:4,5:data
  132. -Q default -L:4,5 DEBUG
  133. # get commands to start 10 workers, with 3 processes each
  134. $ celeryd-multi start 3 -c 3
  135. celeryd -n celeryd1.myhost -c 3
  136. celeryd -n celeryd2.myhost -c 3
  137. celeryd- n celeryd3.myhost -c 3
  138. # start 3 named workers
  139. $ celeryd-multi start image video data -c 3
  140. celeryd -n image.myhost -c 3
  141. celeryd -n video.myhost -c 3
  142. celeryd -n data.myhost -c 3
  143. # specify custom hostname
  144. $ celeryd-multi start 2 -n worker.example.com -c 3
  145. celeryd -n celeryd1.worker.example.com -c 3
  146. celeryd -n celeryd2.worker.example.com -c 3
  147. # Additionl options are added to each celeryd',
  148. # but you can also modify the options for ranges of or single workers
  149. # 3 workers: Two with 3 processes, and one with 10 processes.
  150. $ celeryd-multi start 3 -c 3 -c:1 10
  151. celeryd -n celeryd1.myhost -c 10
  152. celeryd -n celeryd2.myhost -c 3
  153. celeryd -n celeryd3.myhost -c 3
  154. # can also specify options for named workers
  155. $ celeryd-multi start image video data -c 3 -c:image 10
  156. celeryd -n image.myhost -c 10
  157. celeryd -n video.myhost -c 3
  158. celeryd -n data.myhost -c 3
  159. # ranges and lists of workers in options is also allowed:
  160. # (-c:1-3 can also be written as -c:1,2,3)
  161. $ celeryd-multi start 5 -c 3 -c:1-3 10
  162. celeryd -n celeryd1.myhost -c 10
  163. celeryd -n celeryd2.myhost -c 10
  164. celeryd -n celeryd3.myhost -c 10
  165. celeryd -n celeryd4.myhost -c 3
  166. celeryd -n celeryd5.myhost -c 3
  167. # lists also works with named workers
  168. $ celeryd-multi start foo bar baz xuzzy -c 3 -c:foo,bar,baz 10
  169. celeryd -n foo.myhost -c 10
  170. celeryd -n bar.myhost -c 10
  171. celeryd -n baz.myhost -c 10
  172. celeryd -n xuzzy.myhost -c 3
  173. """)
  174. COMMANDS = {"start": start,
  175. "names": names,
  176. "expand": expand,
  177. "get": get,
  178. "help": help}
  179. def usage():
  180. print("Please use one of the following commands: %s" % ", ".join(COMMANDS.keys()))
  181. def celeryd_multi(argv, cmd="celeryd"):
  182. if len(argv) == 0:
  183. usage()
  184. sys.exit(0)
  185. try:
  186. return COMMANDS[argv[0]](argv[1:], cmd)
  187. except KeyError, e:
  188. print("Invalid command: %s" % argv[0])
  189. usage()
  190. sys.exit(1)
  191. def main():
  192. celeryd_multi(sys.argv[1:])
  193. if __name__ == "__main__":
  194. main()