123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233 |
- # -*- coding: utf-8 -*-
- from __future__ import absolute_import
- import os
- import sys
- import warnings
- from optparse import OptionParser, make_option as Option
- from .. import __version__, Celery
- from ..exceptions import CDeprecationWarning, CPendingDeprecationWarning
- # always enable DeprecationWarnings, so our users can see them.
- for warning in (CDeprecationWarning, CPendingDeprecationWarning):
- warnings.simplefilter("once", warning, 0)
- class Command(object):
- """Base class for command line applications.
- :keyword app: The current app.
- :keyword get_app: Callable returning the current app if no app provided.
- """
- _default_broker_url = r'amqp://guest:guest@localhost:5672//'
- #: Arg list used in help.
- args = ''
- #: Application version.
- version = __version__
- #: If false the parser will raise an exception if positional
- #: args are provided.
- supports_args = True
- #: List of options (without preload options).
- option_list = ()
- #: List of options to parse before parsing other options.
- preload_options = (
- Option("--app",
- default=None, action="store", dest="app",
- help="Name of the app instance to use. "),
- Option("-b", "--broker",
- default=None, action="store", dest="broker",
- help="Broker URL. Default is %s" % (
- _default_broker_url, )),
- Option("--loader",
- default=None, action="store", dest="loader",
- help="Name of the loader class to use. "
- "Taken from the environment variable CELERY_LOADER, "
- "or 'default' if that is not set."),
- Option("--config",
- default="celeryconfig", action="store",
- dest="config_module",
- help="Name of the module to read configuration from."),
- )
- #: Enable if the application should support config from the cmdline.
- enable_config_from_cmdline = False
- #: Default configuration namespace.
- namespace = "celery"
- Parser = OptionParser
- def __init__(self, app=None, get_app=None):
- self.app = app
- self.get_app = get_app or self._get_default_app
- def run(self, *args, **options):
- """This is the body of the command called by :meth:`handle_argv`."""
- raise NotImplementedError("subclass responsibility")
- def execute_from_commandline(self, argv=None):
- """Execute application from command line.
- :keyword argv: The list of command line arguments.
- Defaults to ``sys.argv``.
- """
- if argv is None:
- argv = list(sys.argv)
- argv = self.setup_app_from_commandline(argv)
- prog_name = os.path.basename(argv[0])
- return self.handle_argv(prog_name, argv[1:])
- def usage(self):
- """Returns the command-line usage string for this app."""
- return "%%prog [options] %s" % (self.args, )
- def get_options(self):
- """Get supported command line options."""
- return self.option_list
- def handle_argv(self, prog_name, argv):
- """Parses command line arguments from ``argv`` and dispatches
- to :meth:`run`.
- :param prog_name: The program name (``argv[0]``).
- :param argv: Command arguments.
- Exits with an error message if :attr:`supports_args` is disabled
- and ``argv`` contains positional arguments.
- """
- options, args = self.parse_options(prog_name, argv)
- for o in vars(options):
- v = getattr(options, o)
- if isinstance(v, basestring):
- setattr(options, o, os.path.expanduser(v))
- argv = map(lambda a: isinstance(a, basestring)
- and os.path.expanduser(a) or a, argv)
- if not self.supports_args and args:
- sys.stderr.write(
- "\nUnrecognized command line arguments: %s\n" % (
- ", ".join(args), ))
- sys.stderr.write("\nTry --help?\n")
- sys.exit(1)
- return self.run(*args, **vars(options))
- def parse_options(self, prog_name, arguments):
- """Parse the available options."""
- # Don't want to load configuration to just print the version,
- # so we handle --version manually here.
- if "--version" in arguments:
- print(self.version)
- sys.exit(0)
- parser = self.create_parser(prog_name)
- options, args = parser.parse_args(arguments)
- return options, args
- def create_parser(self, prog_name):
- return self.Parser(prog=prog_name,
- usage=self.usage(),
- version=self.version,
- option_list=(self.preload_options +
- self.get_options()))
- def prepare_preload_options(self, options):
- """Optional handler to do additional processing of preload options.
- Configuration must not have been initialized
- until after this is called.
- """
- pass
- def setup_app_from_commandline(self, argv):
- preload_options = self.parse_preload_options(argv)
- self.prepare_preload_options(preload_options)
- app = (preload_options.get("app") or
- os.environ.get("CELERY_APP") or
- self.app)
- loader = (preload_options.get("loader") or
- os.environ.get("CELERY_LOADER") or
- "default")
- broker = preload_options.get("broker", None)
- if broker:
- os.environ["CELERY_BROKER_URL"] = broker
- config_module = preload_options.get("config_module")
- if config_module:
- os.environ["CELERY_CONFIG_MODULE"] = config_module
- if app:
- self.app = self.get_cls_by_name(app)
- else:
- self.app = self.get_app(loader=loader)
- if self.enable_config_from_cmdline:
- argv = self.process_cmdline_config(argv)
- return argv
- def get_cls_by_name(self, name):
- from ..utils import get_cls_by_name, import_from_cwd
- return get_cls_by_name(name, imp=import_from_cwd)
- def process_cmdline_config(self, argv):
- try:
- cargs_start = argv.index('--')
- except ValueError:
- return argv
- argv, cargs = argv[:cargs_start], argv[cargs_start + 1:]
- self.app.config_from_cmdline(cargs, namespace=self.namespace)
- return argv
- def parse_preload_options(self, args):
- acc = {}
- opts = {}
- for opt in self.preload_options:
- for t in (opt._long_opts, opt._short_opts):
- opts.update(dict(zip(t, [opt.dest] * len(t))))
- index = 0
- length = len(args)
- while index < length:
- arg = args[index]
- if arg.startswith('--') and '=' in arg:
- key, value = arg.split('=', 1)
- dest = opts.get(key)
- if dest:
- acc[dest] = value
- elif arg.startswith('-'):
- dest = opts.get(arg)
- if dest:
- acc[dest] = args[index + 1]
- index += 1
- index += 1
- return acc
- def _get_default_app(self, *args, **kwargs):
- return Celery(*args, **kwargs)
- def daemon_options(default_pidfile=None, default_logfile=None):
- return (
- Option('-f', '--logfile', default=default_logfile,
- action="store", dest="logfile",
- help="Path to the logfile"),
- Option('--pidfile', default=default_pidfile,
- action="store", dest="pidfile",
- help="Path to the pidfile."),
- Option('--uid', default=None,
- action="store", dest="uid",
- help="Effective user id to run as when detached."),
- Option('--gid', default=None,
- action="store", dest="gid",
- help="Effective group id to run as when detached."),
- Option('--umask', default=0,
- action="store", type="int", dest="umask",
- help="Umask of the process when detached."),
- Option('--workdir', default=None,
- action="store", dest="working_directory",
- help="Directory to change to when detached."),
- )
|