Ver Fonte

Commands: Each command now has separate module.

Ask Solem há 8 anos atrás
pai
commit
0919237b0d

+ 77 - 0
celery/bin/call.py

@@ -0,0 +1,77 @@
+from __future__ import absolute_import, unicode_literals
+from kombu.utils.json import loads
+from celery.bin.base import Command
+from celery.five import string_t
+from celery.utils.time import maybe_iso8601
+
+
+class call(Command):
+    """Call a task by name.
+
+    Examples:
+        .. code-block:: console
+
+            $ celery call tasks.add --args='[2, 2]'
+            $ celery call tasks.add --args='[2, 2]' --countdown=10
+    """
+
+    args = '<task_name>'
+
+    def add_arguments(self, parser):
+        group = parser.add_argument_group('Calling Options')
+        group.add_argument('--args', '-a',
+                           help='positional arguments (json).')
+        group.add_argument('--kwargs', '-k',
+                           help='keyword arguments (json).')
+        group.add_argument('--eta',
+                           help='scheduled time (ISO-8601).')
+        group.add_argument(
+            '--countdown', type=float,
+            help='eta in seconds from now (float/int).',
+        )
+        group.add_argument(
+            '--expires',
+            help='expiry time (ISO-8601/float/int).',
+        ),
+        group.add_argument(
+            '--serializer', default='json',
+            help='defaults to json.'),
+
+        ropts = parser.add_argument_group('Routing Options')
+        ropts.add_argument('--queue', help='custom queue name.')
+        ropts.add_argument('--exchange', help='custom exchange name.')
+        ropts.add_argument('--routing-key', help='custom routing key.')
+
+    def run(self, name, *_, **kwargs):
+        self._send_task(name, **kwargs)
+
+    def _send_task(self, name, args=None, kwargs=None,
+                   countdown=None, serializer=None,
+                   queue=None, exchange=None, routing_key=None,
+                   eta=None, expires=None, **_):
+        # arguments
+        args = loads(args) if isinstance(args, string_t) else args
+        kwargs = loads(kwargs) if isinstance(kwargs, string_t) else kwargs
+
+        # Expires can be int/float.
+        try:
+            expires = float(expires)
+        except (TypeError, ValueError):
+            # or a string describing an ISO 8601 datetime.
+            try:
+                expires = maybe_iso8601(expires)
+            except (TypeError, ValueError):
+                raise
+
+        # send the task and print the id.
+        self.out(self.app.send_task(
+            name,
+            args=args or (), kwargs=kwargs or {},
+            countdown=countdown,
+            serializer=serializer,
+            queue=queue,
+            exchange=exchange,
+            routing_key=routing_key,
+            eta=maybe_iso8601(eta),
+            expires=expires,
+        ).id)

+ 9 - 754
celery/bin/celery.py

@@ -256,25 +256,14 @@ in any command that also has a `--detach` option.
 """
 from __future__ import absolute_import, unicode_literals, print_function
 
-import codecs
 import numbers
-import os
 import sys
 
 from functools import partial
-from importlib import import_module
 
-from kombu.utils.json import dumps, loads
-from kombu.utils.objects import cached_property
-
-from celery.app import defaults
-from celery.five import items, keys, string_t, values
-from celery.platforms import EX_OK, EX_FAILURE, EX_UNAVAILABLE, EX_USAGE
+from celery.platforms import EX_OK, EX_FAILURE, EX_USAGE
 from celery.utils import term
 from celery.utils import text
-from celery.utils.functional import pass1
-from celery.utils.text import str_to_list
-from celery.utils.time import maybe_iso8601
 
 # Cannot use relative imports here due to a Windows issue (#1111).
 from celery.bin.base import Command, Extensions
@@ -282,10 +271,18 @@ from celery.bin.base import Command, Extensions
 # Import commands from other modules
 from celery.bin.amqp import amqp
 from celery.bin.beat import beat
+from celery.bin.call import call
+from celery.bin.control import _RemoteControl, control, inspect, status
 from celery.bin.events import events
 from celery.bin.graph import graph
+from celery.bin.list import list_
 from celery.bin.logtool import logtool
+from celery.bin.migrate import migrate
+from celery.bin.purge import purge
+from celery.bin.result import result
+from celery.bin.shell import shell
 from celery.bin.worker import worker
+from celery.bin.upgrade import upgrade
 
 __all__ = ['CeleryCommand', 'main']
 
@@ -298,11 +295,6 @@ HELP = """
 Type '{prog_name} <command> --help' for help using a specific command.
 """
 
-MIGRATE_PROGRESS_FMT = """\
-Migrating task {state.count}/{state.strtotal}: \
-{body[task]}[{body[id]}]\
-"""
-
 command_classes = [
     ('Main', ['worker', 'events', 'beat', 'shell', 'multi', 'amqp'], 'green'),
     ('Remote Control', ['status', 'inspect', 'control'], 'blue'),
@@ -347,743 +339,6 @@ class multi(Command):
         return cmd.execute_from_commandline([command] + argv)
 
 
-class list_(Command):
-    """Get info from broker.
-
-    Note:
-       For RabbitMQ the management plugin is required.
-
-    Example:
-        .. code-block:: console
-
-            $ celery list bindings
-    """
-
-    args = '[bindings]'
-
-    def list_bindings(self, management):
-        try:
-            bindings = management.get_bindings()
-        except NotImplementedError:
-            raise self.Error('Your transport cannot list bindings.')
-
-        def fmt(q, e, r):
-            return self.out('{0:<28} {1:<28} {2}'.format(q, e, r))
-        fmt('Queue', 'Exchange', 'Routing Key')
-        fmt('-' * 16, '-' * 16, '-' * 16)
-        for b in bindings:
-            fmt(b['destination'], b['source'], b['routing_key'])
-
-    def run(self, what=None, *_, **kw):
-        topics = {'bindings': self.list_bindings}
-        available = ', '.join(topics)
-        if not what:
-            raise self.UsageError(
-                'Missing argument, specify one of: {0}'.format(available))
-        if what not in topics:
-            raise self.UsageError(
-                'unknown topic {0!r} (choose one of: {1})'.format(
-                    what, available))
-        with self.app.connection() as conn:
-            self.app.amqp.TaskConsumer(conn).declare()
-            topics[what](conn.manager)
-
-
-class call(Command):
-    """Call a task by name.
-
-    Examples:
-        .. code-block:: console
-
-            $ celery call tasks.add --args='[2, 2]'
-            $ celery call tasks.add --args='[2, 2]' --countdown=10
-    """
-
-    args = '<task_name>'
-
-    def add_arguments(self, parser):
-        group = parser.add_argument_group('Calling Options')
-        group.add_argument('--args', '-a',
-                           help='positional arguments (json).')
-        group.add_argument('--kwargs', '-k',
-                           help='keyword arguments (json).')
-        group.add_argument('--eta',
-                           help='scheduled time (ISO-8601).')
-        group.add_argument(
-            '--countdown', type=float,
-            help='eta in seconds from now (float/int).',
-        )
-        group.add_argument(
-            '--expires',
-            help='expiry time (ISO-8601/float/int).',
-        ),
-        group.add_argument(
-            '--serializer', default='json',
-            help='defaults to json.'),
-
-        ropts = parser.add_argument_group('Routing Options')
-        ropts.add_argument('--queue', help='custom queue name.')
-        ropts.add_argument('--exchange', help='custom exchange name.')
-        ropts.add_argument('--routing-key', help='custom routing key.')
-
-    def run(self, name, *_, **kwargs):
-        self._send_task(name, **kwargs)
-
-    def _send_task(self, name, args=None, kwargs=None,
-                   countdown=None, serializer=None,
-                   queue=None, exchange=None, routing_key=None,
-                   eta=None, expires=None, **_):
-        # arguments
-        args = loads(args) if isinstance(args, string_t) else args
-        kwargs = loads(kwargs) if isinstance(kwargs, string_t) else kwargs
-
-        # Expires can be int/float.
-        try:
-            expires = float(expires)
-        except (TypeError, ValueError):
-            # or a string describing an ISO 8601 datetime.
-            try:
-                expires = maybe_iso8601(expires)
-            except (TypeError, ValueError):
-                raise
-
-        # send the task and print the id.
-        self.out(self.app.send_task(
-            name,
-            args=args or (), kwargs=kwargs or {},
-            countdown=countdown,
-            serializer=serializer,
-            queue=queue,
-            exchange=exchange,
-            routing_key=routing_key,
-            eta=maybe_iso8601(eta),
-            expires=expires,
-        ).id)
-
-
-class purge(Command):
-    """Erase all messages from all known task queues.
-
-    Warning:
-        There's no undo operation for this command.
-    """
-
-    warn_prelude = (
-        '{warning}: This will remove all tasks from {queues}: {names}.\n'
-        '         There is no undo for this operation!\n\n'
-        '(to skip this prompt use the -f option)\n'
-    )
-    warn_prompt = 'Are you sure you want to delete all tasks'
-
-    fmt_purged = 'Purged {mnum} {messages} from {qnum} known task {queues}.'
-    fmt_empty = 'No messages purged from {qnum} {queues}'
-
-    def add_arguments(self, parser):
-        group = parser.add_argument_group('Purging Options')
-        group.add_argument(
-            '--force', '-f', action='store_true', default=False,
-            help="Don't prompt for verification",
-        )
-        group.add_argument(
-            '--queues', '-Q', default=[],
-            help='Comma separated list of queue names to purge.',
-        )
-        group.add_argument(
-            '--exclude-queues', '-X', default=[],
-            help='Comma separated list of queues names not to purge.',
-        )
-
-    def run(self, force=False, queues=None, exclude_queues=None, **kwargs):
-        queues = set(str_to_list(queues or []))
-        exclude = set(str_to_list(exclude_queues or []))
-        names = (queues or set(keys(self.app.amqp.queues))) - exclude
-        qnum = len(names)
-
-        messages = None
-        if names:
-            if not force:
-                self.out(self.warn_prelude.format(
-                    warning=self.colored.red('WARNING'),
-                    queues=text.pluralize(qnum, 'queue'),
-                    names=', '.join(sorted(names)),
-                ))
-                if self.ask(self.warn_prompt, ('yes', 'no'), 'no') != 'yes':
-                    return
-            with self.app.connection_for_write() as conn:
-                messages = sum(self._purge(conn, queue) for queue in names)
-        fmt = self.fmt_purged if messages else self.fmt_empty
-        self.out(fmt.format(
-            mnum=messages, qnum=qnum,
-            messages=text.pluralize(messages, 'message'),
-            queues=text.pluralize(qnum, 'queue')))
-
-    def _purge(self, conn, queue):
-        try:
-            return conn.default_channel.queue_purge(queue) or 0
-        except conn.channel_errors:
-            return 0
-
-
-class result(Command):
-    """Gives the return value for a given task id.
-
-    Examples:
-        .. code-block:: console
-
-            $ celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500
-            $ celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500 -t tasks.add
-            $ celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500 --traceback
-    """
-
-    args = '<task_id>'
-
-    def add_arguments(self, parser):
-        group = parser.add_argument_group('Result Options')
-        group.add_argument(
-            '--task', '-t', help='name of task (if custom backend)',
-        )
-        group.add_argument(
-            '--traceback', action='store_true', default=False,
-            help='show traceback instead',
-        )
-
-    def run(self, task_id, *args, **kwargs):
-        result_cls = self.app.AsyncResult
-        task = kwargs.get('task')
-        traceback = kwargs.get('traceback', False)
-
-        if task:
-            result_cls = self.app.tasks[task].AsyncResult
-        task_result = result_cls(task_id)
-        if traceback:
-            value = task_result.traceback
-        else:
-            value = task_result.get()
-        self.out(self.pretty(value)[1])
-
-
-class _RemoteControl(Command):
-
-    name = None
-    leaf = False
-    control_group = None
-
-    def __init__(self, *args, **kwargs):
-        self.show_body = kwargs.pop('show_body', True)
-        self.show_reply = kwargs.pop('show_reply', True)
-        super(_RemoteControl, self).__init__(*args, **kwargs)
-
-    def add_arguments(self, parser):
-        group = parser.add_argument_group('Remote Control Options')
-        group.add_argument(
-            '--timeout', '-t', type=float,
-            help='Timeout in seconds (float) waiting for reply',
-        )
-        group.add_argument(
-            '--destination', '-d',
-            help='Comma separated list of destination node names.')
-        group.add_argument(
-            '--json', '-j', action='store_true', default=False,
-            help='Use json as output format.',
-        )
-
-    @classmethod
-    def get_command_info(cls, command,
-                         indent=0, prefix='', color=None,
-                         help=False, app=None, choices=None):
-        if choices is None:
-            choices = cls._choices_by_group(app)
-        meta = choices[command]
-        if help:
-            help = '|' + text.indent(meta.help, indent + 4)
-        else:
-            help = None
-        return text.join([
-            '|' + text.indent('{0}{1} {2}'.format(
-                prefix, color(command), meta.signature or ''), indent),
-            help,
-        ])
-
-    @classmethod
-    def list_commands(cls, indent=0, prefix='',
-                      color=None, help=False, app=None):
-        choices = cls._choices_by_group(app)
-        color = color if color else lambda x: x
-        prefix = prefix + ' ' if prefix else ''
-        return '\n'.join(
-            cls.get_command_info(c, indent, prefix, color, help,
-                                 app=app, choices=choices)
-            for c in sorted(choices))
-
-    def usage(self, command):
-        return '%(prog)s {0} [options] {1} <command> [arg1 .. argN]'.format(
-            command, self.args)
-
-    def call(self, *args, **kwargs):
-        raise NotImplementedError('call')
-
-    def run(self, *args, **kwargs):
-        if not args:
-            raise self.UsageError(
-                'Missing {0.name} method.  See --help'.format(self))
-        return self.do_call_method(args, **kwargs)
-
-    def _ensure_fanout_supported(self):
-        with self.app.connection_for_write() as conn:
-            if not conn.supports_exchange_type('fanout'):
-                raise self.Error(
-                    'Broadcast not supported by transport {0!r}'.format(
-                        conn.info()['transport']))
-
-    def do_call_method(self, args,
-                       timeout=None, destination=None, json=False, **kwargs):
-        method = args[0]
-        if method == 'help':
-            raise self.Error("Did you mean '{0.name} --help'?".format(self))
-        try:
-            meta = self.choices[method]
-        except KeyError:
-            raise self.UsageError(
-                'Unknown {0.name} method {1}'.format(self, method))
-
-        self._ensure_fanout_supported()
-
-        timeout = timeout or meta.default_timeout
-        if destination and isinstance(destination, string_t):
-            destination = [dest.strip() for dest in destination.split(',')]
-
-        replies = self.call(
-            method,
-            arguments=self.compile_arguments(meta, method, args[1:]),
-            timeout=timeout,
-            destination=destination,
-            callback=None if json else self.say_remote_command_reply,
-        )
-        if not replies:
-            raise self.Error('No nodes replied within time constraint.',
-                             status=EX_UNAVAILABLE)
-        if json:
-            self.out(dumps(replies))
-        return replies
-
-    def compile_arguments(self, meta, method, args):
-        args = list(args)
-        kw = {}
-        if meta.args:
-            kw.update({
-                k: v for k, v in self._consume_args(meta, method, args)
-            })
-        if meta.variadic:
-            kw.update({meta.variadic: args})
-        if not kw and args:
-            raise self.Error(
-                'Command {0!r} takes no arguments.'.format(method),
-                status=EX_USAGE)
-        return kw or {}
-
-    def _consume_args(self, meta, method, args):
-        i = 0
-        try:
-            for i, arg in enumerate(args):
-                try:
-                    name, typ = meta.args[i]
-                except IndexError:
-                    if meta.variadic:
-                        break
-                    raise self.Error(
-                        'Command {0!r} takes arguments: {1}'.format(
-                            method, meta.signature),
-                        status=EX_USAGE)
-                else:
-                    yield name, typ(arg) if typ is not None else arg
-        finally:
-            args[:] = args[i:]
-
-    @classmethod
-    def _choices_by_group(cls, app):
-        from celery.worker.control import Panel
-        # need to import task modules for custom user-remote control commands.
-        app.loader.import_default_modules()
-
-        return {
-            name: info for name, info in items(Panel.meta)
-            if info.type == cls.control_group and info.visible
-        }
-
-    @cached_property
-    def choices(self):
-        return self._choices_by_group(self.app)
-
-    @property
-    def epilog(self):
-        return '\n'.join([
-            '[Commands]',
-            self.list_commands(indent=4, help=True, app=self.app)
-        ])
-
-
-class inspect(_RemoteControl):
-    """Inspect the worker at runtime.
-
-    Availability: RabbitMQ (AMQP) and Redis transports.
-
-    Examples:
-        .. code-block:: console
-
-            $ celery inspect active --timeout=5
-            $ celery inspect scheduled -d worker1@example.com
-            $ celery inspect revoked -d w1@e.com,w2@e.com
-    """
-
-    name = 'inspect'
-    control_group = 'inspect'
-
-    def call(self, method, arguments, **options):
-        return self.app.control.inspect(**options)._request(
-            method, **arguments)
-
-
-class control(_RemoteControl):
-    """Workers remote control.
-
-    Availability: RabbitMQ (AMQP), Redis, and MongoDB transports.
-
-    Examples:
-        .. code-block:: console
-
-            $ celery control enable_events --timeout=5
-            $ celery control -d worker1@example.com enable_events
-            $ celery control -d w1.e.com,w2.e.com enable_events
-
-            $ celery control -d w1.e.com add_consumer queue_name
-            $ celery control -d w1.e.com cancel_consumer queue_name
-
-            $ celery control add_consumer queue exchange direct rkey
-    """
-
-    name = 'control'
-    control_group = 'control'
-
-    def call(self, method, arguments, **options):
-        return self.app.control.broadcast(
-            method, arguments=arguments, reply=True, **options)
-
-
-class status(Command):
-    """Show list of workers that are online."""
-
-    option_list = inspect.option_list
-
-    def run(self, *args, **kwargs):
-        I = inspect(
-            app=self.app,
-            no_color=kwargs.get('no_color', False),
-            stdout=self.stdout, stderr=self.stderr,
-            show_reply=False, show_body=False, quiet=True,
-        )
-        replies = I.run('ping', **kwargs)
-        if not replies:
-            raise self.Error('No nodes replied within time constraint',
-                             status=EX_UNAVAILABLE)
-        nodecount = len(replies)
-        if not kwargs.get('quiet', False):
-            self.out('\n{0} {1} online.'.format(
-                nodecount, text.pluralize(nodecount, 'node')))
-
-
-class migrate(Command):
-    """Migrate tasks from one broker to another.
-
-    Warning:
-        This command is experimental, make sure you have a backup of
-        the tasks before you continue.
-
-    Example:
-        .. code-block:: console
-
-            $ celery migrate amqp://A.example.com amqp://guest@B.example.com//
-            $ celery migrate redis://localhost amqp://guest@localhost//
-    """
-
-    args = '<source_url> <dest_url>'
-    progress_fmt = MIGRATE_PROGRESS_FMT
-
-    def add_arguments(self, parser):
-        group = parser.add_argument_group('Migration Options')
-        group.add_argument(
-            '--limit', '-n', type=int,
-            help='Number of tasks to consume (int)',
-        )
-        group.add_argument(
-            '--timeout', '-t', type=float, default=1.0,
-            help='Timeout in seconds (float) waiting for tasks',
-        )
-        group.add_argument(
-            '--ack-messages', '-a', action='store_true', default=False,
-            help='Ack messages from source broker.',
-        )
-        group.add_argument(
-            '--tasks', '-T',
-            help='List of task names to filter on.',
-        )
-        group.add_argument(
-            '--queues', '-Q',
-            help='List of queues to migrate.',
-        )
-        group.add_argument(
-            '--forever', '-F', action='store_true', default=False,
-            help='Continually migrate tasks until killed.',
-        )
-
-    def on_migrate_task(self, state, body, message):
-        self.out(self.progress_fmt.format(state=state, body=body))
-
-    def run(self, source, destination, **kwargs):
-        from kombu import Connection
-        from celery.contrib.migrate import migrate_tasks
-
-        migrate_tasks(Connection(source),
-                      Connection(destination),
-                      callback=self.on_migrate_task,
-                      **kwargs)
-
-
-class shell(Command):  # pragma: no cover
-    """Start shell session with convenient access to celery symbols.
-
-    The following symbols will be added to the main globals:
-
-        - ``celery``:  the current application.
-        - ``chord``, ``group``, ``chain``, ``chunks``,
-          ``xmap``, ``xstarmap`` ``subtask``, ``Task``
-        - all registered tasks.
-    """
-
-    def add_arguments(self, parser):
-        group = parser.add_argument_group('Shell Options')
-        group.add_argument(
-            '--ipython', '-I',
-            action='store_true', help='force iPython.', default=False,
-        )
-        group.add_argument(
-            '--bpython', '-B',
-            action='store_true', help='force bpython.', default=False,
-        )
-        group.add_argument(
-            '--python',
-            action='store_true', default=False,
-            help='force default Python shell.',
-        )
-        group.add_argument(
-            '--without-tasks', '-T',
-            action='store_true', default=False,
-            help="don't add tasks to locals.",
-        )
-        group.add_argument(
-            '--eventlet',
-            action='store_true', default=False,
-            help='use eventlet.',
-        )
-        group.add_argument(
-            '--gevent', action='store_true', default=False,
-            help='use gevent.',
-        )
-
-    def run(self, *args, **kwargs):
-        if args:
-            raise self.UsageError(
-                'shell command does not take arguments: {0}'.format(args))
-        return self._run(**kwargs)
-
-    def _run(self, ipython=False, bpython=False,
-             python=False, without_tasks=False, eventlet=False,
-             gevent=False, **kwargs):
-        sys.path.insert(0, os.getcwd())
-        if eventlet:
-            import_module('celery.concurrency.eventlet')
-        if gevent:
-            import_module('celery.concurrency.gevent')
-        import celery
-        import celery.task.base
-        self.app.loader.import_default_modules()
-
-        # pylint: disable=attribute-defined-outside-init
-        self.locals = {
-            'app': self.app,
-            'celery': self.app,
-            'Task': celery.Task,
-            'chord': celery.chord,
-            'group': celery.group,
-            'chain': celery.chain,
-            'chunks': celery.chunks,
-            'xmap': celery.xmap,
-            'xstarmap': celery.xstarmap,
-            'subtask': celery.subtask,
-            'signature': celery.signature,
-        }
-
-        if not without_tasks:
-            self.locals.update({
-                task.__name__: task for task in values(self.app.tasks)
-                if not task.name.startswith('celery.')
-            })
-
-        if python:
-            return self.invoke_fallback_shell()
-        elif bpython:
-            return self.invoke_bpython_shell()
-        elif ipython:
-            return self.invoke_ipython_shell()
-        return self.invoke_default_shell()
-
-    def invoke_default_shell(self):
-        try:
-            import IPython  # noqa
-        except ImportError:
-            try:
-                import bpython  # noqa
-            except ImportError:
-                return self.invoke_fallback_shell()
-            else:
-                return self.invoke_bpython_shell()
-        else:
-            return self.invoke_ipython_shell()
-
-    def invoke_fallback_shell(self):
-        import code
-        try:
-            import readline
-        except ImportError:
-            pass
-        else:
-            import rlcompleter
-            readline.set_completer(
-                rlcompleter.Completer(self.locals).complete)
-            readline.parse_and_bind('tab:complete')
-        code.interact(local=self.locals)
-
-    def invoke_ipython_shell(self):
-        for ip in (self._ipython, self._ipython_pre_10,
-                   self._ipython_terminal, self._ipython_010,
-                   self._no_ipython):
-            try:
-                return ip()
-            except ImportError:
-                pass
-
-    def _ipython(self):
-        from IPython import start_ipython
-        start_ipython(argv=[], user_ns=self.locals)
-
-    def _ipython_pre_10(self):  # pragma: no cover
-        from IPython.frontend.terminal.ipapp import TerminalIPythonApp
-        app = TerminalIPythonApp.instance()
-        app.initialize(argv=[])
-        app.shell.user_ns.update(self.locals)
-        app.start()
-
-    def _ipython_terminal(self):  # pragma: no cover
-        from IPython.terminal import embed
-        embed.TerminalInteractiveShell(user_ns=self.locals).mainloop()
-
-    def _ipython_010(self):  # pragma: no cover
-        from IPython.Shell import IPShell
-        IPShell(argv=[], user_ns=self.locals).mainloop()
-
-    def _no_ipython(self):  # pragma: no cover
-        raise ImportError('no suitable ipython found')
-
-    def invoke_bpython_shell(self):
-        import bpython
-        bpython.embed(self.locals)
-
-
-class upgrade(Command):
-    """Perform upgrade between versions."""
-
-    choices = {'settings'}
-
-    def add_arguments(self, parser):
-        group = parser.add_argument_group('Upgrading Options')
-        group.add_argument(
-            '--django', action='store_true', default=False,
-            help='Upgrade Django project',
-        )
-        group.add_argument(
-            '--compat', action='store_true', default=False,
-            help='Maintain backwards compatibility',
-        )
-        group.add_argument(
-            '--no-backup', action='store_true', default=False,
-            help='Dont backup original files',
-        )
-
-    def usage(self, command):
-        return '%(prog)s <command> settings [filename] [options]'
-
-    def run(self, *args, **kwargs):
-        try:
-            command = args[0]
-        except IndexError:
-            raise self.UsageError(
-                'missing upgrade type: try `celery upgrade settings` ?')
-        if command not in self.choices:
-            raise self.UsageError('unknown upgrade type: {0}'.format(command))
-        return getattr(self, command)(*args, **kwargs)
-
-    def settings(self, command, filename,
-                 no_backup=False, django=False, compat=False, **kwargs):
-        lines = self._slurp(filename)
-        keyfilter = self._compat_key if django or compat else pass1
-        print('processing {0}...'.format(filename), file=self.stderr)
-        # gives list of tuples: ``(did_change, line_contents)``
-        new_lines = [
-            self._to_new_key(line, keyfilter) for line in lines
-        ]
-        if any(n[0] for n in new_lines):  # did have changes
-            if not no_backup:
-                self._backup(filename)
-            with codecs.open(filename, 'w', 'utf-8') as write_fh:
-                for _, line in new_lines:
-                    write_fh.write(line)
-            print('Changes to your setting have been made!',
-                  file=self.stdout)
-        else:
-            print('Does not seem to require any changes :-)',
-                  file=self.stdout)
-
-    def _slurp(self, filename):
-        with codecs.open(filename, 'r', 'utf-8') as read_fh:
-            return [line for line in read_fh]
-
-    def _backup(self, filename, suffix='.orig'):
-        lines = []
-        backup_filename = ''.join([filename, suffix])
-        print('writing backup to {0}...'.format(backup_filename),
-              file=self.stderr)
-        with codecs.open(filename, 'r', 'utf-8') as read_fh:
-            with codecs.open(backup_filename, 'w', 'utf-8') as backup_fh:
-                for line in read_fh:
-                    backup_fh.write(line)
-                    lines.append(line)
-        return lines
-
-    def _to_new_key(self, line, keyfilter=pass1, source=defaults._TO_NEW_KEY):
-        # sort by length to avoid, for example, broker_transport overriding
-        # broker_transport_options.
-        for old_key in reversed(sorted(source, key=lambda x: len(x))):
-            new_line = line.replace(old_key, keyfilter(source[old_key]))
-            if line != new_line and 'CELERY_CELERY' not in new_line:
-                return 1, new_line  # only one match per line.
-        return 0, line
-
-    def _compat_key(self, key, namespace='CELERY'):
-        key = key.upper()
-        if not key.startswith(namespace):
-            key = '_'.join([namespace, key])
-        return key
-
-
 class help(Command):
     """Show help screen and exit."""
 

+ 236 - 0
celery/bin/control.py

@@ -0,0 +1,236 @@
+from __future__ import absolute_import, unicode_literals
+from kombu.utils.json import dumps
+from kombu.utils.objects import cached_property
+from celery.five import items, string_t
+from celery.bin.base import Command
+from celery.platforms import EX_UNAVAILABLE, EX_USAGE
+from celery.utils import text
+
+
+class _RemoteControl(Command):
+
+    name = None
+    leaf = False
+    control_group = None
+
+    def __init__(self, *args, **kwargs):
+        self.show_body = kwargs.pop('show_body', True)
+        self.show_reply = kwargs.pop('show_reply', True)
+        super(_RemoteControl, self).__init__(*args, **kwargs)
+
+    def add_arguments(self, parser):
+        group = parser.add_argument_group('Remote Control Options')
+        group.add_argument(
+            '--timeout', '-t', type=float,
+            help='Timeout in seconds (float) waiting for reply',
+        )
+        group.add_argument(
+            '--destination', '-d',
+            help='Comma separated list of destination node names.')
+        group.add_argument(
+            '--json', '-j', action='store_true', default=False,
+            help='Use json as output format.',
+        )
+
+    @classmethod
+    def get_command_info(cls, command,
+                         indent=0, prefix='', color=None,
+                         help=False, app=None, choices=None):
+        if choices is None:
+            choices = cls._choices_by_group(app)
+        meta = choices[command]
+        if help:
+            help = '|' + text.indent(meta.help, indent + 4)
+        else:
+            help = None
+        return text.join([
+            '|' + text.indent('{0}{1} {2}'.format(
+                prefix, color(command), meta.signature or ''), indent),
+            help,
+        ])
+
+    @classmethod
+    def list_commands(cls, indent=0, prefix='',
+                      color=None, help=False, app=None):
+        choices = cls._choices_by_group(app)
+        color = color if color else lambda x: x
+        prefix = prefix + ' ' if prefix else ''
+        return '\n'.join(
+            cls.get_command_info(c, indent, prefix, color, help,
+                                 app=app, choices=choices)
+            for c in sorted(choices))
+
+    def usage(self, command):
+        return '%(prog)s {0} [options] {1} <command> [arg1 .. argN]'.format(
+            command, self.args)
+
+    def call(self, *args, **kwargs):
+        raise NotImplementedError('call')
+
+    def run(self, *args, **kwargs):
+        if not args:
+            raise self.UsageError(
+                'Missing {0.name} method.  See --help'.format(self))
+        return self.do_call_method(args, **kwargs)
+
+    def _ensure_fanout_supported(self):
+        with self.app.connection_for_write() as conn:
+            if not conn.supports_exchange_type('fanout'):
+                raise self.Error(
+                    'Broadcast not supported by transport {0!r}'.format(
+                        conn.info()['transport']))
+
+    def do_call_method(self, args,
+                       timeout=None, destination=None, json=False, **kwargs):
+        method = args[0]
+        if method == 'help':
+            raise self.Error("Did you mean '{0.name} --help'?".format(self))
+        try:
+            meta = self.choices[method]
+        except KeyError:
+            raise self.UsageError(
+                'Unknown {0.name} method {1}'.format(self, method))
+
+        self._ensure_fanout_supported()
+
+        timeout = timeout or meta.default_timeout
+        if destination and isinstance(destination, string_t):
+            destination = [dest.strip() for dest in destination.split(',')]
+
+        replies = self.call(
+            method,
+            arguments=self.compile_arguments(meta, method, args[1:]),
+            timeout=timeout,
+            destination=destination,
+            callback=None if json else self.say_remote_command_reply,
+        )
+        if not replies:
+            raise self.Error('No nodes replied within time constraint.',
+                             status=EX_UNAVAILABLE)
+        if json:
+            self.out(dumps(replies))
+        return replies
+
+    def compile_arguments(self, meta, method, args):
+        args = list(args)
+        kw = {}
+        if meta.args:
+            kw.update({
+                k: v for k, v in self._consume_args(meta, method, args)
+            })
+        if meta.variadic:
+            kw.update({meta.variadic: args})
+        if not kw and args:
+            raise self.Error(
+                'Command {0!r} takes no arguments.'.format(method),
+                status=EX_USAGE)
+        return kw or {}
+
+    def _consume_args(self, meta, method, args):
+        i = 0
+        try:
+            for i, arg in enumerate(args):
+                try:
+                    name, typ = meta.args[i]
+                except IndexError:
+                    if meta.variadic:
+                        break
+                    raise self.Error(
+                        'Command {0!r} takes arguments: {1}'.format(
+                            method, meta.signature),
+                        status=EX_USAGE)
+                else:
+                    yield name, typ(arg) if typ is not None else arg
+        finally:
+            args[:] = args[i:]
+
+    @classmethod
+    def _choices_by_group(cls, app):
+        from celery.worker.control import Panel
+        # need to import task modules for custom user-remote control commands.
+        app.loader.import_default_modules()
+
+        return {
+            name: info for name, info in items(Panel.meta)
+            if info.type == cls.control_group and info.visible
+        }
+
+    @cached_property
+    def choices(self):
+        return self._choices_by_group(self.app)
+
+    @property
+    def epilog(self):
+        return '\n'.join([
+            '[Commands]',
+            self.list_commands(indent=4, help=True, app=self.app)
+        ])
+
+
+class inspect(_RemoteControl):
+    """Inspect the worker at runtime.
+
+    Availability: RabbitMQ (AMQP) and Redis transports.
+
+    Examples:
+        .. code-block:: console
+
+            $ celery inspect active --timeout=5
+            $ celery inspect scheduled -d worker1@example.com
+            $ celery inspect revoked -d w1@e.com,w2@e.com
+    """
+
+    name = 'inspect'
+    control_group = 'inspect'
+
+    def call(self, method, arguments, **options):
+        return self.app.control.inspect(**options)._request(
+            method, **arguments)
+
+
+class control(_RemoteControl):
+    """Workers remote control.
+
+    Availability: RabbitMQ (AMQP), Redis, and MongoDB transports.
+
+    Examples:
+        .. code-block:: console
+
+            $ celery control enable_events --timeout=5
+            $ celery control -d worker1@example.com enable_events
+            $ celery control -d w1.e.com,w2.e.com enable_events
+
+            $ celery control -d w1.e.com add_consumer queue_name
+            $ celery control -d w1.e.com cancel_consumer queue_name
+
+            $ celery control add_consumer queue exchange direct rkey
+    """
+
+    name = 'control'
+    control_group = 'control'
+
+    def call(self, method, arguments, **options):
+        return self.app.control.broadcast(
+            method, arguments=arguments, reply=True, **options)
+
+
+class status(Command):
+    """Show list of workers that are online."""
+
+    option_list = inspect.option_list
+
+    def run(self, *args, **kwargs):
+        I = inspect(
+            app=self.app,
+            no_color=kwargs.get('no_color', False),
+            stdout=self.stdout, stderr=self.stderr,
+            show_reply=False, show_body=False, quiet=True,
+        )
+        replies = I.run('ping', **kwargs)
+        if not replies:
+            raise self.Error('No nodes replied within time constraint',
+                             status=EX_UNAVAILABLE)
+        nodecount = len(replies)
+        if not kwargs.get('quiet', False):
+            self.out('\n{0} {1} online.'.format(
+                nodecount, text.pluralize(nodecount, 'node')))

+ 44 - 0
celery/bin/list.py

@@ -0,0 +1,44 @@
+from __future__ import absolute_import, unicode_literals
+from celery.bin.base import Command
+
+
+class list_(Command):
+    """Get info from broker.
+
+    Note:
+       For RabbitMQ the management plugin is required.
+
+    Example:
+        .. code-block:: console
+
+            $ celery list bindings
+    """
+
+    args = '[bindings]'
+
+    def list_bindings(self, management):
+        try:
+            bindings = management.get_bindings()
+        except NotImplementedError:
+            raise self.Error('Your transport cannot list bindings.')
+
+        def fmt(q, e, r):
+            return self.out('{0:<28} {1:<28} {2}'.format(q, e, r))
+        fmt('Queue', 'Exchange', 'Routing Key')
+        fmt('-' * 16, '-' * 16, '-' * 16)
+        for b in bindings:
+            fmt(b['destination'], b['source'], b['routing_key'])
+
+    def run(self, what=None, *_, **kw):
+        topics = {'bindings': self.list_bindings}
+        available = ', '.join(topics)
+        if not what:
+            raise self.UsageError(
+                'Missing argument, specify one of: {0}'.format(available))
+        if what not in topics:
+            raise self.UsageError(
+                'unknown topic {0!r} (choose one of: {1})'.format(
+                    what, available))
+        with self.app.connection() as conn:
+            self.app.amqp.TaskConsumer(conn).declare()
+            topics[what](conn.manager)

+ 64 - 0
celery/bin/migrate.py

@@ -0,0 +1,64 @@
+from __future__ import absolute_import, unicode_literals
+from celery.bin.base import Command
+
+MIGRATE_PROGRESS_FMT = """\
+Migrating task {state.count}/{state.strtotal}: \
+{body[task]}[{body[id]}]\
+"""
+
+
+class migrate(Command):
+    """Migrate tasks from one broker to another.
+
+    Warning:
+        This command is experimental, make sure you have a backup of
+        the tasks before you continue.
+
+    Example:
+        .. code-block:: console
+
+            $ celery migrate amqp://A.example.com amqp://guest@B.example.com//
+            $ celery migrate redis://localhost amqp://guest@localhost//
+    """
+
+    args = '<source_url> <dest_url>'
+    progress_fmt = MIGRATE_PROGRESS_FMT
+
+    def add_arguments(self, parser):
+        group = parser.add_argument_group('Migration Options')
+        group.add_argument(
+            '--limit', '-n', type=int,
+            help='Number of tasks to consume (int)',
+        )
+        group.add_argument(
+            '--timeout', '-t', type=float, default=1.0,
+            help='Timeout in seconds (float) waiting for tasks',
+        )
+        group.add_argument(
+            '--ack-messages', '-a', action='store_true', default=False,
+            help='Ack messages from source broker.',
+        )
+        group.add_argument(
+            '--tasks', '-T',
+            help='List of task names to filter on.',
+        )
+        group.add_argument(
+            '--queues', '-Q',
+            help='List of queues to migrate.',
+        )
+        group.add_argument(
+            '--forever', '-F', action='store_true', default=False,
+            help='Continually migrate tasks until killed.',
+        )
+
+    def on_migrate_task(self, state, body, message):
+        self.out(self.progress_fmt.format(state=state, body=body))
+
+    def run(self, source, destination, **kwargs):
+        from kombu import Connection
+        from celery.contrib.migrate import migrate_tasks
+
+        migrate_tasks(Connection(source),
+                      Connection(destination),
+                      callback=self.on_migrate_task,
+                      **kwargs)

+ 67 - 0
celery/bin/purge.py

@@ -0,0 +1,67 @@
+from __future__ import absolute_import, unicode_literals
+from celery.five import keys
+from celery.bin.base import Command
+from celery.utils import text
+
+
+class purge(Command):
+    """Erase all messages from all known task queues.
+
+    Warning:
+        There's no undo operation for this command.
+    """
+
+    warn_prelude = (
+        '{warning}: This will remove all tasks from {queues}: {names}.\n'
+        '         There is no undo for this operation!\n\n'
+        '(to skip this prompt use the -f option)\n'
+    )
+    warn_prompt = 'Are you sure you want to delete all tasks'
+
+    fmt_purged = 'Purged {mnum} {messages} from {qnum} known task {queues}.'
+    fmt_empty = 'No messages purged from {qnum} {queues}'
+
+    def add_arguments(self, parser):
+        group = parser.add_argument_group('Purging Options')
+        group.add_argument(
+            '--force', '-f', action='store_true', default=False,
+            help="Don't prompt for verification",
+        )
+        group.add_argument(
+            '--queues', '-Q', default=[],
+            help='Comma separated list of queue names to purge.',
+        )
+        group.add_argument(
+            '--exclude-queues', '-X', default=[],
+            help='Comma separated list of queues names not to purge.',
+        )
+
+    def run(self, force=False, queues=None, exclude_queues=None, **kwargs):
+        queues = set(text.str_to_list(queues or []))
+        exclude = set(text.str_to_list(exclude_queues or []))
+        names = (queues or set(keys(self.app.amqp.queues))) - exclude
+        qnum = len(names)
+
+        messages = None
+        if names:
+            if not force:
+                self.out(self.warn_prelude.format(
+                    warning=self.colored.red('WARNING'),
+                    queues=text.pluralize(qnum, 'queue'),
+                    names=', '.join(sorted(names)),
+                ))
+                if self.ask(self.warn_prompt, ('yes', 'no'), 'no') != 'yes':
+                    return
+            with self.app.connection_for_write() as conn:
+                messages = sum(self._purge(conn, queue) for queue in names)
+        fmt = self.fmt_purged if messages else self.fmt_empty
+        self.out(fmt.format(
+            mnum=messages, qnum=qnum,
+            messages=text.pluralize(messages, 'message'),
+            queues=text.pluralize(qnum, 'queue')))
+
+    def _purge(self, conn, queue):
+        try:
+            return conn.default_channel.queue_purge(queue) or 0
+        except conn.channel_errors:
+            return 0

+ 40 - 0
celery/bin/result.py

@@ -0,0 +1,40 @@
+from __future__ import absolute_import, unicode_literals
+from celery.bin.base import Command
+
+
+class result(Command):
+    """Gives the return value for a given task id.
+
+    Examples:
+        .. code-block:: console
+
+            $ celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500
+            $ celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500 -t tasks.add
+            $ celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500 --traceback
+    """
+
+    args = '<task_id>'
+
+    def add_arguments(self, parser):
+        group = parser.add_argument_group('Result Options')
+        group.add_argument(
+            '--task', '-t', help='name of task (if custom backend)',
+        )
+        group.add_argument(
+            '--traceback', action='store_true', default=False,
+            help='show traceback instead',
+        )
+
+    def run(self, task_id, *args, **kwargs):
+        result_cls = self.app.AsyncResult
+        task = kwargs.get('task')
+        traceback = kwargs.get('traceback', False)
+
+        if task:
+            result_cls = self.app.tasks[task].AsyncResult
+        task_result = result_cls(task_id)
+        if traceback:
+            value = task_result.traceback
+        else:
+            value = task_result.get()
+        self.out(self.pretty(value)[1])

+ 156 - 0
celery/bin/shell.py

@@ -0,0 +1,156 @@
+from __future__ import absolute_import, unicode_literals
+import os
+import sys
+from importlib import import_module
+from celery.five import values
+from celery.bin.base import Command
+
+
+class shell(Command):  # pragma: no cover
+    """Start shell session with convenient access to celery symbols.
+
+    The following symbols will be added to the main globals:
+
+        - ``celery``:  the current application.
+        - ``chord``, ``group``, ``chain``, ``chunks``,
+          ``xmap``, ``xstarmap`` ``subtask``, ``Task``
+        - all registered tasks.
+    """
+
+    def add_arguments(self, parser):
+        group = parser.add_argument_group('Shell Options')
+        group.add_argument(
+            '--ipython', '-I',
+            action='store_true', help='force iPython.', default=False,
+        )
+        group.add_argument(
+            '--bpython', '-B',
+            action='store_true', help='force bpython.', default=False,
+        )
+        group.add_argument(
+            '--python',
+            action='store_true', default=False,
+            help='force default Python shell.',
+        )
+        group.add_argument(
+            '--without-tasks', '-T',
+            action='store_true', default=False,
+            help="don't add tasks to locals.",
+        )
+        group.add_argument(
+            '--eventlet',
+            action='store_true', default=False,
+            help='use eventlet.',
+        )
+        group.add_argument(
+            '--gevent', action='store_true', default=False,
+            help='use gevent.',
+        )
+
+    def run(self, *args, **kwargs):
+        if args:
+            raise self.UsageError(
+                'shell command does not take arguments: {0}'.format(args))
+        return self._run(**kwargs)
+
+    def _run(self, ipython=False, bpython=False,
+             python=False, without_tasks=False, eventlet=False,
+             gevent=False, **kwargs):
+        sys.path.insert(0, os.getcwd())
+        if eventlet:
+            import_module('celery.concurrency.eventlet')
+        if gevent:
+            import_module('celery.concurrency.gevent')
+        import celery
+        import celery.task.base
+        self.app.loader.import_default_modules()
+
+        # pylint: disable=attribute-defined-outside-init
+        self.locals = {
+            'app': self.app,
+            'celery': self.app,
+            'Task': celery.Task,
+            'chord': celery.chord,
+            'group': celery.group,
+            'chain': celery.chain,
+            'chunks': celery.chunks,
+            'xmap': celery.xmap,
+            'xstarmap': celery.xstarmap,
+            'subtask': celery.subtask,
+            'signature': celery.signature,
+        }
+
+        if not without_tasks:
+            self.locals.update({
+                task.__name__: task for task in values(self.app.tasks)
+                if not task.name.startswith('celery.')
+            })
+
+        if python:
+            return self.invoke_fallback_shell()
+        elif bpython:
+            return self.invoke_bpython_shell()
+        elif ipython:
+            return self.invoke_ipython_shell()
+        return self.invoke_default_shell()
+
+    def invoke_default_shell(self):
+        try:
+            import IPython  # noqa
+        except ImportError:
+            try:
+                import bpython  # noqa
+            except ImportError:
+                return self.invoke_fallback_shell()
+            else:
+                return self.invoke_bpython_shell()
+        else:
+            return self.invoke_ipython_shell()
+
+    def invoke_fallback_shell(self):
+        import code
+        try:
+            import readline
+        except ImportError:
+            pass
+        else:
+            import rlcompleter
+            readline.set_completer(
+                rlcompleter.Completer(self.locals).complete)
+            readline.parse_and_bind('tab:complete')
+        code.interact(local=self.locals)
+
+    def invoke_ipython_shell(self):
+        for ip in (self._ipython, self._ipython_pre_10,
+                   self._ipython_terminal, self._ipython_010,
+                   self._no_ipython):
+            try:
+                return ip()
+            except ImportError:
+                pass
+
+    def _ipython(self):
+        from IPython import start_ipython
+        start_ipython(argv=[], user_ns=self.locals)
+
+    def _ipython_pre_10(self):  # pragma: no cover
+        from IPython.frontend.terminal.ipapp import TerminalIPythonApp
+        app = TerminalIPythonApp.instance()
+        app.initialize(argv=[])
+        app.shell.user_ns.update(self.locals)
+        app.start()
+
+    def _ipython_terminal(self):  # pragma: no cover
+        from IPython.terminal import embed
+        embed.TerminalInteractiveShell(user_ns=self.locals).mainloop()
+
+    def _ipython_010(self):  # pragma: no cover
+        from IPython.Shell import IPShell
+        IPShell(argv=[], user_ns=self.locals).mainloop()
+
+    def _no_ipython(self):  # pragma: no cover
+        raise ImportError('no suitable ipython found')
+
+    def invoke_bpython_shell(self):
+        import bpython
+        bpython.embed(self.locals)

+ 91 - 0
celery/bin/upgrade.py

@@ -0,0 +1,91 @@
+from __future__ import absolute_import, print_function, unicode_literals
+import codecs
+from celery.app import defaults
+from celery.bin.base import Command
+from celery.utils.functional import pass1
+
+
+class upgrade(Command):
+    """Perform upgrade between versions."""
+
+    choices = {'settings'}
+
+    def add_arguments(self, parser):
+        group = parser.add_argument_group('Upgrading Options')
+        group.add_argument(
+            '--django', action='store_true', default=False,
+            help='Upgrade Django project',
+        )
+        group.add_argument(
+            '--compat', action='store_true', default=False,
+            help='Maintain backwards compatibility',
+        )
+        group.add_argument(
+            '--no-backup', action='store_true', default=False,
+            help='Dont backup original files',
+        )
+
+    def usage(self, command):
+        return '%(prog)s <command> settings [filename] [options]'
+
+    def run(self, *args, **kwargs):
+        try:
+            command = args[0]
+        except IndexError:
+            raise self.UsageError(
+                'missing upgrade type: try `celery upgrade settings` ?')
+        if command not in self.choices:
+            raise self.UsageError('unknown upgrade type: {0}'.format(command))
+        return getattr(self, command)(*args, **kwargs)
+
+    def settings(self, command, filename,
+                 no_backup=False, django=False, compat=False, **kwargs):
+        lines = self._slurp(filename)
+        keyfilter = self._compat_key if django or compat else pass1
+        print('processing {0}...'.format(filename), file=self.stderr)
+        # gives list of tuples: ``(did_change, line_contents)``
+        new_lines = [
+            self._to_new_key(line, keyfilter) for line in lines
+        ]
+        if any(n[0] for n in new_lines):  # did have changes
+            if not no_backup:
+                self._backup(filename)
+            with codecs.open(filename, 'w', 'utf-8') as write_fh:
+                for _, line in new_lines:
+                    write_fh.write(line)
+            print('Changes to your setting have been made!',
+                  file=self.stdout)
+        else:
+            print('Does not seem to require any changes :-)',
+                  file=self.stdout)
+
+    def _slurp(self, filename):
+        with codecs.open(filename, 'r', 'utf-8') as read_fh:
+            return [line for line in read_fh]
+
+    def _backup(self, filename, suffix='.orig'):
+        lines = []
+        backup_filename = ''.join([filename, suffix])
+        print('writing backup to {0}...'.format(backup_filename),
+              file=self.stderr)
+        with codecs.open(filename, 'r', 'utf-8') as read_fh:
+            with codecs.open(backup_filename, 'w', 'utf-8') as backup_fh:
+                for line in read_fh:
+                    backup_fh.write(line)
+                    lines.append(line)
+        return lines
+
+    def _to_new_key(self, line, keyfilter=pass1, source=defaults._TO_NEW_KEY):
+        # sort by length to avoid, for example, broker_transport overriding
+        # broker_transport_options.
+        for old_key in reversed(sorted(source, key=lambda x: len(x))):
+            new_line = line.replace(old_key, keyfilter(source[old_key]))
+            if line != new_line and 'CELERY_CELERY' not in new_line:
+                return 1, new_line  # only one match per line.
+        return 0, line
+
+    def _compat_key(self, key, namespace='CELERY'):
+        key = key.upper()
+        if not key.startswith(namespace):
+            key = '_'.join([namespace, key])
+        return key

+ 40 - 0
t/unit/bin/test_call.py

@@ -0,0 +1,40 @@
+from __future__ import absolute_import, unicode_literals
+import pytest
+from datetime import datetime
+from case import patch
+from kombu.utils.json import dumps
+from celery.five import WhateverIO
+from celery.bin.call import call
+
+
+class test_call:
+
+    def setup(self):
+
+        @self.app.task(shared=False)
+        def add(x, y):
+            return x + y
+        self.add = add
+
+    @patch('celery.app.base.Celery.send_task')
+    def test_run(self, send_task):
+        a = call(app=self.app, stderr=WhateverIO(), stdout=WhateverIO())
+        a.run(self.add.name)
+        send_task.assert_called()
+
+        a.run(self.add.name,
+              args=dumps([4, 4]),
+              kwargs=dumps({'x': 2, 'y': 2}))
+        assert send_task.call_args[1]['args'], [4 == 4]
+        assert send_task.call_args[1]['kwargs'] == {'x': 2, 'y': 2}
+
+        a.run(self.add.name, expires=10, countdown=10)
+        assert send_task.call_args[1]['expires'] == 10
+        assert send_task.call_args[1]['countdown'] == 10
+
+        now = datetime.now()
+        iso = now.isoformat()
+        a.run(self.add.name, expires=iso)
+        assert send_task.call_args[1]['expires'] == now
+        with pytest.raises(ValueError):
+            a.run(self.add.name, expires='foobaribazibar')

+ 0 - 252
t/unit/bin/test_celery.py

@@ -3,31 +3,19 @@ from __future__ import absolute_import, unicode_literals
 import pytest
 import sys
 
-from datetime import datetime
-
 from case import Mock, patch
-from kombu.utils.json import dumps
 
 from celery import __main__
 from celery.bin.base import Error
 from celery.bin import celery as mod
 from celery.bin.celery import (
     Command,
-    list_,
-    call,
-    purge,
-    result,
-    inspect,
-    control,
-    status,
-    migrate,
     help,
     report,
     CeleryCommand,
     determine_exit_status,
     multi,
     main as mainfun,
-    _RemoteControl,
 )
 from celery.five import WhateverIO
 from celery.platforms import EX_FAILURE, EX_USAGE, EX_OK
@@ -107,144 +95,6 @@ class test_Command:
         assert 'OK' in str(self.cmd.pretty({'foo': 'bar'}))
 
 
-class test_list:
-
-    def test_list_bindings_no_support(self):
-        l = list_(app=self.app, stderr=WhateverIO())
-        management = Mock()
-        management.get_bindings.side_effect = NotImplementedError()
-        with pytest.raises(Error):
-            l.list_bindings(management)
-
-    def test_run(self):
-        l = list_(app=self.app, stderr=WhateverIO())
-        l.run('bindings')
-
-        with pytest.raises(Error):
-            l.run(None)
-
-        with pytest.raises(Error):
-            l.run('foo')
-
-
-class test_call:
-
-    def setup(self):
-
-        @self.app.task(shared=False)
-        def add(x, y):
-            return x + y
-        self.add = add
-
-    @patch('celery.app.base.Celery.send_task')
-    def test_run(self, send_task):
-        a = call(app=self.app, stderr=WhateverIO(), stdout=WhateverIO())
-        a.run(self.add.name)
-        send_task.assert_called()
-
-        a.run(self.add.name,
-              args=dumps([4, 4]),
-              kwargs=dumps({'x': 2, 'y': 2}))
-        assert send_task.call_args[1]['args'], [4 == 4]
-        assert send_task.call_args[1]['kwargs'] == {'x': 2, 'y': 2}
-
-        a.run(self.add.name, expires=10, countdown=10)
-        assert send_task.call_args[1]['expires'] == 10
-        assert send_task.call_args[1]['countdown'] == 10
-
-        now = datetime.now()
-        iso = now.isoformat()
-        a.run(self.add.name, expires=iso)
-        assert send_task.call_args[1]['expires'] == now
-        with pytest.raises(ValueError):
-            a.run(self.add.name, expires='foobaribazibar')
-
-
-class test_purge:
-
-    def test_run(self):
-        out = WhateverIO()
-        a = purge(app=self.app, stdout=out)
-        a._purge = Mock(name='_purge')
-        a._purge.return_value = 0
-        a.run(force=True)
-        assert 'No messages purged' in out.getvalue()
-
-        a._purge.return_value = 100
-        a.run(force=True)
-        assert '100 messages' in out.getvalue()
-
-        a.out = Mock(name='out')
-        a.ask = Mock(name='ask')
-        a.run(force=False)
-        a.ask.assert_called_with(a.warn_prompt, ('yes', 'no'), 'no')
-        a.ask.return_value = 'yes'
-        a.run(force=False)
-
-
-class test_result:
-
-    def setup(self):
-
-        @self.app.task(shared=False)
-        def add(x, y):
-            return x + y
-        self.add = add
-
-    def test_run(self):
-        with patch('celery.result.AsyncResult.get') as get:
-            out = WhateverIO()
-            r = result(app=self.app, stdout=out)
-            get.return_value = 'Jerry'
-            r.run('id')
-            assert 'Jerry' in out.getvalue()
-
-            get.return_value = 'Elaine'
-            r.run('id', task=self.add.name)
-            assert 'Elaine' in out.getvalue()
-
-            with patch('celery.result.AsyncResult.traceback') as tb:
-                r.run('id', task=self.add.name, traceback=True)
-                assert str(tb) in out.getvalue()
-
-
-class test_status:
-
-    @patch('celery.bin.celery.inspect')
-    def test_run(self, inspect_):
-        out, err = WhateverIO(), WhateverIO()
-        ins = inspect_.return_value = Mock()
-        ins.run.return_value = []
-        s = status(self.app, stdout=out, stderr=err)
-        with pytest.raises(Error):
-            s.run()
-
-        ins.run.return_value = ['a', 'b', 'c']
-        s.run()
-        assert '3 nodes online' in out.getvalue()
-        s.run(quiet=True)
-
-
-class test_migrate:
-
-    @patch('celery.contrib.migrate.migrate_tasks')
-    def test_run(self, migrate_tasks):
-        out = WhateverIO()
-        m = migrate(app=self.app, stdout=out, stderr=WhateverIO())
-        with pytest.raises(TypeError):
-            m.run()
-        migrate_tasks.assert_not_called()
-
-        m.run('memory://foo', 'memory://bar')
-        migrate_tasks.assert_called()
-
-        state = Mock()
-        state.count = 10
-        state.strtotal = 30
-        m.on_migrate_task(state, {'task': 'tasks.add', 'id': 'ID'}, None)
-        assert '10/30' in out.getvalue()
-
-
 class test_report:
 
     def test_run(self):
@@ -406,108 +256,6 @@ class test_CeleryCommand:
             assert x.prepare_prog_name('celery') == 'celery'
 
 
-class test_RemoteControl:
-
-    def test_call_interface(self):
-        with pytest.raises(NotImplementedError):
-            _RemoteControl(app=self.app).call()
-
-
-class test_inspect:
-
-    def test_usage(self):
-        assert inspect(app=self.app).usage('foo')
-
-    def test_command_info(self):
-        i = inspect(app=self.app)
-        assert i.get_command_info(
-            'ping', help=True, color=i.colored.red, app=self.app,
-        )
-
-    def test_list_commands_color(self):
-        i = inspect(app=self.app)
-        assert i.list_commands(help=True, color=i.colored.red, app=self.app)
-        assert i.list_commands(help=False, color=None, app=self.app)
-
-    def test_epilog(self):
-        assert inspect(app=self.app).epilog
-
-    def test_do_call_method_sql_transport_type(self):
-        self.app.connection = Mock()
-        conn = self.app.connection.return_value = Mock(name='Connection')
-        conn.transport.driver_type = 'sql'
-        i = inspect(app=self.app)
-        with pytest.raises(i.Error):
-            i.do_call_method(['ping'])
-
-    def test_say_directions(self):
-        i = inspect(self.app)
-        i.out = Mock()
-        i.quiet = True
-        i.say_chat('<-', 'hello out')
-        i.out.assert_not_called()
-
-        i.say_chat('->', 'hello in')
-        i.out.assert_called()
-
-        i.quiet = False
-        i.out.reset_mock()
-        i.say_chat('<-', 'hello out', 'body')
-        i.out.assert_called()
-
-    @patch('celery.app.control.Control.inspect')
-    def test_run(self, real):
-        out = WhateverIO()
-        i = inspect(app=self.app, stdout=out)
-        with pytest.raises(Error):
-            i.run()
-        with pytest.raises(Error):
-            i.run('help')
-        with pytest.raises(Error):
-            i.run('xyzzybaz')
-
-        i.run('ping')
-        real.assert_called()
-        i.run('ping', destination='foo,bar')
-        assert real.call_args[1]['destination'], ['foo' == 'bar']
-        assert real.call_args[1]['timeout'] == 0.2
-        callback = real.call_args[1]['callback']
-
-        callback({'foo': {'ok': 'pong'}})
-        assert 'OK' in out.getvalue()
-
-        with patch('celery.bin.celery.dumps') as dumps:
-            i.run('ping', json=True)
-            dumps.assert_called()
-
-        instance = real.return_value = Mock()
-        instance._request.return_value = None
-        with pytest.raises(Error):
-            i.run('ping')
-
-        out.seek(0)
-        out.truncate()
-        i.quiet = True
-        i.say_chat('<-', 'hello')
-        assert not out.getvalue()
-
-
-class test_control:
-
-    def control(self, patch_call, *args, **kwargs):
-        kwargs.setdefault('app', Mock(name='app'))
-        c = control(*args, **kwargs)
-        if patch_call:
-            c.call = Mock(name='control.call')
-        return c
-
-    def test_call(self):
-        i = self.control(False)
-        i.call('foo', arguments={'kw': 2})
-        i.app.control.broadcast.assert_called_with(
-            'foo', arguments={'kw': 2}, reply=True)
-
-
 class test_multi:
 
     def test_get_options(self):

+ 125 - 0
t/unit/bin/test_control.py

@@ -0,0 +1,125 @@
+from __future__ import absolute_import, unicode_literals
+import pytest
+from case import Mock, patch
+from celery.five import WhateverIO
+from celery.bin.base import Error
+from celery.bin.control import _RemoteControl, inspect, control, status
+
+
+class test_RemoteControl:
+
+    def test_call_interface(self):
+        with pytest.raises(NotImplementedError):
+            _RemoteControl(app=self.app).call()
+
+
+class test_inspect:
+
+    def test_usage(self):
+        assert inspect(app=self.app).usage('foo')
+
+    def test_command_info(self):
+        i = inspect(app=self.app)
+        assert i.get_command_info(
+            'ping', help=True, color=i.colored.red, app=self.app,
+        )
+
+    def test_list_commands_color(self):
+        i = inspect(app=self.app)
+        assert i.list_commands(help=True, color=i.colored.red, app=self.app)
+        assert i.list_commands(help=False, color=None, app=self.app)
+
+    def test_epilog(self):
+        assert inspect(app=self.app).epilog
+
+    def test_do_call_method_sql_transport_type(self):
+        self.app.connection = Mock()
+        conn = self.app.connection.return_value = Mock(name='Connection')
+        conn.transport.driver_type = 'sql'
+        i = inspect(app=self.app)
+        with pytest.raises(i.Error):
+            i.do_call_method(['ping'])
+
+    def test_say_directions(self):
+        i = inspect(self.app)
+        i.out = Mock()
+        i.quiet = True
+        i.say_chat('<-', 'hello out')
+        i.out.assert_not_called()
+
+        i.say_chat('->', 'hello in')
+        i.out.assert_called()
+
+        i.quiet = False
+        i.out.reset_mock()
+        i.say_chat('<-', 'hello out', 'body')
+        i.out.assert_called()
+
+    @patch('celery.app.control.Control.inspect')
+    def test_run(self, real):
+        out = WhateverIO()
+        i = inspect(app=self.app, stdout=out)
+        with pytest.raises(Error):
+            i.run()
+        with pytest.raises(Error):
+            i.run('help')
+        with pytest.raises(Error):
+            i.run('xyzzybaz')
+
+        i.run('ping')
+        real.assert_called()
+        i.run('ping', destination='foo,bar')
+        assert real.call_args[1]['destination'], ['foo' == 'bar']
+        assert real.call_args[1]['timeout'] == 0.2
+        callback = real.call_args[1]['callback']
+
+        callback({'foo': {'ok': 'pong'}})
+        assert 'OK' in out.getvalue()
+
+        with patch('celery.bin.control.dumps') as dumps:
+            i.run('ping', json=True)
+            dumps.assert_called()
+
+        instance = real.return_value = Mock()
+        instance._request.return_value = None
+        with pytest.raises(Error):
+            i.run('ping')
+
+        out.seek(0)
+        out.truncate()
+        i.quiet = True
+        i.say_chat('<-', 'hello')
+        assert not out.getvalue()
+
+
+class test_control:
+
+    def control(self, patch_call, *args, **kwargs):
+        kwargs.setdefault('app', Mock(name='app'))
+        c = control(*args, **kwargs)
+        if patch_call:
+            c.call = Mock(name='control.call')
+        return c
+
+    def test_call(self):
+        i = self.control(False)
+        i.call('foo', arguments={'kw': 2})
+        i.app.control.broadcast.assert_called_with(
+            'foo', arguments={'kw': 2}, reply=True)
+
+
+class test_status:
+
+    @patch('celery.bin.control.inspect')
+    def test_run(self, inspect_):
+        out, err = WhateverIO(), WhateverIO()
+        ins = inspect_.return_value = Mock()
+        ins.run.return_value = []
+        s = status(self.app, stdout=out, stderr=err)
+        with pytest.raises(Error):
+            s.run()
+
+        ins.run.return_value = ['a', 'b', 'c']
+        s.run()
+        assert '3 nodes online' in out.getvalue()
+        s.run(quiet=True)

+ 26 - 0
t/unit/bin/test_list.py

@@ -0,0 +1,26 @@
+from __future__ import absolute_import, unicode_literals
+import pytest
+from case import Mock
+from kombu.five import WhateverIO
+from celery.bin.base import Error
+from celery.bin.list import list_
+
+
+class test_list:
+
+    def test_list_bindings_no_support(self):
+        l = list_(app=self.app, stderr=WhateverIO())
+        management = Mock()
+        management.get_bindings.side_effect = NotImplementedError()
+        with pytest.raises(Error):
+            l.list_bindings(management)
+
+    def test_run(self):
+        l = list_(app=self.app, stderr=WhateverIO())
+        l.run('bindings')
+
+        with pytest.raises(Error):
+            l.run(None)
+
+        with pytest.raises(Error):
+            l.run('foo')

+ 25 - 0
t/unit/bin/test_migrate.py

@@ -0,0 +1,25 @@
+from __future__ import absolute_import, unicode_literals
+import pytest
+from case import Mock, patch
+from celery.five import WhateverIO
+from celery.bin.migrate import migrate
+
+
+class test_migrate:
+
+    @patch('celery.contrib.migrate.migrate_tasks')
+    def test_run(self, migrate_tasks):
+        out = WhateverIO()
+        m = migrate(app=self.app, stdout=out, stderr=WhateverIO())
+        with pytest.raises(TypeError):
+            m.run()
+        migrate_tasks.assert_not_called()
+
+        m.run('memory://foo', 'memory://bar')
+        migrate_tasks.assert_called()
+
+        state = Mock()
+        state.count = 10
+        state.strtotal = 30
+        m.on_migrate_task(state, {'task': 'tasks.add', 'id': 'ID'}, None)
+        assert '10/30' in out.getvalue()

+ 26 - 0
t/unit/bin/test_purge.py

@@ -0,0 +1,26 @@
+from __future__ import absolute_import, unicode_literals
+from case import Mock
+from celery.five import WhateverIO
+from celery.bin.purge import purge
+
+
+class test_purge:
+
+    def test_run(self):
+        out = WhateverIO()
+        a = purge(app=self.app, stdout=out)
+        a._purge = Mock(name='_purge')
+        a._purge.return_value = 0
+        a.run(force=True)
+        assert 'No messages purged' in out.getvalue()
+
+        a._purge.return_value = 100
+        a.run(force=True)
+        assert '100 messages' in out.getvalue()
+
+        a.out = Mock(name='out')
+        a.ask = Mock(name='ask')
+        a.run(force=False)
+        a.ask.assert_called_with(a.warn_prompt, ('yes', 'no'), 'no')
+        a.ask.return_value = 'yes'
+        a.run(force=False)

+ 30 - 0
t/unit/bin/test_result.py

@@ -0,0 +1,30 @@
+from __future__ import absolute_import, unicode_literals
+from case import patch
+from celery.five import WhateverIO
+from celery.bin.result import result
+
+
+class test_result:
+
+    def setup(self):
+
+        @self.app.task(shared=False)
+        def add(x, y):
+            return x + y
+        self.add = add
+
+    def test_run(self):
+        with patch('celery.result.AsyncResult.get') as get:
+            out = WhateverIO()
+            r = result(app=self.app, stdout=out)
+            get.return_value = 'Jerry'
+            r.run('id')
+            assert 'Jerry' in out.getvalue()
+
+            get.return_value = 'Elaine'
+            r.run('id', task=self.add.name)
+            assert 'Elaine' in out.getvalue()
+
+            with patch('celery.result.AsyncResult.traceback') as tb:
+                r.run('id', task=self.add.name, traceback=True)
+                assert str(tb) in out.getvalue()