|
@@ -121,7 +121,7 @@ class list_(Command):
|
|
|
raise Error('You must specify one of {0}'.format(available))
|
|
|
if what not in topics:
|
|
|
raise Error('unknown topic {0!r} (choose one of: {1})'.format(
|
|
|
- what, available))
|
|
|
+ what, available))
|
|
|
with self.app.connection() as conn:
|
|
|
self.app.amqp.TaskConsumer(conn).declare()
|
|
|
topics[what](conn.manager)
|
|
@@ -137,16 +137,16 @@ class call(Command):
|
|
|
"""
|
|
|
args = '<task_name>'
|
|
|
option_list = Command.option_list + (
|
|
|
- Option('--args', '-a', help='positional arguments (json).'),
|
|
|
- Option('--kwargs', '-k', help='keyword arguments (json).'),
|
|
|
- Option('--eta', help='scheduled time (ISO-8601).'),
|
|
|
- Option('--countdown', type='float',
|
|
|
- help='eta in seconds from now (float/int).'),
|
|
|
- Option('--expires', help='expiry time (ISO-8601/float/int).'),
|
|
|
- Option('--serializer', default='json', help='defaults to json.'),
|
|
|
- Option('--queue', help='custom queue name.'),
|
|
|
- Option('--exchange', help='custom exchange name.'),
|
|
|
- Option('--routing-key', help='custom routing key.'),
|
|
|
+ Option('--args', '-a', help='positional arguments (json).'),
|
|
|
+ Option('--kwargs', '-k', help='keyword arguments (json).'),
|
|
|
+ Option('--eta', help='scheduled time (ISO-8601).'),
|
|
|
+ Option('--countdown', type='float',
|
|
|
+ help='eta in seconds from now (float/int).'),
|
|
|
+ Option('--expires', help='expiry time (ISO-8601/float/int).'),
|
|
|
+ Option('--serializer', default='json', help='defaults to json.'),
|
|
|
+ Option('--queue', help='custom queue name.'),
|
|
|
+ Option('--exchange', help='custom exchange name.'),
|
|
|
+ Option('--routing-key', help='custom routing key.'),
|
|
|
)
|
|
|
|
|
|
def run(self, name, *_, **kw):
|
|
@@ -213,9 +213,9 @@ class result(Command):
|
|
|
"""
|
|
|
args = '<task_id>'
|
|
|
option_list = Command.option_list + (
|
|
|
- Option('--task', '-t', help='name of task (if custom backend)'),
|
|
|
- Option('--traceback', action='store_true',
|
|
|
- help='show traceback instead'),
|
|
|
+ Option('--task', '-t', help='name of task (if custom backend)'),
|
|
|
+ Option('--traceback', action='store_true',
|
|
|
+ help='show traceback instead'),
|
|
|
)
|
|
|
|
|
|
def run(self, task_id, *args, **kwargs):
|
|
@@ -238,10 +238,10 @@ class _RemoteControl(Command):
|
|
|
choices = None
|
|
|
leaf = False
|
|
|
option_list = Command.option_list + (
|
|
|
- Option('--timeout', '-t', type='float',
|
|
|
- help='Timeout in seconds (float) waiting for reply'),
|
|
|
- Option('--destination', '-d',
|
|
|
- help='Comma separated list of destination node names.'))
|
|
|
+ Option('--timeout', '-t', type='float',
|
|
|
+ help='Timeout in seconds (float) waiting for reply'),
|
|
|
+ Option('--destination', '-d',
|
|
|
+ help='Comma separated list of destination node names.'))
|
|
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
|
self.show_body = kwargs.pop('show_body', True)
|
|
@@ -249,8 +249,8 @@ class _RemoteControl(Command):
|
|
|
super(_RemoteControl, self).__init__(*args, **kwargs)
|
|
|
|
|
|
@classmethod
|
|
|
- def get_command_info(self, command, indent=0, prefix='', color=None,
|
|
|
- help=False):
|
|
|
+ def get_command_info(self, command,
|
|
|
+ indent=0, prefix='', color=None, help=False):
|
|
|
if help:
|
|
|
help = '|' + text.indent(self.choices[command][1], indent + 4)
|
|
|
else:
|
|
@@ -259,8 +259,9 @@ class _RemoteControl(Command):
|
|
|
|
|
|
meth = getattr(self, command)
|
|
|
return text.join([
|
|
|
- '|' + text.indent('{0}{1} {2}'.format(prefix, color(command),
|
|
|
- meth.__doc__), indent), help,
|
|
|
+ '|' + text.indent('{0}{1} {2}'.format(
|
|
|
+ prefix, color(command), meth.__doc__), indent),
|
|
|
+ help,
|
|
|
])
|
|
|
|
|
|
except AttributeError:
|
|
@@ -273,7 +274,7 @@ class _RemoteControl(Command):
|
|
|
color = color if color else lambda x: x
|
|
|
prefix = prefix + ' ' if prefix else ''
|
|
|
return '\n'.join(self.get_command_info(c, indent, prefix, color, help)
|
|
|
- for c in sorted(self.choices))
|
|
|
+ for c in sorted(self.choices))
|
|
|
|
|
|
@property
|
|
|
def epilog(self):
|
|
@@ -284,7 +285,7 @@ class _RemoteControl(Command):
|
|
|
|
|
|
def usage(self, command):
|
|
|
return '%prog {0} [options] {1} <command> [arg1 .. argN]'.format(
|
|
|
- command, self.args)
|
|
|
+ command, self.args)
|
|
|
|
|
|
def call(self, *args, **kwargs):
|
|
|
raise NotImplementedError('get_obj')
|
|
@@ -384,10 +385,10 @@ class control(_RemoteControl):
|
|
|
'disable_events': (1.0, 'tell worker(s) to disable events'),
|
|
|
'add_consumer': (1.0, 'tell worker(s) to start consuming a queue'),
|
|
|
'cancel_consumer': (1.0, 'tell worker(s) to stop consuming a queue'),
|
|
|
- 'rate_limit': (1.0,
|
|
|
- 'tell worker(s) to modify the rate limit for a task type'),
|
|
|
- 'time_limit': (1.0,
|
|
|
- 'tell worker(s) to modify the time limit for a task type.'),
|
|
|
+ 'rate_limit': (
|
|
|
+ 1.0, 'tell worker(s) to modify the rate limit for a task type'),
|
|
|
+ 'time_limit': (
|
|
|
+ 1.0, 'tell worker(s) to modify the time limit for a task type.'),
|
|
|
'autoscale': (1.0, 'change autoscale settings'),
|
|
|
'pool_grow': (1.0, 'start more pool processes'),
|
|
|
'pool_shrink': (1.0, 'use less pool processes'),
|
|
@@ -417,7 +418,7 @@ class control(_RemoteControl):
|
|
|
return self.call(method, task_name, soft, hard, reply=True, **kwargs)
|
|
|
|
|
|
def add_consumer(self, method, queue, exchange=None,
|
|
|
- exchange_type='direct', routing_key=None, **kwargs):
|
|
|
+ exchange_type='direct', routing_key=None, **kwargs):
|
|
|
"""<queue> [exchange [type [routing_key]]]"""
|
|
|
return self.call(method, queue, exchange,
|
|
|
exchange_type, routing_key, reply=True, **kwargs)
|
|
@@ -432,10 +433,12 @@ class status(Command):
|
|
|
option_list = inspect.option_list
|
|
|
|
|
|
def run(self, *args, **kwargs):
|
|
|
- I = inspect(app=self.app,
|
|
|
- no_color=kwargs.get('no_color', False),
|
|
|
- stdout=self.stdout, stderr=self.stderr,
|
|
|
- show_reply=False, show_body=False, quiet=True)
|
|
|
+ I = inspect(
|
|
|
+ app=self.app,
|
|
|
+ no_color=kwargs.get('no_color', False),
|
|
|
+ stdout=self.stdout, stderr=self.stderr,
|
|
|
+ show_reply=False, show_body=False, quiet=True,
|
|
|
+ )
|
|
|
replies = I.run('ping', **kwargs)
|
|
|
if not replies:
|
|
|
raise Error('No nodes replied within time constraint',
|
|
@@ -459,18 +462,18 @@ class migrate(Command):
|
|
|
"""
|
|
|
args = '<source_url> <dest_url>'
|
|
|
option_list = Command.option_list + (
|
|
|
- Option('--limit', '-n', type='int',
|
|
|
- help='Number of tasks to consume (int)'),
|
|
|
- Option('--timeout', '-t', type='float', default=1.0,
|
|
|
- help='Timeout in seconds (float) waiting for tasks'),
|
|
|
- Option('--ack-messages', '-a', action='store_true',
|
|
|
- help='Ack messages from source broker.'),
|
|
|
- Option('--tasks', '-T',
|
|
|
- help='List of task names to filter on.'),
|
|
|
- Option('--queues', '-Q',
|
|
|
- help='List of queues to migrate.'),
|
|
|
- Option('--forever', '-F', action='store_true',
|
|
|
- help='Continually migrate tasks until killed.'),
|
|
|
+ Option('--limit', '-n', type='int',
|
|
|
+ help='Number of tasks to consume (int)'),
|
|
|
+ Option('--timeout', '-t', type='float', default=1.0,
|
|
|
+ help='Timeout in seconds (float) waiting for tasks'),
|
|
|
+ Option('--ack-messages', '-a', action='store_true',
|
|
|
+ help='Ack messages from source broker.'),
|
|
|
+ Option('--tasks', '-T',
|
|
|
+ help='List of task names to filter on.'),
|
|
|
+ Option('--queues', '-Q',
|
|
|
+ help='List of queues to migrate.'),
|
|
|
+ Option('--forever', '-F', action='store_true',
|
|
|
+ help='Continually migrate tasks until killed.'),
|
|
|
)
|
|
|
progress_fmt = MIGRATE_PROGRESS_FMT
|
|
|
|
|
@@ -515,20 +518,20 @@ class shell(Command): # pragma: no cover
|
|
|
<AsyncResult: 537b48c7-d6d3-427a-a24a-d1b4414035be>
|
|
|
"""
|
|
|
option_list = Command.option_list + (
|
|
|
- Option('--ipython', '-I',
|
|
|
- action='store_true', dest='force_ipython',
|
|
|
- help='force iPython.'),
|
|
|
- Option('--bpython', '-B',
|
|
|
- action='store_true', dest='force_bpython',
|
|
|
- help='force bpython.'),
|
|
|
- Option('--python', '-P',
|
|
|
- action='store_true', dest='force_python',
|
|
|
- help='force default Python shell.'),
|
|
|
- Option('--without-tasks', '-T', action='store_true',
|
|
|
- help="don't add tasks to locals."),
|
|
|
- Option('--eventlet', action='store_true',
|
|
|
- help='use eventlet.'),
|
|
|
- Option('--gevent', action='store_true', help='use gevent.'),
|
|
|
+ Option('--ipython', '-I',
|
|
|
+ action='store_true', dest='force_ipython',
|
|
|
+ help='force iPython.'),
|
|
|
+ Option('--bpython', '-B',
|
|
|
+ action='store_true', dest='force_bpython',
|
|
|
+ help='force bpython.'),
|
|
|
+ Option('--python', '-P',
|
|
|
+ action='store_true', dest='force_python',
|
|
|
+ help='force default Python shell.'),
|
|
|
+ Option('--without-tasks', '-T', action='store_true',
|
|
|
+ help="don't add tasks to locals."),
|
|
|
+ Option('--eventlet', action='store_true',
|
|
|
+ help='use eventlet.'),
|
|
|
+ Option('--gevent', action='store_true', help='use gevent.'),
|
|
|
)
|
|
|
|
|
|
def run(self, force_ipython=False, force_bpython=False,
|
|
@@ -553,9 +556,10 @@ class shell(Command): # pragma: no cover
|
|
|
'subtask': celery.subtask}
|
|
|
|
|
|
if not without_tasks:
|
|
|
- self.locals.update(dict((task.__name__, task)
|
|
|
- for task in values(self.app.tasks)
|
|
|
- if not task.name.startswith('celery.')))
|
|
|
+ self.locals.update(dict(
|
|
|
+ (task.__name__, task) for task in values(self.app.tasks)
|
|
|
+ if not task.name.startswith('celery.')),
|
|
|
+ )
|
|
|
|
|
|
if force_python:
|
|
|
return self.invoke_fallback_shell()
|
|
@@ -587,7 +591,7 @@ class shell(Command): # pragma: no cover
|
|
|
else:
|
|
|
import rlcompleter
|
|
|
readline.set_completer(
|
|
|
- rlcompleter.Completer(self.locals).complete)
|
|
|
+ rlcompleter.Completer(self.locals).complete)
|
|
|
readline.parse_and_bind('tab:complete')
|
|
|
code.interact(local=self.locals)
|
|
|
|
|
@@ -664,8 +668,9 @@ class CeleryCommand(Command):
|
|
|
cls, argv = self.commands['help'], ['help']
|
|
|
cls = self.commands.get(command) or self.commands['help']
|
|
|
try:
|
|
|
- return cls(app=self.app).run_from_argv(self.prog_name,
|
|
|
- argv[1:], command=argv[0])
|
|
|
+ return cls(app=self.app).run_from_argv(
|
|
|
+ self.prog_name, argv[1:], command=argv[0],
|
|
|
+ )
|
|
|
except Error:
|
|
|
return self.execute('help', argv)
|
|
|
|
|
@@ -723,7 +728,7 @@ class CeleryCommand(Command):
|
|
|
ret.extend([
|
|
|
text.indent('+ {0}: '.format(white(cls)), indent),
|
|
|
'\n'.join(self.get_command_info(command, indent + 4, color)
|
|
|
- for command in commands),
|
|
|
+ for command in commands),
|
|
|
''
|
|
|
])
|
|
|
return '\n'.join(ret).strip()
|