@@ -6,14 +6,14 @@ The :program:`celery` umbrella command.
.. program:: celery
-from __future__ import absolute_import
-from __future__ import with_statement
+from __future__ import absolute_import, print_function
import anyjson
import sys
import warnings
from billiard import freeze_support
+from future_builtins import map
from importlib import import_module
from pprint import pformat
@@ -29,10 +29,15 @@ from celery.bin.base import Command as BaseCommand, Option
HELP = """
---- -- - - ---- Commands- -------------- --- ------------
---- -- - - --------- -- - -------------- --- ------------
-Type '%(prog_name)s <command> --help' for help using a specific command.
+Type '{prog_name} <command> --help' for help using a specific command.
+Migrating task {state.count}/{state.strtotal}: \
commands = {}
@@ -62,10 +67,13 @@ class Error(Exception):
return self.reason
-def command(fun, name=None, sortpri=0):
- commands[name or fun.__name__] = fun
- fun.sortpri = sortpri
- return fun
+def command(*args, **kwargs):
+ def _register(fun):
+ commands[kwargs.get('name') or fun.__name__] = fun
+ return fun
+ return _register(args[0]) if args else _register
def load_extension_commands(namespace='celery.commands'):
@@ -79,11 +87,13 @@ def load_extension_commands(namespace='celery.commands'):
sym = ':'.join([ep.module_name, ep.attrs[0]])
cls = symbol_by_name(sym)
- except (ImportError, SyntaxError), exc:
- warnings.warn('Cannot load extension %r: %r' % (sym, exc))
+ except (ImportError, SyntaxError) as exc:
+ warnings.warn(
+ 'Cannot load extension {0!r}: {1!r}'.format(sym, exc))
command(cls, name=ep.name)
class Command(BaseCommand):
help = ''
args = ''
@@ -109,8 +119,8 @@ class Command(BaseCommand):
def __call__(self, *args, **kwargs):
ret = self.run(*args, **kwargs)
- except Error, exc:
- self.error(self.colored.red('Error: %s' % exc))
+ except Error as exc:
+ self.error(self.colored.red('Error: {0!r}'.format(exc)))
return exc.status
return ret if ret is not None else EX_OK
@@ -123,10 +133,7 @@ class Command(BaseCommand):
self.out(s, fh=self.stderr)
def out(self, s, fh=None):
- s = str(s)
- if not s.endswith('\n'):
- s += '\n'
- (fh or self.stdout).write(s)
+ print(s, file=fh or self.stdout)
def run_from_argv(self, prog_name, argv):
self.prog_name = prog_name
@@ -141,13 +148,13 @@ class Command(BaseCommand):
return self(*args, **options)
def usage(self, command):
- return '%%prog %s [options] %s' % (command, self.args)
+ return '%%prog {0} [options] {self.args}'.format(command, self=self)
def prettify_list(self, n):
c = self.colored
if not n:
return '- empty -'
- return '\n'.join(str(c.reset(c.white('*'), ' %s' % (item, )))
+ return '\n'.join(str(c.reset(c.white('*'), ' {0}'.format(item)))
for item in n)
def prettify_dict_ok_error(self, n):
@@ -213,6 +220,7 @@ class Delegate(Command):
return self.target.run(*args, **kwargs)
class multi(Command):
"""Start multiple worker instances."""
@@ -222,9 +230,9 @@ class multi(Command):
def run_from_argv(self, prog_name, argv):
from celery.bin.celeryd_multi import MultiTool
return MultiTool().execute_from_commandline(argv, prog_name)
-multi = command(multi)
class worker(Delegate):
"""Start worker instance.
@@ -239,9 +247,9 @@ class worker(Delegate):
celery worker --autoscale=10,0
Command = 'celery.bin.celeryd:WorkerCommand'
-worker = command(worker, sortpri=01)
class events(Delegate):
"""Event-stream utilities.
@@ -262,9 +270,9 @@ class events(Delegate):
celery events -C mod.attr -F 1.0 --detach --maxrate=100/m -l info
Command = 'celery.bin.celeryev:EvCommand'
-events = command(events, sortpri=10)
class beat(Delegate):
"""Start the celerybeat periodic task scheduler.
@@ -276,9 +284,9 @@ class beat(Delegate):
Command = 'celery.bin.celerybeat:BeatCommand'
-beat = command(beat, sortpri=20)
class amqp(Delegate):
"""AMQP Administration Shell.
@@ -297,9 +305,9 @@ class amqp(Delegate):
Command = 'celery.bin.camqadm:AMQPAdminCommand'
-amqp = command(amqp, sortpri=30)
class list_(Command):
"""Get info from broker.
@@ -317,8 +325,7 @@ class list_(Command):
except NotImplementedError:
raise Error('Your transport cannot list bindings.')
- fmt = lambda q, e, r: self.out('%s %s %s' % (q.ljust(28),
- e.ljust(28), r))
+ fmt = lambda q, e, r: self.out('{0:<28} {1:<28} {2}'.format(q, e, r))
fmt('Queue', 'Exchange', 'Routing Key')
fmt('-' * 16, '-' * 16, '-' * 16)
for b in bindings:
@@ -328,16 +335,16 @@ class list_(Command):
topics = {'bindings': self.list_bindings}
available = ', '.join(topics.keys())
if not what:
- raise Error('You must specify what to list (%s)' % available)
+ raise Error('You must specify one of {0}'.format(available))
if what not in topics:
- raise Error('unknown topic %r (choose one of: %s)' % (
+ raise Error('unknown topic {0!r} (choose one of: {1})'.format(
what, available))
with self.app.connection() as conn:
-list_ = command(list_, 'list')
class call(Command):
"""Call a task by name.
@@ -391,28 +398,29 @@ class call(Command):
-call = command(call)
class purge(Command):
"""Erase all messages from all known task queues.
WARNING: There is no undo operation for this command.
+ fmt_purged = 'Purged {mnum} {messages} from {qnum} known task {queues}.'
+ fmt_empty = 'No messages purged from {qnum} {queues}'
def run(self, *args, **kwargs):
queues = len(self.app.amqp.queues.keys())
- messages_removed = self.app.control.purge()
- if messages_removed:
- self.out('Purged %s %s from %s known task %s.' % (
- messages_removed, text.pluralize(messages_removed, 'message'),
- queues, text.pluralize(queues, 'queue')))
- else:
- self.out('No messages purged from %s known %s' % (
- queues, text.pluralize(queues, 'queue')))
-purge = command(purge)
+ messages = self.app.control.purge()
+ fmt = self.fmt_purged if messages else self.fmt_empty
+ self.out(fmt.format(
+ mnum=messages, qnum=queues,
+ messages=text.pluralize(messages, 'message'),
+ queues=text.pluralize(queues, 'queue')))
class result(Command):
"""Gives the return value for a given task id.
@@ -443,7 +451,6 @@ class result(Command):
value = result.get()
-result = command(result)
class _RemoteControl(Command):
@@ -467,8 +474,8 @@ class _RemoteControl(Command):
# see if it uses args.
meth = getattr(self, command)
return text.join([
- '|' + text.indent('%s%s %s' % (prefix, color(command),
- meth.__doc__), indent), help,
+ '|' + text.indent('{0}{1} {2}'.format(prefix, color(command),
+ meth.__doc__), indent), help,
except AttributeError:
@@ -491,7 +498,7 @@ class _RemoteControl(Command):
def usage(self, command):
- return '%%prog %s [options] %s <command> [arg1 .. argN]' % (
+ return '%%prog {0} [options] {1} <command> [arg1 .. argN]'.format(
command, self.args)
def call(self, *args, **kwargs):
@@ -499,30 +506,29 @@ class _RemoteControl(Command):
def run(self, *args, **kwargs):
if not args:
- raise Error('Missing %s method. See --help' % self.name)
+ raise Error('Missing {0.name} method. See --help'.format(self))
return self.do_call_method(args, **kwargs)
def do_call_method(self, args, **kwargs):
method = args[0]
if method == 'help':
- raise Error("Did you mean '%s --help'?" % self.name)
+ raise Error("Did you mean '{0.name} --help'?".format(self))
if method not in self.choices:
- raise Error('Unknown %s method %s' % (self.name, method))
+ raise Error('Unknown {0.name} method {1}'.format(self, method))
destination = kwargs.get('destination')
timeout = kwargs.get('timeout') or self.choices[method][0]
if destination and isinstance(destination, basestring):
- destination = map(str.strip, destination.split(','))
+ destination = list(map(str.strip, destination.split(',')))
handler = getattr(self, method)
except AttributeError:
handler = self.call
- # XXX Python 2.5 does not support X(*args, foo=1)
- kwargs = {"timeout": timeout, "destination": destination,
- "callback": self.say_remote_command_reply}
- replies = handler(method, *args[1:], **kwargs)
+ replies = handler(method, *args[1:], timeout=timeout,
+ destination=destination,
+ callback=self.say_remote_command_reply)
if not replies:
raise Error('No nodes replied within time constraint.',
@@ -538,6 +544,7 @@ class _RemoteControl(Command):
class inspect(_RemoteControl):
"""Inspect the worker at runtime.
@@ -566,9 +573,9 @@ class inspect(_RemoteControl):
def call(self, method, *args, **options):
i = self.app.control.inspect(**options)
return getattr(i, method)(*args)
-inspect = command(inspect)
class control(_RemoteControl):
"""Workers remote control.
@@ -602,9 +609,7 @@ class control(_RemoteControl):
def call(self, method, *args, **options):
- # XXX Python 2.5 doesn't support X(*args, reply=True, **kwargs)
- return getattr(self.app.control, method)(
- *args, **dict(options, retry=True))
+ return getattr(self.app.control, method)(*args, retry=True, **options)
def pool_grow(self, method, n=1, **kwargs):
@@ -635,9 +640,9 @@ class control(_RemoteControl):
def cancel_consumer(self, method, queue, **kwargs):
return self.call(method, queue, reply=True, **kwargs)
-control = command(control)
class status(Command):
"""Show list of workers that are online."""
option_list = inspect.option_list
@@ -653,11 +658,11 @@ class status(Command):
nodecount = len(replies)
if not kwargs.get('quiet', False):
- self.out('\n%s %s online.' % (nodecount,
- text.pluralize(nodecount, 'node')))
-status = command(status)
+ self.out('\n{0} {1} online.'.format(
+ nodecount, text.pluralize(nodecount, 'node')))
class migrate(Command):
"""Migrate tasks from one broker to another.
@@ -684,10 +689,10 @@ class migrate(Command):
Option('--forever', '-F', action='store_true',
help='Continually migrate tasks until killed.'),
+ progress_fmt = MIGRATE_PROGRESS_FMT
def on_migrate_task(self, state, body, message):
- self.out('Migrating task %s/%s: %s[%s]' % (
- state.count, state.strtotal, body['task'], body['id']))
+ self.out(self.progress_fmt.format(state=state, body=body))
def run(self, *args, **kwargs):
if len(args) != 2:
@@ -699,9 +704,9 @@ class migrate(Command):
-migrate = command(migrate)
class shell(Command): # pragma: no cover
"""Start shell session with convenient access to celery symbols.
@@ -811,31 +816,29 @@ class shell(Command): # pragma: no cover
import bpython
-shell = command(shell)
class help(Command):
"""Show help screen and exit."""
def usage(self, command):
- return '%%prog <command> [options] %s' % (self.args, )
+ return '%%prog <command> [options] {0.args}'.format(self)
def run(self, *args, **kwargs):
- self.out(HELP % {'prog_name': self.prog_name,
- 'commands': CeleryCommand.list_commands()})
+ self.out(HELP.format(prog_name=self.prog_name,
+ commands=CeleryCommand.list_commands()))
return EX_USAGE
-help = command(help)
class report(Command):
"""Shows information useful to include in bugreports."""
def run(self, *args, **kwargs):
return EX_OK
-report = command(report)
class CeleryCommand(BaseCommand):
@@ -888,12 +891,13 @@ class CeleryCommand(BaseCommand):
def get_command_info(self, command, indent=0, color=None):
colored = term.colored().names[color] if color else lambda x: x
obj = self.commands[command]
+ cmd = 'celery {0}'.format(colored(command))
if obj.leaf:
- return '|' + text.indent('celery %s' % colored(command), indent)
+ return '|' + text.indent(cmd, indent)
return text.join([
' ',
- '|' + text.indent('celery %s --help' % colored(command), indent),
- obj.list_commands(indent, 'celery %s' % command, colored),
+ '|' + text.indent('{0} --help'.format(cmd), indent),
+ obj.list_commands(indent, 'celery {0}'.format(command), colored),
@@ -902,7 +906,7 @@ class CeleryCommand(BaseCommand):
ret = []
for cls, commands, color in command_classes:
- text.indent('+ %s: ' % white(cls), indent),
+ text.indent('+ {0}: '.format(white(cls)), indent),
'\n'.join(self.get_command_info(command, indent + 4, color)
for command in commands),