Browse Source

Configuration must appear after '--':

    celeryd -l info -- broker.host=foo.com

The configuration keys are case-insensitive and adhers to the following
format: {NAMESPACE}_{KEYNAME}. The keys are automatically converted to
uppercase and .'s are replaced with _'s (e.g. ``broker.host`` becomes
``BROKER_HOST``).
Namespace can be one of: BROKER, CELERY, CELERYD,
                         CELERYBEAT, CELERYMON,
                         EMAIL.

If the key starts with "." or "_" the default namespace is used,
for celeryd this is "CELERYD", for celerybeat it's "CELERYBEAT", ..., and
so on.

    celeryd -l info -- .concurrency=8 \
                       .pool=celery.concurrency.threads.TaskPool \
                       celery_create_missing_queues=false

becomes:

    {"CELERYD_CONCURRENCY": 8,
     "CELERYD_POOL": "celery.concurrency.threads.TaskPool",
     "CELERY_CREATE_MISSING_QUEUES": False}
Ask Solem 14 years ago
parent
commit
9e193f9517

+ 1 - 2
bin/camqadm

@@ -5,5 +5,4 @@ if '' not in sys.path:
 from celery.bin import camqadm
 
 if __name__ == "__main__":
-    options, values = camqadm.parse_options(sys.argv[1:])
-    sys.exit(not camqadm.camqadm(*values, **vars(options)))
+    sys.exit(not camqadm.main())

+ 8 - 2
celery/app/base.py

@@ -6,12 +6,12 @@ from itertools import chain
 
 from celery import routes
 from celery.app.defaults import DEFAULTS
-from celery.datastructures import AttributeDict
+from celery.datastructures import AttributeDictMixin
 from celery.utils import noop, isatty
 from celery.utils.functional import wraps
 
 
-class MultiDictView(AttributeDict):
+class MultiDictView(AttributeDictMixin):
     """View for one more more dicts.
 
     * When getting a key, the dicts are searched in order.
@@ -47,6 +47,9 @@ class MultiDictView(AttributeDict):
             self[key] = default
             return default
 
+    def update(self, *args, **kwargs):
+        return self.__dict__["dicts"][0].update(*args, **kwargs)
+
     def __contains__(self, key):
         for d in self.__dict__["dicts"]:
             if key in d:
@@ -80,6 +83,9 @@ class BaseApp(object):
         self._conf = None
         return self.loader.config_from_envvar(variable_name, silent=silent)
 
+    def config_from_cmdline(self, argv, namespace="celery"):
+        return self.loader.config_from_cmdline(argv, namespace)
+
     def either(self, default_key, *values):
         for value in values:
             if value is not None:

+ 133 - 82
celery/app/defaults.py

@@ -10,87 +10,138 @@ DEFAULT_TASK_LOG_FMT = " ".join("""
 """.strip().split())
 
 
-DEFAULTS = {
-    "BROKER_BACKEND": None,
-    "BROKER_CONNECTION_TIMEOUT": 4,
-    "BROKER_CONNECTION_RETRY": True,
-    "BROKER_CONNECTION_MAX_RETRIES": 100,
-    "BROKER_HOST": "localhost",
-    "BROKER_PORT": None,
-    "BROKER_USER": "guest",
-    "BROKER_PASSWORD": "guest",
-    "BROKER_INSIST": False,
-    "BROKER_USE_SSL": False,
-    "BROKER_VHOST": "/",
-    "CELERY_RESULT_BACKEND": "database",
-    "CELERY_ALWAYS_EAGER": False,
-    "CELERY_EAGER_PROPAGATES_EXCEPTIONS": False,
-    "CELERY_TASK_RESULT_EXPIRES": timedelta(days=1),
-    "CELERY_TASK_ERROR_WHITELIST": (),
-    "CELERY_IMPORTS": (),
-    "CELERY_SEND_EVENTS": False,
-    "CELERY_IGNORE_RESULT": False,
-    "CELERY_STORE_ERRORS_EVEN_IF_IGNORED": False,
-    "CELERY_TASK_SERIALIZER": "pickle",
-    "CELERY_DEFAULT_RATE_LIMIT": None,
-    "CELERY_DISABLE_RATE_LIMITS": False,
-    "CELERYD_TASK_TIME_LIMIT": None,
-    "CELERYD_TASK_SOFT_TIME_LIMIT": None,
-    "CELERYD_MAX_TASKS_PER_CHILD": None,
-    "CELERY_CREATE_MISSING_QUEUES": True,
-    "CELERY_DEFAULT_ROUTING_KEY": "celery",
-    "CELERY_DEFAULT_QUEUE": "celery",
-    "CELERY_DEFAULT_EXCHANGE": "celery",
-    "CELERY_DEFAULT_EXCHANGE_TYPE": "direct",
-    "CELERY_DEFAULT_DELIVERY_MODE": 2, # persistent
-    "CELERY_SEND_TASK_ERROR_EMAILS": False,
-    "CELERY_ACKS_LATE": False,
-    "CELERY_CACHE_BACKEND": None,
-    "CELERY_CACHE_BACKEND_OPTIONS": {},
-    "CELERYD_POOL_PUTLOCKS": True,
-    "CELERYD_POOL": "celery.concurrency.processes.TaskPool",
-    "CELERYD_MEDIATOR": "celery.worker.controllers.Mediator",
-    "CELERYD_ETA_SCHEDULER": "celery.utils.timer2.Timer",
-    "CELERYD_LISTENER": "celery.worker.listener.CarrotListener",
-    "CELERYD_CONCURRENCY": 0, # defaults to cpu count
-    "CELERYD_PREFETCH_MULTIPLIER": 4,
-    "CELERYD_LOG_FORMAT": DEFAULT_PROCESS_LOG_FMT,
-    "CELERYD_TASK_LOG_FORMAT": DEFAULT_TASK_LOG_FMT,
-    "CELERYD_LOG_COLOR": None,
-    "CELERYD_LOG_LEVEL": "WARN",
-    "CELERYD_LOG_FILE": None, # stderr
-    "CELERYBEAT_SCHEDULE": {},
-    "CELERYD_STATE_DB": None,
-    "CELERYD_ETA_SCHEDULER_PRECISION": 1,
-    "CELERYBEAT_SCHEDULE_FILENAME": "celerybeat-schedule",
-    "CELERYBEAT_MAX_LOOP_INTERVAL": 5 * 60, # five minutes.
-    "CELERYBEAT_LOG_LEVEL": "INFO",
-    "CELERYBEAT_LOG_FILE": None, # stderr
-    "CELERYMON_LOG_LEVEL": "INFO",
-    "CELERYMON_LOG_FILE": None, # stderr
-    "CELERYMON_LOG_FORMAT": DEFAULT_LOG_FMT,
-    "CELERY_BROADCAST_QUEUE": "celeryctl",
-    "CELERY_BROADCAST_EXCHANGE": "celeryctl",
-    "CELERY_BROADCAST_EXCHANGE_TYPE": "fanout",
-    "CELERY_EVENT_QUEUE": "celeryevent",
-    "CELERY_EVENT_EXCHANGE": "celeryevent",
-    "CELERY_EVENT_EXCHANGE_TYPE": "direct",
-    "CELERY_EVENT_ROUTING_KEY": "celeryevent",
-    "CELERY_EVENT_SERIALIZER": "json",
-    "CELERY_RESULT_DBURI": None,
-    "CELERY_RESULT_ENGINE_OPTIONS": None,
-    "CELERY_RESULT_EXCHANGE": "celeryresults",
-    "CELERY_RESULT_EXCHANGE_TYPE": "direct",
-    "CELERY_RESULT_SERIALIZER": "pickle",
-    "CELERY_RESULT_PERSISTENT": False,
-    "CELERY_MAX_CACHED_RESULTS": 5000,
-    "CELERY_TRACK_STARTED": False,
+def str_to_bool(s):
+    s = s.lower()
+    if s in ("false", "no", "0"):
+        return False
+    if s in ("true", "yes", "1"):
+        return True
+    raise TypeError("%r can not be converted to type bool" % (s, ))
 
-    # Default e-mail settings.
-    "SERVER_EMAIL": "celery@localhost",
-    "EMAIL_HOST": "localhost",
-    "EMAIL_PORT": 25,
-    "EMAIL_HOST_USER": None,
-    "EMAIL_HOST_PASSWORD": None,
-    "ADMINS": (),
+
+class Option(object):
+    typemap = {"string": str,
+               "int": int,
+               "float": float,
+               "bool": str_to_bool,
+               "dict": dict,
+               "tuple": tuple}
+
+    def __init__(self, default=None, *args, **kwargs):
+        self.default = default
+        kwargs.setdefault("type", "string")
+        self.type = kwargs["type"]
+        self.args = args
+        self.kwargs = kwargs
+
+    def to_python(self, value):
+        return self.typemap[self.type](value)
+
+
+NAMESPACES = {
+    "BROKER": {
+        "HOST": Option("localhost"),
+        "PORT": Option(type="int"),
+        "USER": Option("guest"),
+        "PASSWORD": Option("guest"),
+        "VHOST": Option("/"),
+        "BACKEND": Option(),
+        "CONNECTION_TIMEOUT": Option(4, type="int"),
+        "CONNECTION_RETRY": Option(True, type="bool"),
+        "CONNECTION_MAX_RETRIES": Option(100, type="int"),
+        "INSIST": Option(False, type="bool"),
+        "USE_SSL": Option(False, type="bool"),
+    },
+    "CELERY": {
+        "ACKS_LATE": Option(False, type="bool"),
+        "ALWAYS_EAGER": Option(False, type="bool"),
+        "BROADCAST_QUEUE": Option("celeryctl"),
+        "BROADCAST_EXCHANGE": Option("celeryctl"),
+        "BROADCAST_EXCHANGE_TYPE": Option("fanout"),
+        "CACHE_BACKEND": Option(),
+        "CACHE_BACKEND_OPTIONS": Option({}, type="dict"),
+        "CREATE_MISSING_QUEUES": Option(True, type="bool"),
+        "DEFAULT_RATE_LIMIT": Option(type="string"),
+        "DISABLE_RATE_LIMITS": Option(False, type="bool"),
+        "DEFAULT_ROUTING_KEY": Option("celery"),
+        "DEFAULT_QUEUE": Option("celery"),
+        "DEFAULT_EXCHANGE": Option("celery"),
+        "DEFAULT_EXCHANGE_TYPE": Option("direct"),
+        "DEFAULT_DELIVERY_MODE": Option(2, type="string"),
+        "EAGER_PROPAGATES_EXCEPTIONS": Option(False, type="bool"),
+        "EVENT_QUEUE": Option("celeryevent"),
+        "EVENT_EXCHANGE": Option("celeryevent"),
+        "EVENT_EXCHANGE_TYPE": Option("direct"),
+        "EVENT_ROUTING_KEY": Option("celeryevent"),
+        "EVENT_SERIALIZER": Option("json"),
+        "IMPORTS": Option((), type="tuple"),
+        "IGNORE_RESULT": Option(False, type="bool"),
+        "MAX_CACHED_RESULTS": Option(5000, type="int"),
+        "RESULT_BACKEND": Option("database"),
+        "RESULT_DBURI": Option(),
+        "RESULT_ENGINE_OPTIONS": Option(None, type="dict"),
+        "RESULT_EXCHANGE": Option("celeryresults"),
+        "RESULT_EXCHANGE_TYPE": Option("direct"),
+        "RESULT_SERIALIZER": Option("pickle"),
+        "RESULT_PERSISTENT": Option(False, type="bool"),
+        "SEND_EVENTS": Option(False, type="bool"),
+        "SEND_TASK_ERROR_EMAILS": Option(False, type="bool"),
+        "STORE_ERRORS_EVEN_IF_IGNORED": Option(False, type="bool"),
+        "TASK_RESULT_EXPIRES": Option(timedelta(days=1), type="int"),
+        "TASK_ERROR_WHITELIST": Option((), type="tuple"),
+        "TASK_SERIALIZER": Option("pickle"),
+        "TRACK_STARTED": Option(False, type="bool"),
+    },
+    "CELERYD": {
+        "CONCURRENCY": Option(0, type="int"),
+        "ETA_SCHEDULER": Option("celery.utils.timer2.Timer"),
+        "ETA_SCHEDULER_PRECISION": Option(1.0, type="float"),
+        "LISTENER": Option("celery.worker.listener.CarrotListener"),
+        "LOG_FORMAT": Option(DEFAULT_PROCESS_LOG_FMT),
+        "LOG_COLOR": Option(type="bool"),
+        "LOG_LEVEL": Option("WARN"),
+        "LOG_FILE": Option(),
+        "MEDIATOR": Option("celery.worker.controllers.Mediator"),
+        "MAX_TASKS_PER_CHILD": Option(type="int"),
+        "POOL": Option("celery.concurrency.processes.TaskPool"),
+        "POOL_PUTLOCKS": Option(True, type="bool"),
+        "PREFETCH_MULTIPLIER": Option(4, type="int"),
+        "STATE_DB": Option(),
+        "TASK_LOG_FORMAT": Option(DEFAULT_TASK_LOG_FMT),
+        "TASK_SOFT_TIME_LIMIT": Option(type="int"),
+        "TASK_TIME_LIMIT": Option(type="int"),
+    },
+    "CELERYBEAT": {
+        "SCHEDULE": Option({}, type="dict"),
+        "SCHEDULE_FILENAME": Option("celerybeat-schedule"),
+        "MAX_LOOP_INTERVAL": Option(5 * 60, type="int"),
+        "LOG_LEVEL": Option("INFO"),
+        "LOG_FILE": Option(),
+    },
+    "CELERYMON": {
+        "LOG_LEVEL": Option("INFO"),
+        "LOG_FILE": Option(),
+        "LOG_FORMAT": Option(DEFAULT_LOG_FMT),
+    },
+
+    "EMAIL": {
+        "HOST": Option("localhost"),
+        "PORT": Option(25, type="int"),
+        "HOST_USER": Option(None),
+        "HOST_PASSWORD": Option(None),
+    },
+    "SERVER_EMAIL": Option("celery@localhost"),
+    "ADMINS": Option((), type="tuple"),
 }
+
+
+def _flatten(d, ns=""):
+    acc = []
+    for key, value in d.iteritems():
+        if isinstance(value, dict):
+            acc.extend(_flatten(value, ns=key + '_'))
+        else:
+            acc.append((ns + key, value.default))
+    return acc
+
+DEFAULTS = dict(_flatten(NAMESPACES))

+ 80 - 15
celery/bin/base.py

@@ -3,19 +3,59 @@ import sys
 
 from optparse import OptionParser, make_option as Option
 
+from carrot.utils import partition
+
 from celery import __version__
+from celery import Celery
 from celery.app import app_or_default
+from celery.utils import get_symbol_by_name
 
 
 class Command(object):
     args = ''
     version = __version__
     option_list = ()
+    preload_options = (
+            Option("--app",
+                    default=None, action="store", dest="app",
+                    help="Name of the app instance to use. "),
+            Option("--loader",
+                   default=None, action="store", dest="loader",
+                   help="Name of the loader class to use. "
+                        "Taken from the environment variable CELERY_LOADER, "
+                        "or 'default' if that is not set."),
+            Option("--config",
+                    default="celeryconfig", action="store",
+                    dest="config_module",
+                    help="Name of the module to read configuration from.")
+    )
+    enable_config_from_cmdline = False
+    namespace = "celery"
 
     Parser = OptionParser
 
-    def __init__(self, app=None):
-        self.app = app_or_default(app)
+    def __init__(self, app=None, get_app=None):
+        self.get_app = get_app or self._get_default_app
+
+    def usage(self):
+        return "%%prog [options] %s" % (self.args, )
+
+    def get_options(self):
+        return self.option_list
+
+    def handle_argv(self, prog_name, argv):
+        options, args = self.parse_options(prog_name, argv)
+        return self.run(*args, **vars(options))
+
+    def run(self, *args, **options):
+        raise NotImplementedError("subclass responsibility")
+
+    def execute_from_commandline(self, argv=None):
+        if argv is None:
+            argv = list(sys.argv)
+        argv = self.setup_app_from_commandline(argv)
+        prog_name = os.path.basename(argv[0])
+        return self.handle_argv(prog_name, argv[1:])
 
     def parse_options(self, prog_name, arguments):
         """Parse the available options."""
@@ -27,20 +67,45 @@ class Command(object):
         return self.Parser(prog=prog_name,
                            usage=self.usage(),
                            version=self.version,
-                           option_list=self.get_options())
+                           option_list=(self.preload_options +
+                                        self.get_options()))
 
-    def execute_from_commandline(self, argv=None):
-        if argv is None:
-            argv = list(sys.argv)
-        prog_name = os.path.basename(argv[0])
-        options, args = self.parse_options(prog_name, argv[1:])
-        return self.run(*args, **vars(options))
+    def setup_app_from_commandline(self, argv):
+        preload_options = self.parse_preload_options(argv)
+        app = (preload_options.pop("app", None) or
+               os.environ.get("CELERY_APP"))
+        loader = (preload_options.pop("loader", None) or
+                  os.environ.get("CELERY_LOADER") or
+                  "default")
+        config_module = preload_options.pop("config_module", None)
+        if config_module:
+            os.environ["CELERY_CONFIG_MODULE"] = config_module
+        self.app = (app and get_symbol_by_name(app) or
+                            self.get_app(loader=loader))
+        if self.enable_config_from_cmdline:
+            argv = self.process_cmdline_config(argv)
+        return argv
 
-    def usage(self):
-        return "%%prog [options] %s" % (self.args, )
+    def process_cmdline_config(self, argv):
+        try:
+            cargs_start = argv.index('--')
+        except ValueError:
+            return argv
+        argv, cargs = argv[:cargs_start], argv[cargs_start + 1:]
+        self.app.config_from_cmdline(cargs, namespace=self.namespace)
+        return argv
 
-    def get_options(self):
-        return self.option_list
+    def parse_preload_options(self, args):
+        acc = {}
+        preload_options = dict((opt._long_opts[0], opt.dest)
+                                for opt in self.preload_options)
+        for arg in args:
+            if arg.startswith('--') and '=' in arg:
+                key, _, value = partition(arg, '=')
+                dest = preload_options.get(key)
+                if dest:
+                    acc[dest] = value
+        return acc
 
-    def run(self, *args, **options):
-        raise NotImplementedError("subclass responsibility")
+    def _get_default_app(self, *args, **kwargs):
+        return Celery(*args, **kwargs)

+ 7 - 13
celery/bin/camqadm.py

@@ -14,9 +14,8 @@ from itertools import count
 from amqplib import client_0_8 as amqp
 from carrot.utils import partition
 
-from celery import CompatCelery
 from celery.app import app_or_default
-from celery.utils import info
+from celery.bin.base import Command
 from celery.utils import padlist
 
 # Valid string -> bool coercions.
@@ -28,8 +27,6 @@ BOOLS = {"1": True, "0": False,
 # Map to coerce strings to other types.
 COERCE = {bool: lambda value: BOOLS[value.lower()]}
 
-OPTION_LIST = ()
-
 HELP_HEADER = """
 Commands
 --------
@@ -359,21 +356,18 @@ class AMQPAdmin(object):
             say(m)
 
 
-def parse_options(arguments):
-    """Parse the available options to ``celeryd``."""
-    parser = optparse.OptionParser(option_list=OPTION_LIST)
-    options, values = parser.parse_args(arguments)
-    return options, values
+class AMQPAdminCommand(Command):
 
+    def run(self, *args, **options):
+        options["app"] = self.app
+        return AMQPAdmin(*args, **options).run()
 
 def camqadm(*args, **options):
-    options["app"] = CompatCelery()
-    return AMQPAdmin(*args, **options).run()
+    AMQPAdmin(*args, **options).run()
 
 
 def main():
-    options, values = parse_options(sys.argv[1:])
-    return camqadm(*values, **vars(options))
+    AMQPAdminCommand().execute_from_commandline()
 
 if __name__ == "__main__":
     main()

+ 1 - 3
celery/bin/celerybeat.py

@@ -22,7 +22,6 @@
     ``ERROR``, ``CRITICAL``, or ``FATAL``.
 
 """
-from celery import CompatCelery
 from celery.bin.base import Command, Option
 
 
@@ -62,8 +61,7 @@ class BeatCommand(Command):
 
 
 def main():
-    app = CompatCelery()
-    beat = BeatCommand(app=app)
+    beat = BeatCommand()
     beat.execute_from_commandline()
 
 if __name__ == "__main__":

+ 14 - 31
celery/bin/celeryctl.py

@@ -10,6 +10,7 @@ from anyjson import deserialize
 from celery import __version__
 from celery import CompatCelery
 from celery.app import app_or_default
+from celery.bin.base import Command as CeleryCommand
 from celery.utils import term
 
 
@@ -30,13 +31,9 @@ class Command(object):
     args = ""
     version = __version__
 
-    option_list = (
+    option_list = CeleryCommand.preload_options + (
         Option("--quiet", "-q", action="store_true", dest="quiet",
                 default=False),
-        Option("--conf", dest="conf",
-            help="Celery config module name (default: celeryconfig)"),
-        Option("--loader", dest="loader",
-            help="Celery loaders module name (default: default)"),
         Option("--no-color", "-C", dest="no_color", action="store_true",
             help="Don't colorize output."),
     )
@@ -66,16 +63,12 @@ class Command(object):
                             version=self.version,
                             option_list=self.option_list)
 
-    def run_from_argv(self, argv):
-        self.prog_name = os.path.basename(argv[0])
-        self.command = argv[1]
-        self.arglist = argv[2:]
+    def run_from_argv(self, prog_name, argv):
+        self.prog_name = prog_name
+        self.command = argv[0]
+        self.arglist = argv[1:]
         self.parser = self.create_parser(self.prog_name, self.command)
         options, args = self.parser.parse_args(self.arglist)
-        if options.loader:
-            os.environ["CELERY_LOADER"] = options.loader
-        if options.conf:
-            os.environ["CELERY_CONFIG_MODULE"] = options.conf
         self.colored = term.colored(enabled=not options.no_color)
         self(*args, **options.__dict__)
 
@@ -277,42 +270,32 @@ class help(Command):
 help = command(help)
 
 
-class celeryctl(object):
+class celeryctl(CeleryCommand):
     commands = commands
 
-    def __init__(self, app=None):
-        self.app = app_or_default(app)
-
     def execute(self, command, argv=None):
-        if argv is None:
-            argv = sys.arg
-        argv = list(argv)
         try:
             cls = self.commands[command]
         except KeyError:
-            cls = self.commands["help"]
-            argv.insert(1, "help")
+            cls, argv = self.commands["help"], ["help"]
         cls = self.commands.get(command) or self.commands["help"]
         try:
-            cls(app=self.app).run_from_argv(argv)
+            cls(app=self.app).run_from_argv(self.prog_name, argv)
         except Error:
             return self.execute("help", argv)
 
-    def execute_from_commandline(self, argv=None):
-        if argv is None:
-            argv = sys.argv
+    def handle_argv(self, prog_name, argv):
+        self.prog_name = prog_name
         try:
-            command = argv[1]
+            command = argv[0]
         except IndexError:
-            command = "help"
-            argv.insert(1, "help")
+            command, argv = "help", ["help"]
         return self.execute(command, argv)
 
 
 def main():
     try:
-        app = CompatCelery()
-        celeryctl(app).execute_from_commandline()
+        celeryctl().execute_from_commandline()
     except KeyboardInterrupt:
         pass
 

+ 4 - 4
celery/bin/celeryd.py

@@ -70,16 +70,17 @@
 import multiprocessing
 
 from celery import __version__
-from celery import CompatCelery
 from celery.bin.base import Command, Option
 
 
 class WorkerCommand(Command):
+    namespace = "celeryd"
+    enable_config_from_cmdline = True
 
     def run(self, *args, **kwargs):
         from celery.apps.worker import Worker
         kwargs["app"] = self.app
-        return Worker(*args, **kwargs).run()
+        return Worker(**kwargs).run()
 
     def get_options(self):
         conf = self.app.conf
@@ -151,8 +152,7 @@ class WorkerCommand(Command):
 
 def main():
     multiprocessing.freeze_support()
-    app = CompatCelery()
-    worker = WorkerCommand(app=app)
+    worker = WorkerCommand()
     worker.execute_from_commandline()
 
 if __name__ == "__main__":

+ 51 - 43
celery/bin/celeryev.py

@@ -1,59 +1,67 @@
 import logging
 import sys
 
-from optparse import OptionParser, make_option as Option
-
 from celery import CompatCelery
 from celery.app import app_or_default
 from celery.events.cursesmon import evtop
 from celery.events.dumper import evdump
 from celery.events.snapshot import evcam
 
+from celery.bin.base import Command, Option
+
+
+class EvCommand(Command):
+
+    def run(self, dump=False, camera=None, frequency=1.0, maxrate=None,
+        loglevel="WARNING", logfile=None, **kwargs):
+        if dump:
+            return self.run_evdump()
+        if camera:
+            return self.run_evcam(camera, frequency, maxrate,
+                                  loglevel=loglevel, logfile=logfile)
+        return self.run_evtop()
+
+    def run_evdump(self):
+        from celery.events.dumper import evdump
+        return evdump(app=self.app)
+
+    def run_evtop(self):
+        from celery.events.cursesmon import evtop
+        return evtop(app=self.app)
+
+    def run_evcam(self, *args, **kwargs):
+        from celery.events.snapshot import evcam
+        kwargs["app"] = self.app
+        return evcam(*args, **kwargs)
 
-OPTION_LIST = (
-    Option('-d', '--dump',
-        action="store_true", dest="dump",
-        help="Dump events to stdout."),
-    Option('-c', '--camera',
-        action="store", dest="camera",
-        help="Camera class to take event snapshots with."),
-    Option('-F', '--frequency', '--freq',
-        action="store", dest="frequency", type="float", default=1.0,
-        help="Recording: Snapshot frequency."),
-    Option('-r', '--maxrate',
-        action="store", dest="maxrate", default=None,
-        help="Recording: Shutter rate limit (e.g. 10/m)"),
-    Option('-l', '--loglevel',
-        action="store", dest="loglevel", default="WARNING",
-        help="Loglevel. Default is WARNING."),
-    Option('-f', '--logfile',
-        action="store", dest="logfile", default=None,
-        help="Log file. Default is <stderr>"),
-)
-
-
-def run_celeryev(dump=False, camera=None, frequency=1.0, maxrate=None,
-        loglevel=logging.WARNING, logfile=None, app=None, **kwargs):
-    app = app_or_default(app)
-    if dump:
-        return evdump(app=app)
-    if camera:
-        return evcam(camera, frequency, maxrate, app=app,
-                     loglevel=loglevel, logfile=logfile)
-    return evtop(app=app)
-
-
-def parse_options(arguments):
-    """Parse the available options to ``celeryev``."""
-    parser = OptionParser(option_list=OPTION_LIST)
-    options, values = parser.parse_args(arguments)
-    return options
+    def get_options(self):
+        conf = self.app.conf
+        return (
+            Option('-d', '--dump',
+                   action="store_true", dest="dump",
+                   help="Dump events to stdout."),
+            Option('-c', '--camera',
+                   action="store", dest="camera",
+                   help="Camera class to take event snapshots with."),
+            Option('-F', '--frequency', '--freq',
+                   action="store", dest="frequency",
+                   type="float", default=1.0,
+                   help="Recording: Snapshot frequency."),
+            Option('-r', '--maxrate',
+                   action="store", dest="maxrate", default=None,
+                   help="Recording: Shutter rate limit (e.g. 10/m)"),
+            Option('-l', '--loglevel',
+                   action="store", dest="loglevel", default="WARNING",
+                   help="Loglevel. Default is WARNING."),
+            Option('-f', '--logfile',
+                   action="store", dest="logfile", default=None,
+                   help="Log file. Default is <stderr>"),
+        )
 
 
 def main():
-    options = parse_options(sys.argv[1:])
-    app = CompatCelery()
-    return run_celeryev(app=app, **vars(options))
+    ev = EvCommand()
+    ev.execute_from_commandline()
 
 if __name__ == "__main__":
     main()

+ 6 - 2
celery/datastructures.py

@@ -9,8 +9,7 @@ from Queue import Queue, Empty as QueueEmpty
 from celery.utils.compat import OrderedDict
 
 
-class AttributeDict(dict):
-    """Dict subclass with attribute access."""
+class AttributeDictMixin(object):
 
     def __getattr__(self, key):
         try:
@@ -23,6 +22,11 @@ class AttributeDict(dict):
         self[key] = value
 
 
+class AttributeDict(dict, AttributeDictMixin):
+    """Dict subclass with attribute access."""
+    pass
+
+
 class DictAttribute(object):
 
     def __init__(self, obj):

+ 45 - 0
celery/loaders/base.py

@@ -1,4 +1,5 @@
 import os
+import re
 import sys
 
 from importlib import import_module as _import_module
@@ -59,6 +60,50 @@ class BaseLoader(object):
             self.worker_initialized = True
             self.on_worker_init()
 
+    def config_from_cmdline(self, args, namespace="celery"):
+        for key, value in self.cmdline_config_parser(args, namespace).items():
+            print("KEY %r=%r" % (key, value))
+            self.conf[key] = value
+
+    def cmdline_config_parser(self, args, namespace="celery",
+                re_type=re.compile(r"\((\w+)\)")):
+        from celery.app.defaults import Option, NAMESPACES
+        namespace = namespace.upper()
+
+        def getarg(arg):
+            """Parse a single configuration definition from
+            the command line."""
+
+            ## find key/value
+            # ns.key=value|ns_key=value (case insensitive)
+            key, value = arg.replace('.', '_').split('=', 1)
+            key = key.upper()
+
+            ## find namespace.
+            # .key=value|_key=value expands to default namespace.
+            if key[0] == '_':
+                ns, key = namespace, key[1:]
+            else:
+                # find namespace part of key
+                ns, key = key.split('_', 1)
+
+            ns_key = (ns and ns + "_" or "") + key
+
+            # (type)value makes cast to custom type.
+            cast = re_type.match(value)
+            if cast:
+                value = Option.typemap[cast.groups()[0]](
+                            value[len(cast.group()):])
+            else:
+                try:
+                    value = NAMESPACES[ns][key].to_python(value)
+                except ValueError, exc:
+                    # display key name in error message.
+                    raise ValueError("%r: %s" % (ns_key, exc))
+            return ns_key, value
+
+        return dict(map(getarg, args))
+
     @property
     def conf(self):
         """Loader configuration."""

+ 3 - 3
celery/loaders/default.py

@@ -54,9 +54,9 @@ class Loader(BaseLoader):
         try:
             celeryconfig = self.import_from_cwd(configname)
         except ImportError:
-            warnings.warn("No celeryconfig.py module found! Please make "
-                          "sure it exists and is available to Python.",
-                          NotConfigured)
+            warnings.warn(NotConfigured(
+                "No %r module found! Please make sure it exists and "
+                "is available to Python." % (configname, )))
             return self.setup_settings(DEFAULT_UNCONFIGURED_SETTINGS)
         else:
             usercfg = dict((key, getattr(celeryconfig, key))

+ 2 - 0
celery/utils/__init__.py

@@ -342,6 +342,8 @@ def get_cls_by_name(name, aliases={}):
     module = importlib.import_module(module_name)
     return getattr(module, cls_name)
 
+get_symbol_by_name = get_cls_by_name
+
 
 def instantiate(name, *args, **kwargs):
     """Instantiate class by name.