base.py 12 KB


  1. # -*- coding: utf-8 -*-
  2. """
  3. .. _preload-options:
  4. Preload Options
  5. ---------------
  6. .. cmdoption:: -A, --app
  7. app instance to use (e.g. module.attr_name)
  8. .. cmdoption:: -b, --broker
  9. url to broker. default is 'amqp://guest@localhost//'
  10. .. cmdoption:: --loader
  11. name of custom loader class to use.
  12. .. cmdoption:: --config
  13. name of the configuration module (default: `celeryconfig`)
  14. .. _daemon-options:
  15. Daemon Options
  16. --------------
  17. .. cmdoption:: -f, --logfile
  18. Path to log file. If no logfile is specified, `stderr` is used.
  19. .. cmdoption:: --pidfile
  20. Optional file used to store the process pid.
  21. The program will not start if this file already exists
  22. and the pid is still alive.
  23. .. cmdoption:: --uid
  24. User id, or user name of the user to run as after detaching.
  25. .. cmdoption:: --gid
  26. Group id, or group name of the main group to change to after
  27. detaching.
  28. .. cmdoption:: --umask
  29. Effective umask of the process after detaching. Default is 0.
  30. .. cmdoption:: --workdir
  31. Optional directory to change to after detaching.
  32. """
  33. from __future__ import absolute_import
  34. import os
  35. import re
  36. import sys
  37. import warnings
  38. from collections import defaultdict
  39. from optparse import OptionParser, IndentedHelpFormatter, make_option as Option
  40. from types import ModuleType
  41. import celery
  42. from celery.exceptions import CDeprecationWarning, CPendingDeprecationWarning
  43. from celery.platforms import EX_FAILURE, EX_USAGE
  44. from celery.utils import text
  45. from celery.utils.imports import symbol_by_name, import_from_cwd
  46. # always enable DeprecationWarnings, so our users can see them.
  47. for warning in (CDeprecationWarning, CPendingDeprecationWarning):
  48. warnings.simplefilter('once', warning, 0)
  49. ARGV_DISABLED = """
  50. Unrecognized command line arguments: %s
  51. Try --help?
  52. """
  53. find_long_opt = re.compile(r'.+?(--.+?)(?:\s|,|$)')
  54. find_rst_ref = re.compile(r':\w+:`(.+?)`')
  55. class HelpFormatter(IndentedHelpFormatter):
  56. def format_epilog(self, epilog):
  57. if epilog:
  58. return '\n%s\n\n' % epilog
  59. return ''
  60. def format_description(self, description):
  61. return text.ensure_2lines(text.fill_paragraphs(
  62. text.dedent(description), self.width))
  63. class Command(object):
  64. """Base class for command line applications.
  65. :keyword app: The current app.
  66. :keyword get_app: Callable returning the current app if no app provided.
  67. """
  68. Parser = OptionParser
  69. #: Arg list used in help.
  70. args = ''
  71. #: Application version.
  72. version = celery.VERSION_BANNER
  73. #: If false the parser will raise an exception if positional
  74. #: args are provided.
  75. supports_args = True
  76. #: List of options (without preload options).
  77. option_list = ()
  78. # module Rst documentation to parse help from (if any)
  79. doc = None
  80. #: List of options to parse before parsing other options.
  81. preload_options = (
  82. Option('-A', '--app', default=None),
  83. Option('-b', '--broker', default=None),
  84. Option('--loader', default=None),
  85. Option('--config', default='celeryconfig', dest='config_module'),
  86. )
  87. #: Enable if the application should support config from the cmdline.
  88. enable_config_from_cmdline = False
  89. #: Default configuration namespace.
  90. namespace = 'celery'
  91. #: Text to print at end of --help
  92. epilog = None
  93. #: Text to print in --help before option list.
  94. description = ''
  95. def __init__(self, app=None, get_app=None):
  96. self.app = app
  97. self.get_app = get_app or self._get_default_app
  98. def run(self, *args, **options):
  99. """This is the body of the command called by :meth:`handle_argv`."""
  100. raise NotImplementedError('subclass responsibility')
  101. def execute_from_commandline(self, argv=None):
  102. """Execute application from command line.
  103. :keyword argv: The list of command line arguments.
  104. Defaults to ``sys.argv``.
  105. """
  106. if argv is None:
  107. argv = list(sys.argv)
  108. # Should we load any special concurrency environment?
  109. pool_option = self.with_pool_option(argv)
  110. if pool_option:
  111. self.maybe_patch_concurrency(argv, *pool_option)
  112. # Dump version and exit if '--version' arg set.
  113. self.early_version(argv)
  114. argv = self.setup_app_from_commandline(argv)
  115. prog_name = os.path.basename(argv[0])
  116. return self.handle_argv(prog_name, argv[1:])
  117. def _find_option_with_arg(self, argv, short_opts=None, long_opts=None):
  118. for i, arg in enumerate(argv):
  119. if arg.startswith('-'):
  120. if long_opts and arg.startswith('--'):
  121. name, _, val = arg.partition('=')
  122. if name in long_opts:
  123. return val
  124. if short_opts and arg in short_opts:
  125. return argv[i + 1]
  126. raise KeyError('|'.join(short_opts or [] + long_opts or []))
  127. def maybe_patch_concurrency(self, argv, short_opts=None, long_opts=None):
  128. try:
  129. pool = self._find_option_with_arg(argv, short_opts, long_opts)
  130. except KeyError:
  131. pass
  132. else:
  133. from celery import concurrency
  134. # set up eventlet/gevent environments ASAP.
  135. concurrency.get_implementation(pool)
  136. def usage(self, command):
  137. """Returns the command-line usage string for this app."""
  138. return '%%prog [options] %s' % (self.args, )
  139. def get_options(self):
  140. """Get supported command line options."""
  141. return self.option_list
  142. def expanduser(self, value):
  143. if isinstance(value, basestring):
  144. return os.path.expanduser(value)
  145. return value
  146. def handle_argv(self, prog_name, argv):
  147. """Parses command line arguments from ``argv`` and dispatches
  148. to :meth:`run`.
  149. :param prog_name: The program name (``argv[0]``).
  150. :param argv: Command arguments.
  151. Exits with an error message if :attr:`supports_args` is disabled
  152. and ``argv`` contains positional arguments.
  153. """
  154. options, args = self.prepare_args(*self.parse_options(prog_name, argv))
  155. return self.run(*args, **options)
  156. def prepare_args(self, options, args):
  157. if options:
  158. options = dict((k, self.expanduser(v))
  159. for k, v in vars(options).iteritems()
  160. if not k.startswith('_'))
  161. args = map(self.expanduser, args)
  162. self.check_args(args)
  163. return options, args
  164. def check_args(self, args):
  165. if not self.supports_args and args:
  166. self.die(ARGV_DISABLED % (', '.join(args, )), EX_USAGE)
  167. def die(self, msg, status=EX_FAILURE):
  168. sys.stderr.write(msg + '\n')
  169. sys.exit(status)
  170. def early_version(self, argv):
  171. if '--version' in argv:
  172. sys.stdout.write('%s\n' % self.version)
  173. sys.exit(0)
  174. def parse_options(self, prog_name, arguments):
  175. """Parse the available options."""
  176. # Don't want to load configuration to just print the version,
  177. # so we handle --version manually here.
  178. parser = self.create_parser(prog_name)
  179. return parser.parse_args(arguments)
  180. def create_parser(self, prog_name, command=None):
  181. return self.prepare_parser(self.Parser(prog=prog_name,
  182. usage=self.usage(command),
  183. version=self.version,
  184. epilog=self.epilog,
  185. formatter=HelpFormatter(),
  186. description=self.description,
  187. option_list=(self.preload_options +
  188. self.get_options())))
  189. def prepare_parser(self, parser):
  190. docs = [self.parse_doc(doc) for doc in (self.doc, __doc__) if doc]
  191. for doc in docs:
  192. for long_opt, help in doc.iteritems():
  193. option = parser.get_option(long_opt)
  194. if option is not None:
  195. option.help = ' '.join(help) % {'default': option.default}
  196. return parser
  197. def prepare_preload_options(self, options):
  198. """Optional handler to do additional processing of preload options.
  199. Configuration must not have been initialized
  200. until after this is called.
  201. """
  202. pass
  203. def setup_app_from_commandline(self, argv):
  204. preload_options = self.parse_preload_options(argv)
  205. self.prepare_preload_options(preload_options)
  206. app = (preload_options.get('app') or
  207. os.environ.get('CELERY_APP') or
  208. self.app)
  209. loader = (preload_options.get('loader') or
  210. os.environ.get('CELERY_LOADER') or
  211. 'default')
  212. broker = preload_options.get('broker', None)
  213. if broker:
  214. os.environ['CELERY_BROKER_URL'] = broker
  215. config_module = preload_options.get('config_module')
  216. if config_module:
  217. os.environ['CELERY_CONFIG_MODULE'] = config_module
  218. if app:
  219. self.app = self.find_app(app)
  220. else:
  221. self.app = self.get_app(loader=loader)
  222. if self.enable_config_from_cmdline:
  223. argv = self.process_cmdline_config(argv)
  224. return argv
  225. def find_app(self, app):
  226. sym = self.symbol_by_name(app)
  227. if isinstance(sym, ModuleType):
  228. if getattr(sym, '__path__', None):
  229. return self.find_app('%s.celery:' % (app.replace(':', ''), ))
  230. return sym.celery
  231. return sym
  232. def symbol_by_name(self, name):
  233. return symbol_by_name(name, imp=import_from_cwd)
  234. get_cls_by_name = symbol_by_name # XXX compat
  235. def process_cmdline_config(self, argv):
  236. try:
  237. cargs_start = argv.index('--')
  238. except ValueError:
  239. return argv
  240. argv, cargs = argv[:cargs_start], argv[cargs_start + 1:]
  241. self.app.config_from_cmdline(cargs, namespace=self.namespace)
  242. return argv
  243. def parse_preload_options(self, args):
  244. acc = {}
  245. opts = {}
  246. for opt in self.preload_options:
  247. for t in (opt._long_opts, opt._short_opts):
  248. opts.update(dict(zip(t, [opt.dest] * len(t))))
  249. index = 0
  250. length = len(args)
  251. while index < length:
  252. arg = args[index]
  253. if arg.startswith('--') and '=' in arg:
  254. key, value = arg.split('=', 1)
  255. dest = opts.get(key)
  256. if dest:
  257. acc[dest] = value
  258. elif arg.startswith('-'):
  259. dest = opts.get(arg)
  260. if dest:
  261. acc[dest] = args[index + 1]
  262. index += 1
  263. index += 1
  264. return acc
  265. def parse_doc(self, doc):
  266. options, in_option = defaultdict(list), None
  267. for line in doc.splitlines():
  268. if line.startswith('.. cmdoption::'):
  269. m = find_long_opt.match(line)
  270. if m:
  271. in_option = m.groups()[0].strip()
  272. assert in_option, 'missing long opt'
  273. elif in_option and line.startswith(' ' * 4):
  274. options[in_option].append(find_rst_ref.sub(r'\1',
  275. line.strip()).replace('`', ''))
  276. return options
  277. def with_pool_option(self, argv):
  278. """Returns tuple of ``(short_opts, long_opts)`` if the command
  279. supports a pool argument, and used to monkey patch eventlet/gevent
  280. environments as early as possible.
  281. E.g::
  282. has_pool_option = (['-P'], ['--pool'])
  283. """
  284. pass
  285. def _get_default_app(self, *args, **kwargs):
  286. from celery.app import default_app
  287. return default_app._get_current_object() # omit proxy
  288. def daemon_options(default_pidfile=None, default_logfile=None):
  289. return (
  290. Option('-f', '--logfile', default=default_logfile),
  291. Option('--pidfile', default=default_pidfile),
  292. Option('--uid', default=None),
  293. Option('--gid', default=None),
  294. Option('--umask', default=0, type='int'),
  295. Option('--workdir', default=None, dest='working_directory'),
  296. )