Przeglądaj źródła

Refactors celery.bin

Ask Solem 12 lat temu
rodzic
commit
0399fdf213
7 zmienionych plików z 467 dodań i 489 usunięć
  1. 19 6
      celery/bin/amqp.py
  2. 146 17
      celery/bin/base.py
  3. 11 3
      celery/bin/beat.py
  4. 64 455
      celery/bin/celery.py
  5. 20 2
      celery/bin/events.py
  6. 189 0
      celery/bin/graph.py
  7. 18 6
      celery/bin/worker.py

+ 19 - 6
celery/bin/amqp.py

@@ -364,19 +364,32 @@ class AMQPAdmin(object):
             say(m, file=self.out)
 
 
-class AMQPAdminCommand(Command):
+class amqp(Command):
+    """AMQP Administration Shell.
+
+    Also works for non-amqp transports (but not ones that
+    store declarations in memory).
+
+    Examples::
+
+        celery amqp
+            start shell mode
+        celery amqp help
+            show list of commands
+
+        celery amqp exchange.delete name
+        celery amqp queue.delete queue
+        celery amqp queue.delete queue yes yes
+
+    """
 
     def run(self, *args, **options):
         options['app'] = self.app
         return AMQPAdmin(*args, **options).run()
 
 
-def run(*args, **options):
-    AMQPAdmin(*args, **options).run()
-
-
 def main():
-    AMQPAdminCommand().execute_from_commandline()
+    amqp().execute_from_commandline()
 
 if __name__ == '__main__':  # pragma: no cover
     main()

+ 146 - 17
celery/bin/base.py

@@ -65,13 +65,19 @@ import sys
 import warnings
 
 from collections import defaultdict
+from heapq import heappush
 from optparse import OptionParser, IndentedHelpFormatter, make_option as Option
+from pprint import pformat
 from types import ModuleType
 
 import celery
 from celery.exceptions import CDeprecationWarning, CPendingDeprecationWarning
-from celery.five import items, string_t
-from celery.platforms import EX_FAILURE, EX_USAGE, maybe_patch_concurrency
+from celery.five import items, string, string_t
+from celery.platforms import (
+    EX_FAILURE, EX_OK, EX_USAGE,
+    maybe_patch_concurrency,
+)
+from celery.utils import term
 from celery.utils import text
 from celery.utils.imports import symbol_by_name, import_from_cwd
 
@@ -90,6 +96,46 @@ find_rst_ref = re.compile(r':\w+:`(.+?)`')
 find_sformat = re.compile(r'%(\w)')
 
 
+class Error(Exception):
+
+    def __init__(self, reason, status=EX_FAILURE):
+        self.reason = reason
+        self.status = status
+        super(Error, self).__init__(reason, status)
+
+    def __str__(self):
+        return self.reason
+
+
+class Extensions(object):
+
+    def __init__(self, namespace, register):
+        self.names = []
+        self.namespace = namespace
+        self.register = register
+
+    def add(self, cls, name):
+        heappush(self.names, name)
+        self.register(cls, name=name)
+
+    def load(self):
+        try:
+            from pkg_resources import iter_entry_points
+        except ImportError:
+            return
+
+        for ep in iter_entry_points(self.namespace):
+            sym = ':'.join([ep.module_name, ep.attrs[0]])
+            try:
+                cls = symbol_by_name(sym)
+            except (ImportError, SyntaxError) as exc:
+                warnings.warn(
+                    'Cannot load extension {0!r}: {1!r}'.format(sym, exc))
+            else:
+                self.add(cls, ep.name)
+        return self.names
+
+
 class HelpFormatter(IndentedHelpFormatter):
 
     def format_epilog(self, epilog):
@@ -138,6 +184,8 @@ class Command(object):
         Option('--loader', default=None),
         Option('--config', default=None),
         Option('--workdir', default=None, dest='working_directory'),
+        Option('--no-color', '-C', action='store_true', default=None),
+        Option('--quiet', '-q', action='store_true'),
     )
 
     #: Enable if the application should support config from the cmdline.
@@ -155,9 +203,33 @@ class Command(object):
     #: Set to true if this command doesn't have subcommands
     leaf = True
 
-    def __init__(self, app=None, get_app=None):
+    # used by :meth:`say_remote_control_reply`.
+    show_body = True
+    # used by :meth:`say_chat`.
+    show_reply = True
+
+    prog_name = 'celery'
+
+    def __init__(self, app=None, get_app=None, no_color=False,
+            stdout=None, stderr=None, quiet=False):
         self.app = app
         self.get_app = get_app or self._get_default_app
+        self.stdout = stdout or sys.stdout
+        self.stderr = stderr or sys.stderr
+        self.no_color = no_color
+        self.colored = term.colored(enabled=not self.no_color)
+        self.quiet = quiet
+        if not self.description:
+            self.description = self.__doc__
+
+    def __call__(self, *args, **kwargs):
+        try:
+            ret = self.run(*args, **kwargs)
+        except Error as exc:
+            self.error(self.colored.red('Error: {0}'.format(exc)))
+            return exc.status
+
+        return ret if ret is not None else EX_OK
 
     def run(self, *args, **options):
         """This is the body of the command called by :meth:`handle_argv`."""
@@ -179,11 +251,12 @@ class Command(object):
         # Dump version and exit if '--version' arg set.
         self.early_version(argv)
         argv = self.setup_app_from_commandline(argv)
-        prog_name = os.path.basename(argv[0])
-        return self.handle_argv(prog_name, argv[1:])
+        self.prog_name = os.path.basename(argv[0])
+        return self.handle_argv(self.prog_name, argv[1:])
 
-    def run_from_argv(self, prog_name, argv=None):
-        return self.handle_argv(prog_name, sys.argv if argv is None else argv)
+    def run_from_argv(self, prog_name, argv=None, command=None):
+        return self.handle_argv(prog_name,
+                                sys.argv if argv is None else argv, command)
 
     def maybe_patch_concurrency(self, argv=None):
         argv = argv or sys.argv
@@ -196,8 +269,7 @@ class Command(object):
         pass
 
     def usage(self, command):
-        """Returns the command-line usage string for this app."""
-        return '%%prog [options] {0.args}'.format(self)
+        return '%prog {0} [options] {self.args}'.format(command, self=self)
 
     def get_options(self):
         """Get supported command-line options."""
@@ -208,7 +280,7 @@ class Command(object):
             return os.path.expanduser(value)
         return value
 
-    def handle_argv(self, prog_name, argv):
+    def handle_argv(self, prog_name, argv, command=None):
         """Parses command-line arguments from ``argv`` and dispatches
         to :meth:`run`.
 
@@ -219,8 +291,9 @@ class Command(object):
         and ``argv`` contains positional arguments.
 
         """
-        options, args = self.prepare_args(*self.parse_options(prog_name, argv))
-        return self.run(*args, **options)
+        options, args = self.prepare_args(
+            *self.parse_options(prog_name, argv, command))
+        return self(*args, **options)
 
     def prepare_args(self, options, args):
         if options:
@@ -235,21 +308,27 @@ class Command(object):
         if not self.supports_args and args:
             self.die(ARGV_DISABLED.format(', '.join(args)), EX_USAGE)
 
+    def error(self, s):
+        self.out(s, fh=self.stderr)
+
+    def out(self, s, fh=None):
+        print(s, file=fh or self.stdout)
+
     def die(self, msg, status=EX_FAILURE):
-        print(msg, file=sys.stderr)
+        self.error(msg)
         sys.exit(status)
 
     def early_version(self, argv):
         if '--version' in argv:
-            print(self.version)
+            print(self.version, file=self.stdout)
             sys.exit(0)
 
-    def parse_options(self, prog_name, arguments):
+    def parse_options(self, prog_name, arguments, command=None):
         """Parse the available options."""
         # Don't want to load configuration to just print the version,
         # so we handle --version manually here.
-        parser = self.create_parser(prog_name)
-        return parser.parse_args(arguments)
+        self.parser = self.create_parser(prog_name, command)
+        return self.parser.parse_args(arguments)
 
     def create_parser(self, prog_name, command=None):
         return self.prepare_parser(self.Parser(prog=prog_name,
@@ -272,6 +351,11 @@ class Command(object):
 
     def setup_app_from_commandline(self, argv):
         preload_options = self.parse_preload_options(argv)
+        quiet = preload_options.get('quiet')
+        if quiet is not None:
+            self.quiet = quiet
+        self.colored.enabled = \
+            not preload_options.get('no_color', self.no_color)
         workdir = preload_options.get('working_directory')
         if workdir:
             os.chdir(workdir)
@@ -388,6 +472,51 @@ class Command(object):
         from celery._state import get_current_app
         return get_current_app()  # omit proxy
 
+    def pretty_list(self, n):
+        c = self.colored
+        if not n:
+            return '- empty -'
+        return '\n'.join(str(c.reset(c.white('*'), ' {0}'.format(item)))
+                            for item in n)
+
+    def pretty_dict_ok_error(self, n):
+        c = self.colored
+        try:
+            return (c.green('OK'),
+                    text.indent(self.pretty(n['ok'])[1], 4))
+        except KeyError:
+            pass
+        return (c.red('ERROR'),
+                text.indent(self.pretty(n['error'])[1], 4))
+
+    def say_remote_command_reply(self, replies):
+        c = self.colored
+        node = next(iter(replies))  # <-- take first.
+        reply = replies[node]
+        status, preply = self.pretty(reply)
+        self.say_chat('->', c.cyan(node, ': ') + status,
+                      text.indent(preply, 4) if self.show_reply else '')
+
+    def pretty(self, n):
+        OK = str(self.colored.green('OK'))
+        if isinstance(n, list):
+            return OK, self.pretty_list(n)
+        if isinstance(n, dict):
+            if 'ok' in n or 'error' in n:
+                return self.pretty_dict_ok_error(n)
+        if isinstance(n, string_t):
+            return OK, string(n)
+        return OK, pformat(n)
+
+    def say_chat(self, direction, title, body=''):
+        c = self.colored
+        if direction == '<-' and self.quiet:
+            return
+        dirstr = not self.quiet and c.bold(c.white(direction), ' ') or ''
+        self.out(c.reset(dirstr, title))
+        if body and self.show_body:
+            self.out(body)
+
 
 def daemon_options(default_pidfile=None, default_logfile=None):
     return (

+ 11 - 3
celery/bin/beat.py

@@ -47,7 +47,16 @@ from celery.platforms import detached
 from celery.bin.base import Command, Option, daemon_options
 
 
-class BeatCommand(Command):
+class beat(Command):
+    """Start the beat periodic task scheduler.
+
+    Examples::
+
+        celery beat -l info
+        celery beat -s /var/run/celery/beat-schedule --detach
+        celery beat -S djcelery.schedulers.DatabaseScheduler
+
+    """
     doc = __doc__
     enable_config_from_cmdline = True
     supports_args = False
@@ -81,8 +90,7 @@ class BeatCommand(Command):
 
 
 def main():
-    beat = BeatCommand()
-    beat.execute_from_commandline()
+    beat().execute_from_commandline()
 
 if __name__ == '__main__':      # pragma: no cover
     main()

+ 64 - 455
celery/bin/celery.py

@@ -6,29 +6,28 @@ The :program:`celery` umbrella command.
 .. program:: celery
 
 """
-from __future__ import absolute_import, print_function, unicode_literals
+from __future__ import absolute_import, unicode_literals
 
 import anyjson
-import heapq
 import os
 import sys
-import warnings
 
 from importlib import import_module
-from operator import itemgetter
-from pprint import pformat
 
-from celery.datastructures import DependencyGraph, GraphFormatter
-from celery.five import items, string, string_t, values
+from celery.five import string_t, values
 from celery.platforms import EX_OK, EX_FAILURE, EX_UNAVAILABLE, EX_USAGE
 from celery.utils import term
 from celery.utils import text
-from celery.utils.functional import memoize
-from celery.utils.imports import symbol_by_name
 from celery.utils.timeutils import maybe_iso8601
 
-from celery.bin.base import Command as BaseCommand, Option
+from .base import Command, Error, Option, Extensions
 
+# Import commands from other modules
+from .amqp import amqp
+from .beat import beat
+from .events import events
+from .graph import graph
+from .worker import worker
 
 HELP = """
 ---- -- - - ---- Commands- -------------- --- ------------
@@ -46,7 +45,6 @@ Migrating task {state.count}/{state.strtotal}: \
 
 DEBUG = os.environ.get('C_DEBUG', False)
 
-commands = {}
 command_classes = [
     ('Main', ['worker', 'events', 'beat', 'shell', 'multi', 'amqp'], 'green'),
     ('Remote Control', ['status', 'inspect', 'control'], 'blue'),
@@ -58,51 +56,6 @@ if DEBUG:
     )
 
 
-@memoize()
-def _get_extension_classes():
-    extensions = []
-    command_classes.append(('Extensions', extensions, 'magenta'))
-    return extensions
-
-
-class Error(Exception):
-
-    def __init__(self, reason, status=EX_FAILURE):
-        self.reason = reason
-        self.status = status
-        super(Error, self).__init__(reason, status)
-
-    def __str__(self):
-        return self.reason
-
-
-def command(*args, **kwargs):
-
-    def _register(fun):
-        commands[kwargs.get('name') or fun.__name__] = fun
-        return fun
-
-    return _register(args[0]) if args else _register
-
-
-def load_extension_commands(namespace='celery.commands'):
-    try:
-        from pkg_resources import iter_entry_points
-    except ImportError:
-        return
-
-    for ep in iter_entry_points(namespace):
-        sym = ':'.join([ep.module_name, ep.attrs[0]])
-        try:
-            cls = symbol_by_name(sym)
-        except (ImportError, SyntaxError) as exc:
-            warnings.warn(
-                'Cannot load extension {0!r}: {1!r}'.format(sym, exc))
-        else:
-            heapq.heappush(_get_extension_classes(), ep.name)
-            command(cls, name=ep.name)
-
-
 def determine_exit_status(ret):
     if isinstance(ret, int):
         return ret
@@ -125,134 +78,6 @@ def main(argv=None):
         pass
 
 
-class Command(BaseCommand):
-    help = ''
-    args = ''
-    prog_name = 'celery'
-    show_body = True
-    show_reply = True
-
-    option_list = (
-        Option('--quiet', '-q', action='store_true'),
-        Option('--no-color', '-C', action='store_true', default=None),
-    )
-
-    def __init__(self, app=None, no_color=False, stdout=sys.stdout,
-            stderr=sys.stderr, show_reply=True):
-        super(Command, self).__init__(app=app)
-        self.colored = term.colored(enabled=not no_color)
-        self.stdout = stdout
-        self.stderr = stderr
-        self.quiet = False
-        if show_reply is not None:
-            self.show_reply = show_reply
-
-    def __call__(self, *args, **kwargs):
-        try:
-            ret = self.run(*args, **kwargs)
-        except Error as exc:
-            self.error(self.colored.red('Error: {0}'.format(exc)))
-            return exc.status
-
-        return ret if ret is not None else EX_OK
-
-    def exit_help(self, command):
-        # this never exits due to OptionParser.parse_options
-        self.run_from_argv(self.prog_name, [command, '--help'])
-        sys.exit(EX_USAGE)
-
-    def error(self, s):
-        self.out(s, fh=self.stderr)
-
-    def out(self, s, fh=None):
-        print(s, file=fh or self.stdout)
-
-    def run_from_argv(self, prog_name, argv):
-        self.prog_name = prog_name
-        self.command = argv[0]
-        self.arglist = argv[1:]
-        self.parser = self.create_parser(self.prog_name, self.command)
-        options, args = self.prepare_args(
-                *self.parser.parse_args(self.arglist))
-        self.colored = term.colored(enabled=not options['no_color'])
-        self.quiet = options.get('quiet', False)
-        self.show_body = options.get('show_body', True)
-        return self(*args, **options)
-
-    def usage(self, command):
-        return '%prog {0} [options] {self.args}'.format(command, self=self)
-
-    def prettify_list(self, n):
-        c = self.colored
-        if not n:
-            return '- empty -'
-        return '\n'.join(str(c.reset(c.white('*'), ' {0}'.format(item)))
-                            for item in n)
-
-    def prettify_dict_ok_error(self, n):
-        c = self.colored
-        try:
-            return (c.green('OK'),
-                    text.indent(self.prettify(n['ok'])[1], 4))
-        except KeyError:
-            pass
-        return (c.red('ERROR'),
-                text.indent(self.prettify(n['error'])[1], 4))
-
-    def say_remote_command_reply(self, replies):
-        c = self.colored
-        node = next(iter(replies))  # <-- take first.
-        reply = replies[node]
-        status, preply = self.prettify(reply)
-        self.say_chat('->', c.cyan(node, ': ') + status,
-                      text.indent(preply, 4) if self.show_reply else '')
-
-    def prettify(self, n):
-        OK = str(self.colored.green('OK'))
-        if isinstance(n, list):
-            return OK, self.prettify_list(n)
-        if isinstance(n, dict):
-            if 'ok' in n or 'error' in n:
-                return self.prettify_dict_ok_error(n)
-        if isinstance(n, string_t):
-            return OK, string(n)
-        return OK, pformat(n)
-
-    def say_chat(self, direction, title, body=''):
-        c = self.colored
-        if direction == '<-' and self.quiet:
-            return
-        dirstr = not self.quiet and c.bold(c.white(direction), ' ') or ''
-        self.out(c.reset(dirstr, title))
-        if body and self.show_body:
-            self.out(body)
-
-    @property
-    def description(self):
-        return self.__doc__
-
-
-class Delegate(Command):
-
-    def __init__(self, *args, **kwargs):
-        super(Delegate, self).__init__(*args, **kwargs)
-
-        self.target = symbol_by_name(self.Command)(app=self.app)
-        self.args = self.target.args
-
-    def get_options(self):
-        return self.option_list + self.target.get_options()
-
-    def create_parser(self, prog_name, command):
-        parser = super(Delegate, self).create_parser(prog_name, command)
-        return self.target.prepare_parser(parser)
-
-    def run(self, *args, **kwargs):
-        self.target.check_args(args)
-        return self.target.run(*args, **kwargs)
-
-
-@command
 class multi(Command):
     """Start multiple worker instances."""
     respects_app_option = False
@@ -265,86 +90,6 @@ class multi(Command):
         return MultiTool().execute_from_commandline(argv, prog_name)
 
 
-@command
-class worker(Delegate):
-    """Start worker instance.
-
-    Examples::
-
-        celery worker --app=proj -l info
-        celery worker -A proj -l info -Q hipri,lopri
-
-        celery worker -A proj --concurrency=4
-        celery worker -A proj --concurrency=1000 -P eventlet
-
-        celery worker --autoscale=10,0
-    """
-    Command = 'celery.bin.worker:WorkerCommand'
-
-    def run_from_argv(self, prog_name, argv):
-        self.target.maybe_detach(argv)
-        super(worker, self).run_from_argv(prog_name, argv)
-
-
-@command
-class events(Delegate):
-    """Event-stream utilities.
-
-    Commands::
-
-        celery events --app=proj
-            start graphical monitor (requires curses)
-        celery events -d --app=proj
-            dump events to screen.
-        celery events -b amqp://
-        celery events -C <camera> [options]
-            run snapshot camera.
-
-    Examples::
-
-        celery events
-        celery events -d
-        celery events -C mod.attr -F 1.0 --detach --maxrate=100/m -l info
-    """
-    Command = 'celery.bin.events:EvCommand'
-
-
-@command
-class beat(Delegate):
-    """Start the beat periodic task scheduler.
-
-    Examples::
-
-        celery beat -l info
-        celery beat -s /var/run/celery/beat-schedule --detach
-        celery beat -S djcelery.schedulers.DatabaseScheduler
-
-    """
-    Command = 'celery.bin.beat:BeatCommand'
-
-
-@command
-class amqp(Delegate):
-    """AMQP Administration Shell.
-
-    Also works for non-amqp transports.
-
-    Examples::
-
-        celery amqp
-            start shell mode
-        celery amqp help
-            show list of commands
-
-        celery amqp exchange.delete name
-        celery amqp queue.delete queue
-        celery amqp queue.delete queue yes yes
-
-    """
-    Command = 'celery.bin.amqp:AMQPAdminCommand'
-
-
-@command(name='list')
 class list_(Command):
     """Get info from broker.
 
@@ -381,7 +126,6 @@ class list_(Command):
             topics[what](conn.manager)
 
 
-@command
 class call(Command):
     """Call a task by name.
 
@@ -437,7 +181,6 @@ class call(Command):
         self.out(res.id)
 
 
-@command
 class purge(Command):
     """Erase all messages from all known task queues.
 
@@ -457,7 +200,6 @@ class purge(Command):
             queues=text.pluralize(queues, 'queue')))
 
 
-@command
 class result(Command):
     """Gives the return value for a given task id.
 
@@ -487,7 +229,7 @@ class result(Command):
             value = result.traceback
         else:
             value = result.get()
-        self.out(self.prettify(value)[1])
+        self.out(self.pretty(value)[1])
 
 
 class _RemoteControl(Command):
@@ -500,6 +242,11 @@ class _RemoteControl(Command):
                 Option('--destination', '-d',
                     help='Comma separated list of destination node names.'))
 
+    def __init__(self, *args, **kwargs):
+        self.show_body = kwargs.pop('show_body', True)
+        self.show_reply = kwargs.pop('show_reply', True)
+        super(_RemoteControl, self).__init__(*args, **kwargs)
+
     @classmethod
     def get_command_info(self, command, indent=0, prefix='', color=None,
             help=False):
@@ -581,7 +328,6 @@ class _RemoteControl(Command):
             self.out(body)
 
 
-@command
 class inspect(_RemoteControl):
     """Inspect the worker at runtime.
 
@@ -614,7 +360,6 @@ class inspect(_RemoteControl):
         return getattr(i, method)(*args)
 
 
-@command
 class control(_RemoteControl):
     """Workers remote control.
 
@@ -681,17 +426,16 @@ class control(_RemoteControl):
         return self.call(method, queue, reply=True, **kwargs)
 
 
-@command
 class status(Command):
     """Show list of workers that are online."""
     option_list = inspect.option_list
 
     def run(self, *args, **kwargs):
-        replies = inspect(app=self.app,
-                          no_color=kwargs.get('no_color', False),
-                          stdout=self.stdout, stderr=self.stderr,
-                          show_reply=False) \
-                    .run('ping', **dict(kwargs, quiet=True, show_body=False))
+        I = inspect(app=self.app,
+                    no_color=kwargs.get('no_color', False),
+                    stdout=self.stdout, stderr=self.stderr,
+                    show_reply=False, show_body=False, quiet=True)
+        replies = I.run('ping', **kwargs)
         if not replies:
             raise Error('No nodes replied within time constraint',
                         status=EX_UNAVAILABLE)
@@ -701,7 +445,6 @@ class status(Command):
                 nodecount, text.pluralize(nodecount, 'node')))
 
 
-@command
 class migrate(Command):
     """Migrate tasks from one broker to another.
 
@@ -735,7 +478,9 @@ class migrate(Command):
 
     def run(self, *args, **kwargs):
         if len(args) != 2:
-            return self.exit_help('migrate')
+            # this never exits due to OptionParser.parse_options
+            self.run_from_argv(self.prog_name, ['migrate', '--help'])
+            raise SystemExit()
         from kombu import Connection
         from celery.contrib.migrate import migrate_tasks
 
@@ -745,7 +490,6 @@ class migrate(Command):
                       **kwargs)
 
 
-@command
 class shell(Command):  # pragma: no cover
     """Start shell session with convenient access to celery symbols.
 
@@ -859,7 +603,6 @@ class shell(Command):  # pragma: no cover
         bpython.embed(self.locals)
 
 
-@command
 class help(Command):
     """Show help screen and exit."""
 
@@ -874,7 +617,6 @@ class help(Command):
         return EX_USAGE
 
 
-@command
 class report(Command):
     """Shows information useful to include in bugreports."""
 
@@ -883,11 +625,37 @@ class report(Command):
         return EX_OK
 
 
-class CeleryCommand(BaseCommand):
-    commands = commands
+class CeleryCommand(Command):
+    namespace = 'celery'
+    ext_fmt = '{self.namespace}.commands'
+    commands = {
+        'amqp': amqp,
+        'beat': beat,
+        'call': call,
+        'control': control,
+        'events': events,
+        'graph': graph,
+        'help': help,
+        'inspect': inspect,
+        'list': list_,
+        'migrate': migrate,
+        'multi': multi,
+        'purge': purge,
+        'report': report,
+        'result': result,
+        'shell': shell,
+        'status': status,
+        'worker': worker,
+
+    }
     enable_config_from_cmdline = True
     prog_name = 'celery'
 
+    @classmethod
+    def register_command(cls, fun, name=None):
+        cls.commands[name or fun.__name__] = fun
+        return fun
+
     def execute(self, command, argv=None):
         try:
             cls = self.commands[command]
@@ -895,7 +663,8 @@ class CeleryCommand(BaseCommand):
             cls, argv = self.commands['help'], ['help']
         cls = self.commands.get(command) or self.commands['help']
         try:
-            return cls(app=self.app).run_from_argv(self.prog_name, argv)
+            return cls(app=self.app).run_from_argv(self.prog_name,
+                    argv[1:], command=argv[0])
         except Error:
             return self.execute('help', argv)
 
@@ -965,180 +734,20 @@ class CeleryCommand(BaseCommand):
             return (['-P'], ['--pool'])
 
     def on_concurrency_setup(self):
-        load_extension_commands()
-
-
-@command
-class graph(Command):
-    args = """<TYPE> [arguments]
-            .....  bootsteps [worker] [consumer]
-            .....  workers   [enumerate]
-    """
-
-    def run(self, what=None, *args, **kwargs):
-        map = {'bootsteps': self.bootsteps, 'workers': self.workers}
-        not what and self.exit_help('graph')
-        if what not in map:
-            raise Error('no graph {0} in {1}'.format(what, '|'.join(map)))
-        return map[what](*args, **kwargs)
-
-    def bootsteps(self, *args, **kwargs):
-        worker = self.app.WorkController()
-        include = set(arg.lower() for arg in args or ['worker', 'consumer'])
-        if 'worker' in include:
-            graph = worker.namespace.graph
-            if 'consumer' in include:
-                worker.namespace.connect_with(worker.consumer.namespace)
-        else:
-            graph = worker.consumer.namespace.graph
-        graph.to_dot(self.stdout)
-
-    def workers(self, *args, **kwargs):
-
-        def simplearg(arg):
-            return maybe_list(itemgetter(0, 2)(arg.partition(':')))
-
-        def maybe_list(l, sep=','):
-            return (l[0], l[1].split(sep) if sep in l[1] else l[1])
-
-        args = dict(simplearg(arg) for arg in args)
-        generic = 'generic' in args
-
-        def generic_label(node):
-            return '{0} ({1}://)'.format(type(node).__name__,
-                                         node._label.split('://')[0])
-
-        class Node(object):
-            force_label = None
-            scheme = {}
+        self.load_extension_commands()
 
-            def __init__(self, label, pos=None):
-                self._label = label
-                self.pos = pos
+    def load_extension_commands(self):
+        names = Extensions(self.ext_fmt.format(self=self),
+                           self.register_command).load()
+        if names:
+            command_classes.append(('Extensions', names, 'magenta'))
 
-            def label(self):
-                return self._label
 
-            def __str__(self):
-                return self.label()
-
-        class Thread(Node):
-            scheme = {'fillcolor': 'lightcyan4', 'fontcolor': 'yellow',
-                      'shape': 'oval', 'fontsize': 10, 'width': 0.3,
-                      'color': 'black'}
-
-            def __init__(self, label, **kwargs):
-                self._label = 'thr-{0}'.format(next(tids))
-                self.real_label = label
-                self.pos = 0
-
-        class Formatter(GraphFormatter):
-
-            def label(self, obj):
-                return obj and obj.label()
-
-            def node(self, obj):
-                scheme = dict(obj.scheme) if obj.pos else obj.scheme
-                if isinstance(obj, Thread):
-                    scheme['label'] = obj.real_label
-                return self.draw_node(
-                    obj, dict(self.node_scheme, **scheme),
-                )
-
-            def terminal_node(self, obj):
-                return self.draw_node(
-                    obj, dict(self.term_scheme, **obj.scheme),
-                )
-
-            def edge(self, a, b, **attrs):
-                if isinstance(a, Thread):
-                    attrs.update(arrowhead='none', arrowtail='tee')
-                return self.draw_edge(a, b, self.edge_scheme, attrs)
-
-        def subscript(n):
-            S = {'0': '₀', '1': '₁', '2': '₂', '3': '₃', '4': '₄',
-                 '5': '₅', '6': '₆', '7': '₇', '8': '₈', '9': '₉'}
-            return ''.join([S[i] for i in str(n)])
-
-        class Worker(Node):
-            pass
-
-        class Backend(Node):
-            scheme = {'shape': 'folder', 'width': 2,
-                      'height': 1, 'color': 'black',
-                      'fillcolor': 'peachpuff3', 'color': 'peachpuff4'}
-
-            def label(self):
-                return generic_label(self) if generic else self._label
-
-        class Broker(Node):
-            scheme = {'shape': 'circle', 'fillcolor': 'cadetblue3',
-                      'color': 'cadetblue4', 'height': 1}
-
-            def label(self):
-                return generic_label(self) if generic else self._label
-
-        from itertools import count
-        tids = count(1)
-        Wmax = int(args.get('wmax', 4) or 0)
-        Tmax = int(args.get('tmax', 3) or 0)
-
-        def maybe_abbr(l, name, max=Wmax):
-            size = len(l)
-            abbr = max and size > max
-            if 'enumerate' in args:
-                l = ['{0}{1}'.format(name, subscript(i + 1))
-                        for i, obj in enumerate(l)]
-            if abbr:
-                l = l[0:max - 1] + [l[size - 1]]
-                l[max - 2] = '{0}⎨…{1}⎬'.format(
-                    name[0], subscript(size - (max - 1)))
-            return l
-
-        try:
-            workers = args['nodes']
-            threads = args.get('threads') or []
-        except KeyError:
-            replies = self.app.control.inspect().stats()
-            workers, threads = [], []
-            for worker, reply in items(replies):
-                workers.append(worker)
-                threads.append(reply['pool']['max-concurrency'])
-
-        wlen = len(workers)
-        backend = args.get('backend', self.app.conf.CELERY_RESULT_BACKEND)
-        threads_for = {}
-        workers = maybe_abbr(workers, 'Worker')
-        if Wmax and wlen > Wmax:
-            threads = threads[0:3] + [threads[-1]]
-        for i, threads in enumerate(threads):
-            threads_for[workers[i]] = maybe_abbr(
-                range(int(threads)), 'P', Tmax,
-            )
-
-        broker = Broker(args.get('broker', self.app.connection().as_uri()))
-        backend = Backend(backend) if backend else None
-        graph = DependencyGraph(formatter=Formatter())
-        graph.add_arc(broker)
-        if backend:
-            graph.add_arc(backend)
-        curworker = [0]
-        for i, worker in enumerate(workers):
-            worker = Worker(worker, pos=i)
-            graph.add_arc(worker)
-            graph.add_edge(worker, broker)
-            if backend:
-                graph.add_edge(worker, backend)
-            threads = threads_for.get(worker._label)
-            if threads:
-                for thread in threads:
-                    thread = Thread(thread)
-                    graph.add_arc(thread)
-                    graph.add_edge(thread, worker)
-
-            curworker[0] += 1
-
-        graph.to_dot(self.stdout)
+def command(*args, **kwargs):
+    """Deprecated: Use classmethod :meth:`CeleryCommand.register_command`
+    instead."""
+    _register = CeleryCommand.register_command
+    return _register(args[0]) if args else _register
 
 
 if __name__ == '__main__':          # pragma: no cover

+ 20 - 2
celery/bin/events.py

@@ -45,7 +45,25 @@ from celery.platforms import detached, set_process_title, strargv
 from celery.bin.base import Command, Option, daemon_options
 
 
-class EvCommand(Command):
+class events(Command):
+    """Event-stream utilities.
+
+    Commands::
+
+        celery events --app=proj
+            start graphical monitor (requires curses)
+        celery events -d --app=proj
+            dump events to screen.
+        celery events -b amqp://
+        celery events -C <camera> [options]
+            run snapshot camera.
+
+    Examples::
+
+        celery events
+        celery events -d
+        celery events -C mod.attr -F 1.0 --detach --maxrate=100/m -l info
+    """
     doc = __doc__
     supports_args = False
 
@@ -112,7 +130,7 @@ class EvCommand(Command):
 
 
 def main():
-    ev = EvCommand()
+    ev = events()
     ev.execute_from_commandline()
 
 if __name__ == '__main__':              # pragma: no cover

+ 189 - 0
celery/bin/graph.py

@@ -0,0 +1,189 @@
+# -*- coding: utf-8 -*-
+"""
+
+celery.bin.graph
+----------------
+
+The :program:`celery graph` command.
+
+"""
+from __future__ import absolute_import, unicode_literals
+
+from operator import itemgetter
+
+from celery.datastructures import DependencyGraph, GraphFormatter
+from celery.five import items
+
+from .base import Command, Error
+
+
+class graph(Command):
+    args = """<TYPE> [arguments]
+            .....  bootsteps [worker] [consumer]
+            .....  workers   [enumerate]
+    """
+
+    def run(self, what=None, *args, **kwargs):
+        map = {'bootsteps': self.bootsteps, 'workers': self.workers}
+        not what and self.exit_help('graph')
+        if what not in map:
+            raise Error('no graph {0} in {1}'.format(what, '|'.join(map)))
+        return map[what](*args, **kwargs)
+
+    def bootsteps(self, *args, **kwargs):
+        worker = self.app.WorkController()
+        include = set(arg.lower() for arg in args or ['worker', 'consumer'])
+        if 'worker' in include:
+            graph = worker.namespace.graph
+            if 'consumer' in include:
+                worker.namespace.connect_with(worker.consumer.namespace)
+        else:
+            graph = worker.consumer.namespace.graph
+        graph.to_dot(self.stdout)
+
+    def workers(self, *args, **kwargs):
+
+        def simplearg(arg):
+            return maybe_list(itemgetter(0, 2)(arg.partition(':')))
+
+        def maybe_list(l, sep=','):
+            return (l[0], l[1].split(sep) if sep in l[1] else l[1])
+
+        args = dict(simplearg(arg) for arg in args)
+        generic = 'generic' in args
+
+        def generic_label(node):
+            return '{0} ({1}://)'.format(type(node).__name__,
+                                         node._label.split('://')[0])
+
+        class Node(object):
+            force_label = None
+            scheme = {}
+
+            def __init__(self, label, pos=None):
+                self._label = label
+                self.pos = pos
+
+            def label(self):
+                return self._label
+
+            def __str__(self):
+                return self.label()
+
+        class Thread(Node):
+            scheme = {'fillcolor': 'lightcyan4', 'fontcolor': 'yellow',
+                      'shape': 'oval', 'fontsize': 10, 'width': 0.3,
+                      'color': 'black'}
+
+            def __init__(self, label, **kwargs):
+                self._label = 'thr-{0}'.format(next(tids))
+                self.real_label = label
+                self.pos = 0
+
+        class Formatter(GraphFormatter):
+
+            def label(self, obj):
+                return obj and obj.label()
+
+            def node(self, obj):
+                scheme = dict(obj.scheme) if obj.pos else obj.scheme
+                if isinstance(obj, Thread):
+                    scheme['label'] = obj.real_label
+                return self.draw_node(
+                    obj, dict(self.node_scheme, **scheme),
+                )
+
+            def terminal_node(self, obj):
+                return self.draw_node(
+                    obj, dict(self.term_scheme, **obj.scheme),
+                )
+
+            def edge(self, a, b, **attrs):
+                if isinstance(a, Thread):
+                    attrs.update(arrowhead='none', arrowtail='tee')
+                return self.draw_edge(a, b, self.edge_scheme, attrs)
+
+        def subscript(n):
+            S = {'0': '₀', '1': '₁', '2': '₂', '3': '₃', '4': '₄',
+                 '5': '₅', '6': '₆', '7': '₇', '8': '₈', '9': '₉'}
+            return ''.join([S[i] for i in str(n)])
+
+        class Worker(Node):
+            pass
+
+        class Backend(Node):
+            scheme = {'shape': 'folder', 'width': 2,
+                      'height': 1, 'color': 'black',
+                      'fillcolor': 'peachpuff3', 'color': 'peachpuff4'}
+
+            def label(self):
+                return generic_label(self) if generic else self._label
+
+        class Broker(Node):
+            scheme = {'shape': 'circle', 'fillcolor': 'cadetblue3',
+                      'color': 'cadetblue4', 'height': 1}
+
+            def label(self):
+                return generic_label(self) if generic else self._label
+
+        from itertools import count
+        tids = count(1)
+        Wmax = int(args.get('wmax', 4) or 0)
+        Tmax = int(args.get('tmax', 3) or 0)
+
+        def maybe_abbr(l, name, max=Wmax):
+            size = len(l)
+            abbr = max and size > max
+            if 'enumerate' in args:
+                l = ['{0}{1}'.format(name, subscript(i + 1))
+                        for i, obj in enumerate(l)]
+            if abbr:
+                l = l[0:max - 1] + [l[size - 1]]
+                l[max - 2] = '{0}⎨…{1}⎬'.format(
+                    name[0], subscript(size - (max - 1)))
+            return l
+
+        try:
+            workers = args['nodes']
+            threads = args.get('threads') or []
+        except KeyError:
+            replies = self.app.control.inspect().stats()
+            workers, threads = [], []
+            for worker, reply in items(replies):
+                workers.append(worker)
+                threads.append(reply['pool']['max-concurrency'])
+
+        wlen = len(workers)
+        backend = args.get('backend', self.app.conf.CELERY_RESULT_BACKEND)
+        threads_for = {}
+        workers = maybe_abbr(workers, 'Worker')
+        if Wmax and wlen > Wmax:
+            threads = threads[0:3] + [threads[-1]]
+        for i, threads in enumerate(threads):
+            threads_for[workers[i]] = maybe_abbr(
+                range(int(threads)), 'P', Tmax,
+            )
+
+        broker = Broker(args.get('broker', self.app.connection().as_uri()))
+        backend = Backend(backend) if backend else None
+        graph = DependencyGraph(formatter=Formatter())
+        graph.add_arc(broker)
+        if backend:
+            graph.add_arc(backend)
+        curworker = [0]
+        for i, worker in enumerate(workers):
+            worker = Worker(worker, pos=i)
+            graph.add_arc(worker)
+            graph.add_edge(worker, broker)
+            if backend:
+                graph.add_edge(worker, backend)
+            threads = threads_for.get(worker._label)
+            if threads:
+                for thread in threads:
+                    thread = Thread(thread)
+                    graph.add_arc(thread)
+                    graph.add_edge(thread, worker)
+
+            curworker[0] += 1
+
+        graph.to_dot(self.stdout)

+ 18 - 6
celery/bin/worker.py

@@ -125,18 +125,30 @@ from celery.five import string_t
 from celery.utils.log import LOG_LEVELS, mlevel
 
 
-class WorkerCommand(Command):
+class worker(Command):
+    """Start worker instance.
+
+    Examples::
+
+        celery worker --app=proj -l info
+        celery worker -A proj -l info -Q hipri,lopri
+
+        celery worker -A proj --concurrency=4
+        celery worker -A proj --concurrency=1000 -P eventlet
+
+        celery worker --autoscale=10,0
+    """
     doc = __doc__  # parse help from this.
     namespace = 'celeryd'
     enable_config_from_cmdline = True
     supports_args = False
 
-    def execute_from_commandline(self, argv=None):
+    def run_from_argv(self, prog_name, argv=None, command=None):
+        argv = list(sys.argv) if argv is None else argv
         self.maybe_detach(argv)
-        return super(WorkerCommand, self).execute_from_commandline(argv)
+        return super(worker, self).run_from_argv(prog_name, argv, command)
 
     def maybe_detach(self, argv, dopts=['-D', '--detach']):
-        argv = list(sys.argv) if argv is None else argv
         if any(arg in argv for arg in dopts):
             argv = [arg for arg in argv if arg not in dopts]
             # never returns
@@ -160,6 +172,7 @@ class WorkerCommand(Command):
                 self.die('Unknown level {0!r}. Please use one of {1}.'.format(
                     loglevel, '|'.join(l for l in LOG_LEVELS
                       if isinstance(l, string_t))))
+
         return self.app.Worker(
             hostname=hostname, pool_cls=pool_cls, loglevel=loglevel, **kwargs
         ).start()
@@ -209,8 +222,7 @@ def main():
         sys.modules['__main__'] = sys.modules[__name__]
     from billiard import freeze_support
     freeze_support()
-    worker = WorkerCommand()
-    worker.execute_from_commandline()
+    worker().execute_from_commandline()
 
 
 if __name__ == '__main__':          # pragma: no cover