base.py 12 KB


  1. # -*- coding: utf-8 -*-
  2. """
  3. .. _preload-options:
  4. Preload Options
  5. ---------------
  6. .. cmdoption:: -A, --app
  7. app instance to use (e.g. module.attr_name)
  8. .. cmdoption:: -b, --broker
  9. url to broker. default is 'amqp://guest@localhost//'
  10. .. cmdoption:: --loader
  11. name of custom loader class to use.
  12. .. cmdoption:: --config
  13. Name of the configuration module
  14. .. _daemon-options:
  15. Daemon Options
  16. --------------
  17. .. cmdoption:: -f, --logfile
  18. Path to log file. If no logfile is specified, `stderr` is used.
  19. .. cmdoption:: --pidfile
  20. Optional file used to store the process pid.
  21. The program will not start if this file already exists
  22. and the pid is still alive.
  23. .. cmdoption:: --uid
  24. User id, or user name of the user to run as after detaching.
  25. .. cmdoption:: --gid
  26. Group id, or group name of the main group to change to after
  27. detaching.
  28. .. cmdoption:: --umask
  29. Effective umask of the process after detaching. Default is 0.
  30. .. cmdoption:: --workdir
  31. Optional directory to change to after detaching.
  32. """
  33. from __future__ import absolute_import
  34. import os
  35. import re
  36. import sys
  37. import warnings
  38. from collections import defaultdict
  39. from optparse import OptionParser, IndentedHelpFormatter, make_option as Option
  40. from types import ModuleType
  41. import celery
  42. from celery.exceptions import CDeprecationWarning, CPendingDeprecationWarning
  43. from celery.platforms import EX_FAILURE, EX_USAGE, maybe_patch_concurrency
  44. from celery.utils import text
  45. from celery.utils.imports import symbol_by_name, import_from_cwd
  46. # always enable DeprecationWarnings, so our users can see them.
  47. for warning in (CDeprecationWarning, CPendingDeprecationWarning):
  48. warnings.simplefilter('once', warning, 0)
  49. ARGV_DISABLED = """
  50. Unrecognized command line arguments: %s
  51. Try --help?
  52. """
  53. find_long_opt = re.compile(r'.+?(--.+?)(?:\s|,|$)')
  54. find_rst_ref = re.compile(r':\w+:`(.+?)`')
  55. class HelpFormatter(IndentedHelpFormatter):
  56. def format_epilog(self, epilog):
  57. if epilog:
  58. return '\n%s\n\n' % epilog
  59. return ''
  60. def format_description(self, description):
  61. return text.ensure_2lines(text.fill_paragraphs(
  62. text.dedent(description), self.width))
  63. class Command(object):
  64. """Base class for command line applications.
  65. :keyword app: The current app.
  66. :keyword get_app: Callable returning the current app if no app provided.
  67. """
  68. Parser = OptionParser
  69. #: Arg list used in help.
  70. args = ''
  71. #: Application version.
  72. version = celery.VERSION_BANNER
  73. #: If false the parser will raise an exception if positional
  74. #: args are provided.
  75. supports_args = True
  76. #: List of options (without preload options).
  77. option_list = ()
  78. # module Rst documentation to parse help from (if any)
  79. doc = None
  80. # Some programs (multi) does not want to load the app specified
  81. # (Issue #1008).
  82. respects_app_option = True
  83. #: List of options to parse before parsing other options.
  84. preload_options = (
  85. Option('-A', '--app', default=None),
  86. Option('-b', '--broker', default=None),
  87. Option('--loader', default=None),
  88. Option('--config', default=None),
  89. )
  90. #: Enable if the application should support config from the cmdline.
  91. enable_config_from_cmdline = False
  92. #: Default configuration namespace.
  93. namespace = 'celery'
  94. #: Text to print at end of --help
  95. epilog = None
  96. #: Text to print in --help before option list.
  97. description = ''
  98. #: Set to true if this command doesn't have subcommands
  99. leaf = True
  100. def __init__(self, app=None, get_app=None):
  101. self.app = app
  102. self.get_app = get_app or self._get_default_app
  103. def run(self, *args, **options):
  104. """This is the body of the command called by :meth:`handle_argv`."""
  105. raise NotImplementedError('subclass responsibility')
  106. def execute_from_commandline(self, argv=None):
  107. """Execute application from command line.
  108. :keyword argv: The list of command line arguments.
  109. Defaults to ``sys.argv``.
  110. """
  111. if argv is None:
  112. argv = list(sys.argv)
  113. # Should we load any special concurrency environment?
  114. self.maybe_patch_concurrency(argv)
  115. self.on_concurrency_setup()
  116. # Dump version and exit if '--version' arg set.
  117. self.early_version(argv)
  118. argv = self.setup_app_from_commandline(argv)
  119. prog_name = os.path.basename(argv[0])
  120. return self.handle_argv(prog_name, argv[1:])
  121. def run_from_argv(self, prog_name, argv=None):
  122. return self.handle_argv(prog_name, sys.argv if argv is None else argv)
  123. def maybe_patch_concurrency(self, argv=None):
  124. argv = argv or sys.argv
  125. pool_option = self.with_pool_option(argv)
  126. if pool_option:
  127. maybe_patch_concurrency(argv, *pool_option)
  128. short_opts, long_opts = pool_option
  129. def on_concurrency_setup(self):
  130. pass
  131. def usage(self, command):
  132. """Returns the command-line usage string for this app."""
  133. return '%%prog [options] %s' % (self.args, )
  134. def get_options(self):
  135. """Get supported command line options."""
  136. return self.option_list
  137. def expanduser(self, value):
  138. if isinstance(value, basestring):
  139. return os.path.expanduser(value)
  140. return value
  141. def handle_argv(self, prog_name, argv):
  142. """Parses command line arguments from ``argv`` and dispatches
  143. to :meth:`run`.
  144. :param prog_name: The program name (``argv[0]``).
  145. :param argv: Command arguments.
  146. Exits with an error message if :attr:`supports_args` is disabled
  147. and ``argv`` contains positional arguments.
  148. """
  149. options, args = self.prepare_args(*self.parse_options(prog_name, argv))
  150. return self.run(*args, **options)
  151. def prepare_args(self, options, args):
  152. if options:
  153. options = dict((k, self.expanduser(v))
  154. for k, v in vars(options).iteritems()
  155. if not k.startswith('_'))
  156. args = map(self.expanduser, args)
  157. self.check_args(args)
  158. return options, args
  159. def check_args(self, args):
  160. if not self.supports_args and args:
  161. self.die(ARGV_DISABLED % (', '.join(args, )), EX_USAGE)
  162. def die(self, msg, status=EX_FAILURE):
  163. sys.stderr.write(msg + '\n')
  164. sys.exit(status)
  165. def early_version(self, argv):
  166. if '--version' in argv:
  167. sys.stdout.write('%s\n' % self.version)
  168. sys.exit(0)
  169. def parse_options(self, prog_name, arguments):
  170. """Parse the available options."""
  171. # Don't want to load configuration to just print the version,
  172. # so we handle --version manually here.
  173. parser = self.create_parser(prog_name)
  174. return parser.parse_args(arguments)
  175. def create_parser(self, prog_name, command=None):
  176. return self.prepare_parser(self.Parser(prog=prog_name,
  177. usage=self.usage(command),
  178. version=self.version,
  179. epilog=self.epilog,
  180. formatter=HelpFormatter(),
  181. description=self.description,
  182. option_list=(self.preload_options +
  183. self.get_options())))
  184. def prepare_parser(self, parser):
  185. docs = [self.parse_doc(doc) for doc in (self.doc, __doc__) if doc]
  186. for doc in docs:
  187. for long_opt, help in doc.iteritems():
  188. option = parser.get_option(long_opt)
  189. if option is not None:
  190. option.help = ' '.join(help) % {'default': option.default}
  191. return parser
  192. def prepare_preload_options(self, options):
  193. """Optional handler to do additional processing of preload options.
  194. Configuration must not have been initialized
  195. until after this is called.
  196. """
  197. pass
  198. def setup_app_from_commandline(self, argv):
  199. preload_options = self.parse_preload_options(argv)
  200. self.prepare_preload_options(preload_options)
  201. app = (preload_options.get('app') or
  202. os.environ.get('CELERY_APP') or
  203. self.app)
  204. loader = (preload_options.get('loader') or
  205. os.environ.get('CELERY_LOADER') or
  206. 'default')
  207. broker = preload_options.get('broker', None)
  208. if broker:
  209. os.environ['CELERY_BROKER_URL'] = broker
  210. config = preload_options.get('config')
  211. if config:
  212. os.environ['CELERY_CONFIG_MODULE'] = config
  213. if self.respects_app_option:
  214. if app and self.respects_app_option:
  215. self.app = self.find_app(app)
  216. elif self.app is None:
  217. self.app = self.get_app(loader=loader)
  218. if self.enable_config_from_cmdline:
  219. argv = self.process_cmdline_config(argv)
  220. else:
  221. self.app = celery.Celery()
  222. return argv
  223. def find_app(self, app):
  224. try:
  225. sym = self.symbol_by_name(app)
  226. except AttributeError:
  227. # last part was not an attribute, but a module
  228. sym = import_from_cwd(app)
  229. if isinstance(sym, ModuleType):
  230. if getattr(sym, '__path__', None):
  231. return self.find_app('%s.celery:' % (app.replace(':', ''), ))
  232. return sym.celery
  233. return sym
  234. def symbol_by_name(self, name):
  235. return symbol_by_name(name, imp=import_from_cwd)
  236. get_cls_by_name = symbol_by_name # XXX compat
  237. def process_cmdline_config(self, argv):
  238. try:
  239. cargs_start = argv.index('--')
  240. except ValueError:
  241. return argv
  242. argv, cargs = argv[:cargs_start], argv[cargs_start + 1:]
  243. self.app.config_from_cmdline(cargs, namespace=self.namespace)
  244. return argv
  245. def parse_preload_options(self, args):
  246. acc = {}
  247. opts = {}
  248. for opt in self.preload_options:
  249. for t in (opt._long_opts, opt._short_opts):
  250. opts.update(dict(zip(t, [opt.dest] * len(t))))
  251. index = 0
  252. length = len(args)
  253. while index < length:
  254. arg = args[index]
  255. if arg.startswith('--') and '=' in arg:
  256. key, value = arg.split('=', 1)
  257. dest = opts.get(key)
  258. if dest:
  259. acc[dest] = value
  260. elif arg.startswith('-'):
  261. dest = opts.get(arg)
  262. if dest:
  263. acc[dest] = args[index + 1]
  264. index += 1
  265. index += 1
  266. return acc
  267. def parse_doc(self, doc):
  268. options, in_option = defaultdict(list), None
  269. for line in doc.splitlines():
  270. if line.startswith('.. cmdoption::'):
  271. m = find_long_opt.match(line)
  272. if m:
  273. in_option = m.groups()[0].strip()
  274. assert in_option, 'missing long opt'
  275. elif in_option and line.startswith(' ' * 4):
  276. options[in_option].append(find_rst_ref.sub(r'\1',
  277. line.strip()).replace('`', ''))
  278. return options
  279. def with_pool_option(self, argv):
  280. """Returns tuple of ``(short_opts, long_opts)`` if the command
  281. supports a pool argument, and used to monkey patch eventlet/gevent
  282. environments as early as possible.
  283. E.g::
  284. has_pool_option = (['-P'], ['--pool'])
  285. """
  286. pass
  287. def _get_default_app(self, *args, **kwargs):
  288. from celery._state import get_current_app
  289. return get_current_app() # omit proxy
  290. def daemon_options(default_pidfile=None, default_logfile=None):
  291. return (
  292. Option('-f', '--logfile', default=default_logfile),
  293. Option('--pidfile', default=default_pidfile),
  294. Option('--uid', default=None),
  295. Option('--gid', default=None),
  296. Option('--umask', default=0, type='int'),
  297. Option('--workdir', default=None, dest='working_directory'),
  298. )