base.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400
  1. # -*- coding: utf-8 -*-
  2. """

.. _preload-options:

Preload Options
---------------

.. cmdoption:: -A, --app

    app instance to use (e.g. module.attr_name)

.. cmdoption:: -b, --broker

    url to broker. default is 'amqp://guest@localhost//'

.. cmdoption:: --loader

    name of custom loader class to use.

.. cmdoption:: --config

    name of the configuration module (default: `celeryconfig`)

.. _daemon-options:

Daemon Options
--------------

.. cmdoption:: -f, --logfile

    Path to log file. If no logfile is specified, `stderr` is used.

.. cmdoption:: --pidfile

    Optional file used to store the process pid.

    The program will not start if this file already exists
    and the pid is still alive.

.. cmdoption:: --uid

    User id, or user name of the user to run as after detaching.

.. cmdoption:: --gid

    Group id, or group name of the main group to change to after
    detaching.

.. cmdoption:: --umask

    Effective umask of the process after detaching. Default is 0.

.. cmdoption:: --workdir

    Optional directory to change to after detaching.

  32. """
  33. from __future__ import absolute_import, print_function
  34. import os
  35. import re
  36. import socket
  37. import sys
  38. import warnings
  39. from collections import defaultdict
  40. from future_builtins import zip
  41. from optparse import OptionParser, IndentedHelpFormatter, make_option as Option
  42. from types import ModuleType
  43. import celery
  44. from celery.exceptions import CDeprecationWarning, CPendingDeprecationWarning
  45. from celery.platforms import EX_FAILURE, EX_USAGE
  46. from celery.utils import text
  47. from celery.utils.imports import symbol_by_name, import_from_cwd
  48. # always enable DeprecationWarnings, so our users can see them.
  49. for warning in (CDeprecationWarning, CPendingDeprecationWarning):
  50. warnings.simplefilter('once', warning, 0)
  51. ARGV_DISABLED = """
  52. Unrecognized command line arguments: {0}
  53. Try --help?
  54. """
  55. find_long_opt = re.compile(r'.+?(--.+?)(?:\s|,|$)')
  56. find_rst_ref = re.compile(r':\w+:`(.+?)`')
  57. find_sformat = re.compile(r'%(\w)')
  58. class HelpFormatter(IndentedHelpFormatter):
  59. def format_epilog(self, epilog):
  60. if epilog:
  61. return '\n{0}\n\n'.format(epilog)
  62. return ''
  63. def format_description(self, description):
  64. return text.ensure_2lines(text.fill_paragraphs(
  65. text.dedent(description), self.width))
  66. class Command(object):
  67. """Base class for command line applications.
  68. :keyword app: The current app.
  69. :keyword get_app: Callable returning the current app if no app provided.
  70. """
  71. Parser = OptionParser
  72. #: Arg list used in help.
  73. args = ''
  74. #: Application version.
  75. version = celery.VERSION_BANNER
  76. #: If false the parser will raise an exception if positional
  77. #: args are provided.
  78. supports_args = True
  79. #: List of options (without preload options).
  80. option_list = ()
  81. # module Rst documentation to parse help from (if any)
  82. doc = None
  83. #: List of options to parse before parsing other options.
  84. preload_options = (
  85. Option('-A', '--app', default=None),
  86. Option('-b', '--broker', default=None),
  87. Option('--loader', default=None),
  88. Option('--config', default='celeryconfig', dest='config_module'),
  89. )
  90. #: Enable if the application should support config from the cmdline.
  91. enable_config_from_cmdline = False
  92. #: Default configuration namespace.
  93. namespace = 'celery'
  94. #: Text to print at end of --help
  95. epilog = None
  96. #: Text to print in --help before option list.
  97. description = ''
  98. #: Set to true if this command doesn't have subcommands
  99. leaf = True
  100. def __init__(self, app=None, get_app=None):
  101. self.app = app
  102. self.get_app = get_app or self._get_default_app
  103. def run(self, *args, **options):
  104. """This is the body of the command called by :meth:`handle_argv`."""
  105. raise NotImplementedError('subclass responsibility')
  106. def execute_from_commandline(self, argv=None):
  107. """Execute application from command line.
  108. :keyword argv: The list of command line arguments.
  109. Defaults to ``sys.argv``.
  110. """
  111. if argv is None:
  112. argv = list(sys.argv)
  113. # Should we load any special concurrency environment?
  114. pool_option = self.with_pool_option(argv)
  115. if pool_option:
  116. self.maybe_patch_concurrency(argv, *pool_option)
  117. self.on_concurrency_setup()
  118. # Dump version and exit if '--version' arg set.
  119. self.early_version(argv)
  120. argv = self.setup_app_from_commandline(argv)
  121. prog_name = os.path.basename(argv[0])
  122. return self.handle_argv(prog_name, argv[1:])
  123. def _find_option_with_arg(self, argv, short_opts=None, long_opts=None):
  124. for i, arg in enumerate(argv):
  125. if arg.startswith('-'):
  126. if long_opts and arg.startswith('--'):
  127. name, _, val = arg.partition('=')
  128. if name in long_opts:
  129. return val
  130. if short_opts and arg in short_opts:
  131. return argv[i + 1]
  132. raise KeyError('|'.join(short_opts or [] + long_opts or []))
  133. def maybe_patch_concurrency(self, argv, short_opts=None, long_opts=None):
  134. try:
  135. pool = self._find_option_with_arg(argv, short_opts, long_opts)
  136. except KeyError:
  137. pass
  138. else:
  139. from celery import concurrency
  140. # set up eventlet/gevent environments ASAP.
  141. concurrency.get_implementation(pool)
  142. def on_concurrency_setup(self):
  143. pass
  144. def usage(self, command):
  145. """Returns the command-line usage string for this app."""
  146. return '%%prog [options] {0.args}'.format(self)
  147. def get_options(self):
  148. """Get supported command line options."""
  149. return self.option_list
  150. def expanduser(self, value):
  151. if isinstance(value, basestring):
  152. return os.path.expanduser(value)
  153. return value
  154. def handle_argv(self, prog_name, argv):
  155. """Parses command line arguments from ``argv`` and dispatches
  156. to :meth:`run`.
  157. :param prog_name: The program name (``argv[0]``).
  158. :param argv: Command arguments.
  159. Exits with an error message if :attr:`supports_args` is disabled
  160. and ``argv`` contains positional arguments.
  161. """
  162. options, args = self.prepare_args(*self.parse_options(prog_name, argv))
  163. return self.run(*args, **options)
  164. def prepare_args(self, options, args):
  165. if options:
  166. options = dict((k, self.expanduser(v))
  167. for k, v in vars(options).iteritems()
  168. if not k.startswith('_'))
  169. args = [self.expanduser(arg) for arg in args]
  170. self.check_args(args)
  171. return options, args
  172. def check_args(self, args):
  173. if not self.supports_args and args:
  174. self.die(ARGV_DISABLED.format(', '.join(args)), EX_USAGE)
  175. def die(self, msg, status=EX_FAILURE):
  176. print(msg, file=sys.stderr)
  177. sys.exit(status)
  178. def early_version(self, argv):
  179. if '--version' in argv:
  180. print(self.version)
  181. sys.exit(0)
  182. def parse_options(self, prog_name, arguments):
  183. """Parse the available options."""
  184. # Don't want to load configuration to just print the version,
  185. # so we handle --version manually here.
  186. parser = self.create_parser(prog_name)
  187. return parser.parse_args(arguments)
  188. def create_parser(self, prog_name, command=None):
  189. return self.prepare_parser(self.Parser(prog=prog_name,
  190. usage=self.usage(command),
  191. version=self.version,
  192. epilog=self.epilog,
  193. formatter=HelpFormatter(),
  194. description=self.description,
  195. option_list=(self.preload_options +
  196. self.get_options())))
  197. def prepare_parser(self, parser):
  198. docs = [self.parse_doc(doc) for doc in (self.doc, __doc__) if doc]
  199. for doc in docs:
  200. for long_opt, help in doc.iteritems():
  201. option = parser.get_option(long_opt)
  202. if option is not None:
  203. option.help = ' '.join(help).format(default=option.default)
  204. return parser
  205. def prepare_preload_options(self, options):
  206. """Optional handler to do additional processing of preload options.
  207. Configuration must not have been initialized
  208. until after this is called.
  209. """
  210. pass
  211. def setup_app_from_commandline(self, argv):
  212. preload_options = self.parse_preload_options(argv)
  213. self.prepare_preload_options(preload_options)
  214. app = (preload_options.get('app') or
  215. os.environ.get('CELERY_APP') or
  216. self.app)
  217. loader = (preload_options.get('loader') or
  218. os.environ.get('CELERY_LOADER') or
  219. 'default')
  220. broker = preload_options.get('broker', None)
  221. if broker:
  222. os.environ['CELERY_BROKER_URL'] = broker
  223. config_module = preload_options.get('config_module')
  224. if config_module:
  225. os.environ['CELERY_CONFIG_MODULE'] = config_module
  226. if app:
  227. self.app = self.find_app(app)
  228. else:
  229. self.app = self.get_app(loader=loader)
  230. if self.enable_config_from_cmdline:
  231. argv = self.process_cmdline_config(argv)
  232. return argv
  233. def find_app(self, app):
  234. sym = self.symbol_by_name(app)
  235. if isinstance(sym, ModuleType):
  236. if getattr(sym, '__path__', None):
  237. return self.find_app('{0}.celery:'.format(
  238. app.replace(':', '')))
  239. return sym.celery
  240. return sym
  241. def symbol_by_name(self, name):
  242. return symbol_by_name(name, imp=import_from_cwd)
  243. get_cls_by_name = symbol_by_name # XXX compat
  244. def process_cmdline_config(self, argv):
  245. try:
  246. cargs_start = argv.index('--')
  247. except ValueError:
  248. return argv
  249. argv, cargs = argv[:cargs_start], argv[cargs_start + 1:]
  250. self.app.config_from_cmdline(cargs, namespace=self.namespace)
  251. return argv
  252. def parse_preload_options(self, args):
  253. acc = {}
  254. opts = {}
  255. for opt in self.preload_options:
  256. for t in (opt._long_opts, opt._short_opts):
  257. opts.update(dict(zip(t, [opt.dest] * len(t))))
  258. index = 0
  259. length = len(args)
  260. while index < length:
  261. arg = args[index]
  262. if arg.startswith('--') and '=' in arg:
  263. key, value = arg.split('=', 1)
  264. dest = opts.get(key)
  265. if dest:
  266. acc[dest] = value
  267. elif arg.startswith('-'):
  268. dest = opts.get(arg)
  269. if dest:
  270. acc[dest] = args[index + 1]
  271. index += 1
  272. index += 1
  273. return acc
  274. def parse_doc(self, doc):
  275. options, in_option = defaultdict(list), None
  276. for line in doc.splitlines():
  277. if line.startswith('.. cmdoption::'):
  278. m = find_long_opt.match(line)
  279. if m:
  280. in_option = m.groups()[0].strip()
  281. assert in_option, 'missing long opt'
  282. elif in_option and line.startswith(' ' * 4):
  283. options[in_option].append(find_rst_ref.sub(r'\1',
  284. line.strip()).replace('`', ''))
  285. return options
  286. def with_pool_option(self, argv):
  287. """Returns tuple of ``(short_opts, long_opts)`` if the command
  288. supports a pool argument, and used to monkey patch eventlet/gevent
  289. environments as early as possible.
  290. E.g::
  291. has_pool_option = (['-P'], ['--pool'])
  292. """
  293. pass
  294. def simple_format(self, s, match=find_sformat, expand=r'\1', **keys):
  295. host = socket.gethostname()
  296. name, _, domain = host.partition('.')
  297. keys = dict({'%': '%', 'h': host, 'n': name, 'd': domain}, **keys)
  298. return match.sub(lambda m: keys[m.expand(expand)], s)
  299. def _get_default_app(self, *args, **kwargs):
  300. from celery.app import default_app
  301. return default_app._get_current_object() # omit proxy
  302. def daemon_options(default_pidfile=None, default_logfile=None):
  303. return (
  304. Option('-f', '--logfile', default=default_logfile),
  305. Option('--pidfile', default=default_pidfile),
  306. Option('--uid', default=None),
  307. Option('--gid', default=None),
  308. Option('--umask', default=0, type='int'),
  309. Option('--workdir', default=None, dest='working_directory'),
  310. )