|
@@ -99,7 +99,7 @@ class Command(BaseCommand):
|
|
|
)
|
|
|
|
|
|
def __init__(self, app=None, no_color=False, stdout=sys.stdout,
|
|
|
- stderr=sys.stderr, show_reply=True):
|
|
|
+ stderr=sys.stderr, show_reply=True):
|
|
|
super(Command, self).__init__(app=app)
|
|
|
self.colored = term.colored(enabled=not no_color)
|
|
|
self.stdout = stdout
|
|
@@ -136,7 +136,7 @@ class Command(BaseCommand):
|
|
|
self.arglist = argv[1:]
|
|
|
self.parser = self.create_parser(self.prog_name, self.command)
|
|
|
options, args = self.prepare_args(
|
|
|
- *self.parser.parse_args(self.arglist))
|
|
|
+ *self.parser.parse_args(self.arglist))
|
|
|
self.colored = term.colored(enabled=not options['no_color'])
|
|
|
self.quiet = options.get('quiet', False)
|
|
|
self.show_body = options.get('show_body', True)
|
|
@@ -150,7 +150,7 @@ class Command(BaseCommand):
|
|
|
if not n:
|
|
|
return '- empty -'
|
|
|
return '\n'.join(str(c.reset(c.white('*'), ' %s' % (item, )))
|
|
|
- for item in n)
|
|
|
+ for item in n)
|
|
|
|
|
|
def prettify_dict_ok_error(self, n):
|
|
|
c = self.colored
|
|
@@ -334,7 +334,7 @@ class list_(Command):
|
|
|
raise Error('You must specify what to list (%s)' % available)
|
|
|
if what not in topics:
|
|
|
raise Error('unknown topic %r (choose one of: %s)' % (
|
|
|
- what, available))
|
|
|
+ what, available))
|
|
|
with self.app.connection() as conn:
|
|
|
self.app.amqp.TaskConsumer(conn).declare()
|
|
|
topics[what](conn.manager)
|
|
@@ -351,16 +351,16 @@ class call(Command):
|
|
|
"""
|
|
|
args = '<task_name>'
|
|
|
option_list = Command.option_list + (
|
|
|
- Option('--args', '-a', help='positional arguments (json).'),
|
|
|
- Option('--kwargs', '-k', help='keyword arguments (json).'),
|
|
|
- Option('--eta', help='scheduled time (ISO-8601).'),
|
|
|
- Option('--countdown', type='float',
|
|
|
- help='eta in seconds from now (float/int).'),
|
|
|
- Option('--expires', help='expiry time (ISO-8601/float/int).'),
|
|
|
- Option('--serializer', default='json', help='defaults to json.'),
|
|
|
- Option('--queue', help='custom queue name.'),
|
|
|
- Option('--exchange', help='custom exchange name.'),
|
|
|
- Option('--routing-key', help='custom routing key.'),
|
|
|
+ Option('--args', '-a', help='positional arguments (json).'),
|
|
|
+ Option('--kwargs', '-k', help='keyword arguments (json).'),
|
|
|
+ Option('--eta', help='scheduled time (ISO-8601).'),
|
|
|
+ Option('--countdown', type='float',
|
|
|
+ help='eta in seconds from now (float/int).'),
|
|
|
+ Option('--expires', help='expiry time (ISO-8601/float/int).'),
|
|
|
+ Option('--serializer', default='json', help='defaults to json.'),
|
|
|
+ Option('--queue', help='custom queue name.'),
|
|
|
+ Option('--exchange', help='custom exchange name.'),
|
|
|
+ Option('--routing-key', help='custom routing key.'),
|
|
|
)
|
|
|
|
|
|
def run(self, name, *_, **kw):
|
|
@@ -428,9 +428,9 @@ class result(Command):
|
|
|
"""
|
|
|
args = '<task_id>'
|
|
|
option_list = Command.option_list + (
|
|
|
- Option('--task', '-t', help='name of task (if custom backend)'),
|
|
|
- Option('--traceback', action='store_true',
|
|
|
- help='show traceback instead'),
|
|
|
+ Option('--task', '-t', help='name of task (if custom backend)'),
|
|
|
+ Option('--traceback', action='store_true',
|
|
|
+ help='show traceback instead'),
|
|
|
)
|
|
|
|
|
|
def run(self, task_id, *args, **kwargs):
|
|
@@ -454,14 +454,14 @@ class _RemoteControl(Command):
|
|
|
choices = None
|
|
|
leaf = False
|
|
|
option_list = Command.option_list + (
|
|
|
- Option('--timeout', '-t', type='float',
|
|
|
- help='Timeout in seconds (float) waiting for reply'),
|
|
|
- Option('--destination', '-d',
|
|
|
- help='Comma separated list of destination node names.'))
|
|
|
+ Option('--timeout', '-t', type='float',
|
|
|
+ help='Timeout in seconds (float) waiting for reply'),
|
|
|
+ Option('--destination', '-d',
|
|
|
+ help='Comma separated list of destination node names.'))
|
|
|
|
|
|
@classmethod
|
|
|
- def get_command_info(self, command, indent=0, prefix='', color=None,
|
|
|
- help=False):
|
|
|
+ def get_command_info(self, command,
|
|
|
+ indent=0, prefix='', color=None, help=False):
|
|
|
if help:
|
|
|
help = '|' + text.indent(self.choices[command][1], indent + 4)
|
|
|
else:
|
|
@@ -484,7 +484,7 @@ class _RemoteControl(Command):
|
|
|
color = color if color else lambda x: x
|
|
|
prefix = prefix + ' ' if prefix else ''
|
|
|
return '\n'.join(self.get_command_info(c, indent, prefix, color, help)
|
|
|
- for c in sorted(self.choices))
|
|
|
+ for c in sorted(self.choices))
|
|
|
|
|
|
@property
|
|
|
def epilog(self):
|
|
@@ -495,7 +495,7 @@ class _RemoteControl(Command):
|
|
|
|
|
|
def usage(self, command):
|
|
|
return '%%prog %s [options] %s <command> [arg1 .. argN]' % (
|
|
|
- command, self.args)
|
|
|
+ command, self.args)
|
|
|
|
|
|
def call(self, *args, **kwargs):
|
|
|
raise NotImplementedError('get_obj')
|
|
@@ -595,10 +595,10 @@ class control(_RemoteControl):
|
|
|
'disable_events': (1.0, 'tell worker(s) to disable events'),
|
|
|
'add_consumer': (1.0, 'tell worker(s) to start consuming a queue'),
|
|
|
'cancel_consumer': (1.0, 'tell worker(s) to stop consuming a queue'),
|
|
|
- 'rate_limit': (1.0,
|
|
|
- 'tell worker(s) to modify the rate limit for a task type'),
|
|
|
- 'time_limit': (1.0,
|
|
|
- 'tell worker(s) to modify the time limit for a task type.'),
|
|
|
+ 'rate_limit': (
|
|
|
+ 1.0, 'tell worker(s) to modify the rate limit for a task type'),
|
|
|
+ 'time_limit': (
|
|
|
+ 1.0, 'tell worker(s) to modify the time limit for a task type.'),
|
|
|
'autoscale': (1.0, 'change autoscale settings'),
|
|
|
'pool_grow': (1.0, 'start more pool processes'),
|
|
|
'pool_shrink': (1.0, 'use less pool processes'),
|
|
@@ -607,7 +607,7 @@ class control(_RemoteControl):
|
|
|
def call(self, method, *args, **options):
|
|
|
# XXX Python 2.5 doesn't support X(*args, reply=True, **kwargs)
|
|
|
return getattr(self.app.control, method)(
|
|
|
- *args, **dict(options, retry=True))
|
|
|
+ *args, **dict(options, retry=True))
|
|
|
|
|
|
def pool_grow(self, method, n=1, **kwargs):
|
|
|
"""[N=1]"""
|
|
@@ -630,7 +630,7 @@ class control(_RemoteControl):
|
|
|
return self.call(method, task_name, soft, hard, reply=True, **kwargs)
|
|
|
|
|
|
def add_consumer(self, method, queue, exchange=None,
|
|
|
- exchange_type='direct', routing_key=None, **kwargs):
|
|
|
+ exchange_type='direct', routing_key=None, **kwargs):
|
|
|
"""<queue> [exchange [type [routing_key]]]"""
|
|
|
return self.call(method, queue, exchange,
|
|
|
exchange_type, routing_key, reply=True, **kwargs)
|
|
@@ -646,11 +646,12 @@ class status(Command):
|
|
|
option_list = inspect.option_list
|
|
|
|
|
|
def run(self, *args, **kwargs):
|
|
|
- replies = inspect(app=self.app,
|
|
|
- no_color=kwargs.get('no_color', False),
|
|
|
- stdout=self.stdout, stderr=self.stderr,
|
|
|
- show_reply=False) \
|
|
|
- .run('ping', **dict(kwargs, quiet=True, show_body=False))
|
|
|
+ replies = inspect(
|
|
|
+ app=self.app,
|
|
|
+ no_color=kwargs.get('no_color', False),
|
|
|
+ stdout=self.stdout, stderr=self.stderr,
|
|
|
+ show_reply=False).run(
|
|
|
+ 'ping', **dict(kwargs, quiet=True, show_body=False))
|
|
|
if not replies:
|
|
|
raise Error('No nodes replied within time constraint',
|
|
|
status=EX_UNAVAILABLE)
|
|
@@ -674,18 +675,18 @@ class migrate(Command):
|
|
|
"""
|
|
|
args = '<source_url> <dest_url>'
|
|
|
option_list = Command.option_list + (
|
|
|
- Option('--limit', '-n', type='int',
|
|
|
- help='Number of tasks to consume (int)'),
|
|
|
- Option('--timeout', '-t', type='float', default=1.0,
|
|
|
- help='Timeout in seconds (float) waiting for tasks'),
|
|
|
- Option('--ack-messages', '-a', action='store_true',
|
|
|
- help='Ack messages from source broker.'),
|
|
|
- Option('--tasks', '-T',
|
|
|
- help='List of task names to filter on.'),
|
|
|
- Option('--queues', '-Q',
|
|
|
- help='List of queues to migrate.'),
|
|
|
- Option('--forever', '-F', action='store_true',
|
|
|
- help='Continually migrate tasks until killed.'),
|
|
|
+ Option('--limit', '-n', type='int',
|
|
|
+ help='Number of tasks to consume (int)'),
|
|
|
+ Option('--timeout', '-t', type='float', default=1.0,
|
|
|
+ help='Timeout in seconds (float) waiting for tasks'),
|
|
|
+ Option('--ack-messages', '-a', action='store_true',
|
|
|
+ help='Ack messages from source broker.'),
|
|
|
+ Option('--tasks', '-T',
|
|
|
+ help='List of task names to filter on.'),
|
|
|
+ Option('--queues', '-Q',
|
|
|
+ help='List of queues to migrate.'),
|
|
|
+ Option('--forever', '-F', action='store_true',
|
|
|
+ help='Continually migrate tasks until killed.'),
|
|
|
)
|
|
|
|
|
|
def on_migrate_task(self, state, body, message):
|
|
@@ -729,20 +730,20 @@ class shell(Command): # pragma: no cover
|
|
|
<AsyncResult: 537b48c7-d6d3-427a-a24a-d1b4414035be>
|
|
|
"""
|
|
|
option_list = Command.option_list + (
|
|
|
- Option('--ipython', '-I',
|
|
|
- action='store_true', dest='force_ipython',
|
|
|
- help='force iPython.'),
|
|
|
- Option('--bpython', '-B',
|
|
|
- action='store_true', dest='force_bpython',
|
|
|
- help='force bpython.'),
|
|
|
- Option('--python', '-P',
|
|
|
- action='store_true', dest='force_python',
|
|
|
- help='force default Python shell.'),
|
|
|
- Option('--without-tasks', '-T', action='store_true',
|
|
|
- help="don't add tasks to locals."),
|
|
|
- Option('--eventlet', action='store_true',
|
|
|
- help='use eventlet.'),
|
|
|
- Option('--gevent', action='store_true', help='use gevent.'),
|
|
|
+ Option('--ipython', '-I',
|
|
|
+ action='store_true', dest='force_ipython',
|
|
|
+ help='force iPython.'),
|
|
|
+ Option('--bpython', '-B',
|
|
|
+ action='store_true', dest='force_bpython',
|
|
|
+ help='force bpython.'),
|
|
|
+ Option('--python', '-P',
|
|
|
+ action='store_true', dest='force_python',
|
|
|
+ help='force default Python shell.'),
|
|
|
+ Option('--without-tasks', '-T', action='store_true',
|
|
|
+ help="don't add tasks to locals."),
|
|
|
+ Option('--eventlet', action='store_true',
|
|
|
+ help='use eventlet.'),
|
|
|
+ Option('--gevent', action='store_true', help='use gevent.'),
|
|
|
)
|
|
|
|
|
|
def run(self, force_ipython=False, force_bpython=False,
|
|
@@ -767,9 +768,10 @@ class shell(Command): # pragma: no cover
|
|
|
'subtask': celery.subtask}
|
|
|
|
|
|
if not without_tasks:
|
|
|
- self.locals.update(dict((task.__name__, task)
|
|
|
- for task in self.app.tasks.itervalues()
|
|
|
- if not task.name.startswith('celery.')))
|
|
|
+ self.locals.update(dict(
|
|
|
+ (task.__name__, task) for task in self.app.tasks.itervalues()
|
|
|
+ if not task.name.startswith('celery.')),
|
|
|
+ )
|
|
|
|
|
|
if force_python:
|
|
|
return self.invoke_fallback_shell()
|
|
@@ -801,7 +803,7 @@ class shell(Command): # pragma: no cover
|
|
|
else:
|
|
|
import rlcompleter
|
|
|
readline.set_completer(
|
|
|
- rlcompleter.Completer(self.locals).complete)
|
|
|
+ rlcompleter.Completer(self.locals).complete)
|
|
|
readline.parse_and_bind('tab:complete')
|
|
|
code.interact(local=self.locals)
|
|
|
|
|
@@ -913,7 +915,7 @@ class CeleryCommand(BaseCommand):
|
|
|
ret.extend([
|
|
|
text.indent('+ %s: ' % white(cls), indent),
|
|
|
'\n'.join(self.get_command_info(command, indent + 4, color)
|
|
|
- for command in commands),
|
|
|
+ for command in commands),
|
|
|
''
|
|
|
])
|
|
|
return '\n'.join(ret).strip()
|