base.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399
  1. # -*- coding: utf-8 -*-
  2. """
  3. .. _preload-options:
  4. Preload Options
  5. ---------------
  6. .. cmdoption:: -A, --app
  7. app instance to use (e.g. module.attr_name)
  8. .. cmdoption:: -b, --broker
  9. url to broker. default is 'amqp://guest@localhost//'
  10. .. cmdoption:: --loader
  11. name of custom loader class to use.
  12. .. cmdoption:: --config
  13. name of the configuration module (default: `celeryconfig`)
  14. .. _daemon-options:
  15. Daemon Options
  16. --------------
  17. .. cmdoption:: -f, --logfile
  18. Path to log file. If no logfile is specified, `stderr` is used.
  19. .. cmdoption:: --pidfile
  20. Optional file used to store the process pid.
  21. The program will not start if this file already exists
  22. and the pid is still alive.
  23. .. cmdoption:: --uid
  24. User id, or user name of the user to run as after detaching.
  25. .. cmdoption:: --gid
  26. Group id, or group name of the main group to change to after
  27. detaching.
  28. .. cmdoption:: --umask
  29. Effective umask of the process after detaching. Default is 0.
  30. .. cmdoption:: --workdir
  31. Optional directory to change to after detaching.
  32. """
  33. from __future__ import absolute_import, print_function
  34. import os
  35. import re
  36. import socket
  37. import sys
  38. import warnings
  39. from collections import defaultdict
  40. from itertools import izip
  41. from optparse import OptionParser, IndentedHelpFormatter, make_option as Option
  42. from types import ModuleType
  43. import celery
  44. from celery.exceptions import CDeprecationWarning, CPendingDeprecationWarning
  45. from celery.platforms import EX_FAILURE, EX_USAGE, maybe_patch_concurrency
  46. from celery.utils import text
  47. from celery.utils.imports import symbol_by_name, import_from_cwd
  48. # always enable DeprecationWarnings, so our users can see them.
  49. for warning in (CDeprecationWarning, CPendingDeprecationWarning):
  50. warnings.simplefilter('once', warning, 0)
  51. ARGV_DISABLED = """
  52. Unrecognized command line arguments: {0}
  53. Try --help?
  54. """
  55. find_long_opt = re.compile(r'.+?(--.+?)(?:\s|,|$)')
  56. find_rst_ref = re.compile(r':\w+:`(.+?)`')
  57. find_sformat = re.compile(r'%(\w)')
  58. class HelpFormatter(IndentedHelpFormatter):
  59. def format_epilog(self, epilog):
  60. if epilog:
  61. return '\n{0}\n\n'.format(epilog)
  62. return ''
  63. def format_description(self, description):
  64. return text.ensure_2lines(text.fill_paragraphs(
  65. text.dedent(description), self.width))
  66. class Command(object):
  67. """Base class for command line applications.
  68. :keyword app: The current app.
  69. :keyword get_app: Callable returning the current app if no app provided.
  70. """
  71. Parser = OptionParser
  72. #: Arg list used in help.
  73. args = ''
  74. #: Application version.
  75. version = celery.VERSION_BANNER
  76. #: If false the parser will raise an exception if positional
  77. #: args are provided.
  78. supports_args = True
  79. #: List of options (without preload options).
  80. option_list = ()
  81. # module Rst documentation to parse help from (if any)
  82. doc = None
  83. # Some programs (multi) does not want to load the app specified
  84. # (Issue #1008).
  85. respects_app_option = True
  86. #: List of options to parse before parsing other options.
  87. preload_options = (
  88. Option('-A', '--app', default=None),
  89. Option('-b', '--broker', default=None),
  90. Option('--loader', default=None),
  91. Option('--config', default='celeryconfig', dest='config_module'),
  92. )
  93. #: Enable if the application should support config from the cmdline.
  94. enable_config_from_cmdline = False
  95. #: Default configuration namespace.
  96. namespace = 'celery'
  97. #: Text to print at end of --help
  98. epilog = None
  99. #: Text to print in --help before option list.
  100. description = ''
  101. #: Set to true if this command doesn't have subcommands
  102. leaf = True
  103. def __init__(self, app=None, get_app=None):
  104. self.app = app
  105. self.get_app = get_app or self._get_default_app
  106. def run(self, *args, **options):
  107. """This is the body of the command called by :meth:`handle_argv`."""
  108. raise NotImplementedError('subclass responsibility')
  109. def execute_from_commandline(self, argv=None):
  110. """Execute application from command line.
  111. :keyword argv: The list of command line arguments.
  112. Defaults to ``sys.argv``.
  113. """
  114. if argv is None:
  115. argv = list(sys.argv)
  116. # Should we load any special concurrency environment?
  117. self.maybe_patch_concurrency(argv)
  118. self.on_concurrency_setup()
  119. # Dump version and exit if '--version' arg set.
  120. self.early_version(argv)
  121. argv = self.setup_app_from_commandline(argv)
  122. prog_name = os.path.basename(argv[0])
  123. return self.handle_argv(prog_name, argv[1:])
  124. def run_from_argv(self, prog_name, argv=None):
  125. return self.handle_argv(prog_name, sys.argv if argv is None else argv)
  126. def maybe_patch_concurrency(self, argv=None):
  127. argv = argv or sys.argv
  128. pool_option = self.with_pool_option(argv)
  129. if pool_option:
  130. maybe_patch_concurrency(argv, *pool_option)
  131. short_opts, long_opts = pool_option
  132. def on_concurrency_setup(self):
  133. pass
  134. def usage(self, command):
  135. """Returns the command-line usage string for this app."""
  136. return '%%prog [options] {0.args}'.format(self)
  137. def get_options(self):
  138. """Get supported command line options."""
  139. return self.option_list
  140. def expanduser(self, value):
  141. if isinstance(value, basestring):
  142. return os.path.expanduser(value)
  143. return value
  144. def handle_argv(self, prog_name, argv):
  145. """Parses command line arguments from ``argv`` and dispatches
  146. to :meth:`run`.
  147. :param prog_name: The program name (``argv[0]``).
  148. :param argv: Command arguments.
  149. Exits with an error message if :attr:`supports_args` is disabled
  150. and ``argv`` contains positional arguments.
  151. """
  152. options, args = self.prepare_args(*self.parse_options(prog_name, argv))
  153. return self.run(*args, **options)
  154. def prepare_args(self, options, args):
  155. if options:
  156. options = dict((k, self.expanduser(v))
  157. for k, v in vars(options).iteritems()
  158. if not k.startswith('_'))
  159. args = [self.expanduser(arg) for arg in args]
  160. self.check_args(args)
  161. return options, args
  162. def check_args(self, args):
  163. if not self.supports_args and args:
  164. self.die(ARGV_DISABLED.format(', '.join(args)), EX_USAGE)
  165. def die(self, msg, status=EX_FAILURE):
  166. print(msg, file=sys.stderr)
  167. sys.exit(status)
  168. def early_version(self, argv):
  169. if '--version' in argv:
  170. print(self.version)
  171. sys.exit(0)
  172. def parse_options(self, prog_name, arguments):
  173. """Parse the available options."""
  174. # Don't want to load configuration to just print the version,
  175. # so we handle --version manually here.
  176. parser = self.create_parser(prog_name)
  177. return parser.parse_args(arguments)
  178. def create_parser(self, prog_name, command=None):
  179. return self.prepare_parser(self.Parser(prog=prog_name,
  180. usage=self.usage(command),
  181. version=self.version,
  182. epilog=self.epilog,
  183. formatter=HelpFormatter(),
  184. description=self.description,
  185. option_list=(self.preload_options +
  186. self.get_options())))
  187. def prepare_parser(self, parser):
  188. docs = [self.parse_doc(doc) for doc in (self.doc, __doc__) if doc]
  189. for doc in docs:
  190. for long_opt, help in doc.iteritems():
  191. option = parser.get_option(long_opt)
  192. if option is not None:
  193. option.help = ' '.join(help).format(default=option.default)
  194. return parser
  195. def prepare_preload_options(self, options):
  196. """Optional handler to do additional processing of preload options.
  197. Configuration must not have been initialized
  198. until after this is called.
  199. """
  200. pass
  201. def setup_app_from_commandline(self, argv):
  202. preload_options = self.parse_preload_options(argv)
  203. self.prepare_preload_options(preload_options)
  204. app = (preload_options.get('app') or
  205. os.environ.get('CELERY_APP') or
  206. self.app)
  207. loader = (preload_options.get('loader') or
  208. os.environ.get('CELERY_LOADER') or
  209. 'default')
  210. broker = preload_options.get('broker', None)
  211. if broker:
  212. os.environ['CELERY_BROKER_URL'] = broker
  213. config_module = preload_options.get('config_module')
  214. if config_module:
  215. os.environ['CELERY_CONFIG_MODULE'] = config_module
  216. if self.respects_app_option:
  217. if app and self.respects_app_option:
  218. self.app = self.find_app(app)
  219. elif self.app is None:
  220. self.app = self.get_app(loader=loader)
  221. if self.enable_config_from_cmdline:
  222. argv = self.process_cmdline_config(argv)
  223. else:
  224. self.app = celery.Celery()
  225. return argv
  226. def find_app(self, app):
  227. try:
  228. sym = self.symbol_by_name(app)
  229. except AttributeError:
  230. # last part was not an attribute, but a module
  231. sym = import_from_cwd(app)
  232. if isinstance(sym, ModuleType):
  233. if getattr(sym, '__path__', None):
  234. return self.find_app('{0}.celery:'.format(
  235. app.replace(':', '')))
  236. return sym.celery
  237. return sym
  238. def symbol_by_name(self, name):
  239. return symbol_by_name(name, imp=import_from_cwd)
  240. get_cls_by_name = symbol_by_name # XXX compat
  241. def process_cmdline_config(self, argv):
  242. try:
  243. cargs_start = argv.index('--')
  244. except ValueError:
  245. return argv
  246. argv, cargs = argv[:cargs_start], argv[cargs_start + 1:]
  247. self.app.config_from_cmdline(cargs, namespace=self.namespace)
  248. return argv
  249. def parse_preload_options(self, args):
  250. acc = {}
  251. opts = {}
  252. for opt in self.preload_options:
  253. for t in (opt._long_opts, opt._short_opts):
  254. opts.update(dict(izip(t, [opt.dest] * len(t))))
  255. index = 0
  256. length = len(args)
  257. while index < length:
  258. arg = args[index]
  259. if arg.startswith('--') and '=' in arg:
  260. key, value = arg.split('=', 1)
  261. dest = opts.get(key)
  262. if dest:
  263. acc[dest] = value
  264. elif arg.startswith('-'):
  265. dest = opts.get(arg)
  266. if dest:
  267. acc[dest] = args[index + 1]
  268. index += 1
  269. index += 1
  270. return acc
  271. def parse_doc(self, doc):
  272. options, in_option = defaultdict(list), None
  273. for line in doc.splitlines():
  274. if line.startswith('.. cmdoption::'):
  275. m = find_long_opt.match(line)
  276. if m:
  277. in_option = m.groups()[0].strip()
  278. assert in_option, 'missing long opt'
  279. elif in_option and line.startswith(' ' * 4):
  280. options[in_option].append(find_rst_ref.sub(r'\1',
  281. line.strip()).replace('`', ''))
  282. return options
  283. def with_pool_option(self, argv):
  284. """Returns tuple of ``(short_opts, long_opts)`` if the command
  285. supports a pool argument, and used to monkey patch eventlet/gevent
  286. environments as early as possible.
  287. E.g::
  288. has_pool_option = (['-P'], ['--pool'])
  289. """
  290. pass
  291. def simple_format(self, s, match=find_sformat, expand=r'\1', **keys):
  292. if s:
  293. host = socket.gethostname()
  294. name, _, domain = host.partition('.')
  295. keys = dict({'%': '%', 'h': host, 'n': name, 'd': domain}, **keys)
  296. return match.sub(lambda m: keys[m.expand(expand)], s)
  297. def _get_default_app(self, *args, **kwargs):
  298. from celery._state import get_current_app
  299. return get_current_app() # omit proxy
  300. def daemon_options(default_pidfile=None, default_logfile=None):
  301. return (
  302. Option('-f', '--logfile', default=default_logfile),
  303. Option('--pidfile', default=default_pidfile),
  304. Option('--uid', default=None),
  305. Option('--gid', default=None),
  306. Option('--umask', default=0, type='int'),
  307. Option('--workdir', default=None, dest='working_directory'),
  308. )