|
@@ -37,18 +37,9 @@ Type '%(prog_name)s <command> --help' for help using a specific command.
|
|
|
commands = {}
|
|
|
|
|
|
command_classes = (
|
|
|
- ("Main",
|
|
|
- ["worker", "events", "beat", "shell", "amqp", "help"],
|
|
|
- "green",
|
|
|
- ),
|
|
|
- ("Remote Control",
|
|
|
- ["status", "inspect", "control"],
|
|
|
- "blue",
|
|
|
- ),
|
|
|
- ("Utils",
|
|
|
- ["purge", "list", "migrate", "apply", "result", "report"],
|
|
|
- None,
|
|
|
- ),
|
|
|
+ ('Main', ['worker', 'events', 'beat', 'shell', 'amqp', 'help'], 'green'),
|
|
|
+ ('Remote Control', ['status', 'inspect', 'control'], 'blue'),
|
|
|
+ ('Utils', ['purge', 'list', 'migrate', 'apply', 'result', 'report'], None),
|
|
|
)
|
|
|
|
|
|
|
|
@@ -70,16 +61,16 @@ def command(fun, name=None, sortpri=0):
|
|
|
|
|
|
|
|
|
class Command(BaseCommand):
|
|
|
- help = ""
|
|
|
- args = ""
|
|
|
+ help = ''
|
|
|
+ args = ''
|
|
|
version = __version__
|
|
|
- prog_name = "celery"
|
|
|
+ prog_name = 'celery'
|
|
|
show_body = True
|
|
|
leaf = True
|
|
|
|
|
|
option_list = (
|
|
|
- Option("--quiet", "-q", action="store_true"),
|
|
|
- Option("--no-color", "-C", action="store_true"),
|
|
|
+ Option('--quiet', '-q', action='store_true'),
|
|
|
+ Option('--no-color', '-C', action='store_true'),
|
|
|
)
|
|
|
|
|
|
def __init__(self, app=None, no_color=False, stdout=sys.stdout,
|
|
@@ -94,13 +85,13 @@ class Command(BaseCommand):
|
|
|
try:
|
|
|
ret = self.run(*args, **kwargs)
|
|
|
except Error, exc:
|
|
|
- self.error(self.colored.red("Error: %s" % exc))
|
|
|
+ self.error(self.colored.red('Error: %s' % exc))
|
|
|
return exc.status
|
|
|
|
|
|
return ret if ret is not None else EX_OK
|
|
|
|
|
|
def show_help(self, command):
|
|
|
- self.run_from_argv(self.prog_name, [command, "--help"])
|
|
|
+ self.run_from_argv(self.prog_name, [command, '--help'])
|
|
|
return EX_USAGE
|
|
|
|
|
|
def error(self, s):
|
|
@@ -108,8 +99,8 @@ class Command(BaseCommand):
|
|
|
|
|
|
def out(self, s, fh=None):
|
|
|
s = str(s)
|
|
|
- if not s.endswith("\n"):
|
|
|
- s += "\n"
|
|
|
+ if not s.endswith('\n'):
|
|
|
+ s += '\n'
|
|
|
(fh or self.stdout).write(s)
|
|
|
|
|
|
def run_from_argv(self, prog_name, argv):
|
|
@@ -119,55 +110,55 @@ class Command(BaseCommand):
|
|
|
self.parser = self.create_parser(self.prog_name, self.command)
|
|
|
options, args = self.prepare_args(
|
|
|
*self.parser.parse_args(self.arglist))
|
|
|
- self.colored = term.colored(enabled=not options["no_color"])
|
|
|
- self.quiet = options.get("quiet", False)
|
|
|
- self.show_body = options.get("show_body", True)
|
|
|
+ self.colored = term.colored(enabled=not options['no_color'])
|
|
|
+ self.quiet = options.get('quiet', False)
|
|
|
+ self.show_body = options.get('show_body', True)
|
|
|
return self(*args, **options)
|
|
|
|
|
|
def usage(self, command):
|
|
|
- return "%%prog %s [options] %s" % (command, self.args)
|
|
|
+ return '%%prog %s [options] %s' % (command, self.args)
|
|
|
|
|
|
def prettify_list(self, n):
|
|
|
c = self.colored
|
|
|
if not n:
|
|
|
- return "- empty -"
|
|
|
- return "\n".join(str(c.reset(c.white("*"), " %s" % (item, )))
|
|
|
+ return '- empty -'
|
|
|
+ return '\n'.join(str(c.reset(c.white('*'), ' %s' % (item, )))
|
|
|
for item in n)
|
|
|
|
|
|
def prettify_dict_ok_error(self, n):
|
|
|
c = self.colored
|
|
|
try:
|
|
|
- return (c.green("OK"),
|
|
|
- text.indent(self.prettify(n["ok"])[1], 4))
|
|
|
+ return (c.green('OK'),
|
|
|
+ text.indent(self.prettify(n['ok'])[1], 4))
|
|
|
except KeyError:
|
|
|
pass
|
|
|
- return (c.red("ERROR"),
|
|
|
- text.indent(self.prettify(n["error"])[1], 4))
|
|
|
+ return (c.red('ERROR'),
|
|
|
+ text.indent(self.prettify(n['error'])[1], 4))
|
|
|
|
|
|
def say_remote_command_reply(self, replies):
|
|
|
c = self.colored
|
|
|
node = replies.keys()[0]
|
|
|
reply = replies[node]
|
|
|
status, preply = self.prettify(reply)
|
|
|
- self.say_chat("->", c.cyan(node, ": ") + status,
|
|
|
+ self.say_chat('->', c.cyan(node, ': ') + status,
|
|
|
text.indent(preply, 4))
|
|
|
|
|
|
def prettify(self, n):
|
|
|
- OK = str(self.colored.green("OK"))
|
|
|
+ OK = str(self.colored.green('OK'))
|
|
|
if isinstance(n, list):
|
|
|
return OK, self.prettify_list(n)
|
|
|
if isinstance(n, dict):
|
|
|
- if "ok" in n or "error" in n:
|
|
|
+ if 'ok' in n or 'error' in n:
|
|
|
return self.prettify_dict_ok_error(n)
|
|
|
if isinstance(n, basestring):
|
|
|
return OK, unicode(n)
|
|
|
return OK, pformat(n)
|
|
|
|
|
|
- def say_chat(self, direction, title, body=""):
|
|
|
+ def say_chat(self, direction, title, body=''):
|
|
|
c = self.colored
|
|
|
- if direction == "<-" and self.quiet:
|
|
|
+ if direction == '<-' and self.quiet:
|
|
|
return
|
|
|
- dirstr = not self.quiet and c.bold(c.white(direction), " ") or ""
|
|
|
+ dirstr = not self.quiet and c.bold(c.white(direction), ' ') or ''
|
|
|
self.out(c.reset(dirstr, title))
|
|
|
if body and self.show_body:
|
|
|
self.out(body)
|
|
@@ -197,11 +188,6 @@ class Delegate(Command):
|
|
|
return self.target.run(*args, **kwargs)
|
|
|
|
|
|
|
|
|
-def create_delegate(name, Command):
|
|
|
- return command(type(name, (Delegate, ), {"Command": Command,
|
|
|
- "__module__": __name__}))
|
|
|
-
|
|
|
-
|
|
|
class worker(Delegate):
|
|
|
"""Start worker instance.
|
|
|
|
|
@@ -215,7 +201,7 @@ class worker(Delegate):
|
|
|
|
|
|
celery worker --autoscale=10,0
|
|
|
"""
|
|
|
- Command = "celery.bin.celeryd:WorkerCommand"
|
|
|
+ Command = 'celery.bin.celeryd:WorkerCommand'
|
|
|
worker = command(worker, sortpri=01)
|
|
|
|
|
|
|
|
@@ -238,7 +224,7 @@ class events(Delegate):
|
|
|
celery events -d
|
|
|
celery events -C mod.attr -F 1.0 --detach --maxrate=100/m -l info
|
|
|
"""
|
|
|
- Command = "celery.bin.celeryev:EvCommand"
|
|
|
+ Command = 'celery.bin.celeryev:EvCommand'
|
|
|
events = command(events, sortpri=10)
|
|
|
|
|
|
|
|
@@ -252,7 +238,7 @@ class beat(Delegate):
|
|
|
celery beat -S djcelery.schedulers.DatabaseScheduler
|
|
|
|
|
|
"""
|
|
|
- Command = "celery.bin.celerybeat:BeatCommand"
|
|
|
+ Command = 'celery.bin.celerybeat:BeatCommand'
|
|
|
beat = command(beat, sortpri=20)
|
|
|
|
|
|
|
|
@@ -273,7 +259,7 @@ class amqp(Delegate):
|
|
|
celery amqp queue.delete queue yes yes
|
|
|
|
|
|
"""
|
|
|
- Command = "celery.bin.camqadm:AMQPAdminCommand"
|
|
|
+ Command = 'celery.bin.camqadm:AMQPAdminCommand'
|
|
|
amqp = command(amqp, sortpri=30)
|
|
|
|
|
|
|
|
@@ -286,33 +272,33 @@ class list_(Command):
|
|
|
|
|
|
NOTE: For RabbitMQ the management plugin is required.
|
|
|
"""
|
|
|
- args = "[bindings]"
|
|
|
+ args = '[bindings]'
|
|
|
|
|
|
def list_bindings(self, management):
|
|
|
try:
|
|
|
bindings = management.get_bindings()
|
|
|
except NotImplementedError:
|
|
|
- raise Error("Your transport cannot list bindings.")
|
|
|
+ raise Error('Your transport cannot list bindings.')
|
|
|
|
|
|
- fmt = lambda q, e, r: self.out("%s %s %s" % (q.ljust(28),
|
|
|
+ fmt = lambda q, e, r: self.out('%s %s %s' % (q.ljust(28),
|
|
|
e.ljust(28), r))
|
|
|
- fmt("Queue", "Exchange", "Routing Key")
|
|
|
- fmt("-" * 16, "-" * 16, "-" * 16)
|
|
|
+ fmt('Queue', 'Exchange', 'Routing Key')
|
|
|
+ fmt('-' * 16, '-' * 16, '-' * 16)
|
|
|
for b in bindings:
|
|
|
- fmt(b["destination"], b["source"], b["routing_key"])
|
|
|
+ fmt(b['destination'], b['source'], b['routing_key'])
|
|
|
|
|
|
def run(self, what=None, *_, **kw):
|
|
|
- topics = {"bindings": self.list_bindings}
|
|
|
+ topics = {'bindings': self.list_bindings}
|
|
|
available = ', '.join(topics.keys())
|
|
|
if not what:
|
|
|
- raise Error("You must specify what to list (%s)" % available)
|
|
|
+ raise Error('You must specify what to list (%s)' % available)
|
|
|
if what not in topics:
|
|
|
- raise Error("unknown topic %r (choose one of: %s)" % (
|
|
|
+ raise Error('unknown topic %r (choose one of: %s)' % (
|
|
|
what, available))
|
|
|
with self.app.broker_connection() as conn:
|
|
|
self.app.amqp.TaskConsumer(conn).declare()
|
|
|
topics[what](conn.manager)
|
|
|
-list_ = command(list_, "list")
|
|
|
+list_ = command(list_, 'list')
|
|
|
|
|
|
|
|
|
class apply(Command):
|
|
@@ -323,33 +309,33 @@ class apply(Command):
|
|
|
celery apply tasks.add --args='[2, 2]'
|
|
|
celery apply tasks.add --args='[2, 2]' --countdown=10
|
|
|
"""
|
|
|
- args = "<task_name>"
|
|
|
+ args = '<task_name>'
|
|
|
option_list = Command.option_list + (
|
|
|
- Option("--args", "-a", help="positional arguments (json)."),
|
|
|
- Option("--kwargs", "-k", help="keyword arguments (json)."),
|
|
|
- Option("--eta", help="scheduled time (ISO-8601)."),
|
|
|
- Option("--countdown", type="float",
|
|
|
- help="eta in seconds from now (float/int)."),
|
|
|
- Option("--expires", help="expiry time (ISO-8601/float/int)."),
|
|
|
- Option("--serializer", default="json", help="defaults to json."),
|
|
|
- Option("--queue", help="custom queue name."),
|
|
|
- Option("--exchange", help="custom exchange name."),
|
|
|
- Option("--routing-key", help="custom routing key."),
|
|
|
+ Option('--args', '-a', help='positional arguments (json).'),
|
|
|
+ Option('--kwargs', '-k', help='keyword arguments (json).'),
|
|
|
+ Option('--eta', help='scheduled time (ISO-8601).'),
|
|
|
+ Option('--countdown', type='float',
|
|
|
+ help='eta in seconds from now (float/int).'),
|
|
|
+ Option('--expires', help='expiry time (ISO-8601/float/int).'),
|
|
|
+ Option('--serializer', default='json', help='defaults to json.'),
|
|
|
+ Option('--queue', help='custom queue name.'),
|
|
|
+ Option('--exchange', help='custom exchange name.'),
|
|
|
+ Option('--routing-key', help='custom routing key.'),
|
|
|
)
|
|
|
|
|
|
def run(self, name, *_, **kw):
|
|
|
# Positional args.
|
|
|
- args = kw.get("args") or ()
|
|
|
+ args = kw.get('args') or ()
|
|
|
if isinstance(args, basestring):
|
|
|
args = anyjson.loads(args)
|
|
|
|
|
|
# Keyword args.
|
|
|
- kwargs = kw.get("kwargs") or {}
|
|
|
+ kwargs = kw.get('kwargs') or {}
|
|
|
if isinstance(kwargs, basestring):
|
|
|
kwargs = anyjson.loads(kwargs)
|
|
|
|
|
|
# Expires can be int/float.
|
|
|
- expires = kw.get("expires") or None
|
|
|
+ expires = kw.get('expires') or None
|
|
|
try:
|
|
|
expires = float(expires)
|
|
|
except (TypeError, ValueError):
|
|
@@ -360,12 +346,12 @@ class apply(Command):
|
|
|
raise
|
|
|
|
|
|
res = self.app.send_task(name, args=args, kwargs=kwargs,
|
|
|
- countdown=kw.get("countdown"),
|
|
|
- serializer=kw.get("serializer"),
|
|
|
- queue=kw.get("queue"),
|
|
|
- exchange=kw.get("exchange"),
|
|
|
- routing_key=kw.get("routing_key"),
|
|
|
- eta=maybe_iso8601(kw.get("eta")),
|
|
|
+ countdown=kw.get('countdown'),
|
|
|
+ serializer=kw.get('serializer'),
|
|
|
+ queue=kw.get('queue'),
|
|
|
+ exchange=kw.get('exchange'),
|
|
|
+ routing_key=kw.get('routing_key'),
|
|
|
+ eta=maybe_iso8601(kw.get('eta')),
|
|
|
expires=expires)
|
|
|
self.out(res.id)
|
|
|
apply = command(apply)
|
|
@@ -381,12 +367,12 @@ class purge(Command):
|
|
|
queues = len(self.app.amqp.queues.keys())
|
|
|
messages_removed = self.app.control.purge()
|
|
|
if messages_removed:
|
|
|
- self.out("Purged %s %s from %s known task %s." % (
|
|
|
- messages_removed, text.pluralize(messages_removed, "message"),
|
|
|
- queues, text.pluralize(queues, "queue")))
|
|
|
+ self.out('Purged %s %s from %s known task %s.' % (
|
|
|
+ messages_removed, text.pluralize(messages_removed, 'message'),
|
|
|
+ queues, text.pluralize(queues, 'queue')))
|
|
|
else:
|
|
|
- self.out("No messages purged from %s known %s" % (
|
|
|
- queues, text.pluralize(queues, "queue")))
|
|
|
+ self.out('No messages purged from %s known %s' % (
|
|
|
+ queues, text.pluralize(queues, 'queue')))
|
|
|
purge = command(purge)
|
|
|
|
|
|
|
|
@@ -400,17 +386,17 @@ class result(Command):
|
|
|
celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500 --traceback
|
|
|
|
|
|
"""
|
|
|
- args = "<task_id>"
|
|
|
+ args = '<task_id>'
|
|
|
option_list = Command.option_list + (
|
|
|
- Option("--task", "-t", help="name of task (if custom backend)"),
|
|
|
- Option("--traceback", action="store_true",
|
|
|
- help="show traceback instead"),
|
|
|
+ Option('--task', '-t', help='name of task (if custom backend)'),
|
|
|
+ Option('--traceback', action='store_true',
|
|
|
+ help='show traceback instead'),
|
|
|
)
|
|
|
|
|
|
def run(self, task_id, *args, **kwargs):
|
|
|
result_cls = self.app.AsyncResult
|
|
|
- task = kwargs.get("task")
|
|
|
- traceback = kwargs.get("traceback", False)
|
|
|
+ task = kwargs.get('task')
|
|
|
+ traceback = kwargs.get('traceback', False)
|
|
|
|
|
|
if task:
|
|
|
result_cls = self.app.tasks[task].AsyncResult
|
|
@@ -428,13 +414,13 @@ class _RemoteControl(Command):
|
|
|
choices = None
|
|
|
leaf = False
|
|
|
option_list = Command.option_list + (
|
|
|
- Option("--timeout", "-t", type="float",
|
|
|
- help="Timeout in seconds (float) waiting for reply"),
|
|
|
- Option("--destination", "-d",
|
|
|
- help="Comma separated list of destination node names."))
|
|
|
+ Option('--timeout', '-t', type='float',
|
|
|
+ help='Timeout in seconds (float) waiting for reply'),
|
|
|
+ Option('--destination', '-d',
|
|
|
+ help='Comma separated list of destination node names.'))
|
|
|
|
|
|
@classmethod
|
|
|
- def get_command_info(self, command, indent=0, prefix="", color=None,
|
|
|
+ def get_command_info(self, command, indent=0, prefix='', color=None,
|
|
|
help=False):
|
|
|
if help:
|
|
|
help = '|' + text.indent(self.choices[command][1], indent + 4)
|
|
@@ -444,52 +430,52 @@ class _RemoteControl(Command):
|
|
|
# see if it uses args.
|
|
|
meth = getattr(self, command)
|
|
|
return text.join([
|
|
|
- '|' + text.indent("%s%s %s" % (prefix, color(command),
|
|
|
+ '|' + text.indent('%s%s %s' % (prefix, color(command),
|
|
|
meth.__doc__), indent), help,
|
|
|
])
|
|
|
|
|
|
except AttributeError:
|
|
|
return text.join([
|
|
|
- "|" + text.indent(prefix + str(color(command)), indent), help,
|
|
|
+ '|' + text.indent(prefix + str(color(command)), indent), help,
|
|
|
])
|
|
|
|
|
|
@classmethod
|
|
|
- def list_commands(self, indent=0, prefix="", color=None, help=False):
|
|
|
+ def list_commands(self, indent=0, prefix='', color=None, help=False):
|
|
|
color = color if color else lambda x: x
|
|
|
- prefix = prefix + " " if prefix else ""
|
|
|
- return "\n".join(self.get_command_info(c, indent, prefix, color, help)
|
|
|
+ prefix = prefix + ' ' if prefix else ''
|
|
|
+ return '\n'.join(self.get_command_info(c, indent, prefix, color, help)
|
|
|
for c in sorted(self.choices))
|
|
|
|
|
|
@property
|
|
|
def epilog(self):
|
|
|
- return "\n".join([
|
|
|
- "[Commands]",
|
|
|
+ return '\n'.join([
|
|
|
+ '[Commands]',
|
|
|
self.list_commands(indent=4, help=True)
|
|
|
])
|
|
|
|
|
|
def usage(self, command):
|
|
|
- return "%%prog %s [options] %s <command> [arg1 .. argN]" % (
|
|
|
+ return '%%prog %s [options] %s <command> [arg1 .. argN]' % (
|
|
|
command, self.args)
|
|
|
|
|
|
def call(self, *args, **kwargs):
|
|
|
- raise NotImplementedError("get_obj")
|
|
|
+ raise NotImplementedError('get_obj')
|
|
|
|
|
|
def run(self, *args, **kwargs):
|
|
|
if not args:
|
|
|
- raise Error("Missing %s method. See --help" % self.name)
|
|
|
+ raise Error('Missing %s method. See --help' % self.name)
|
|
|
return self.do_call_method(args, **kwargs)
|
|
|
|
|
|
def do_call_method(self, args, **kwargs):
|
|
|
method = args[0]
|
|
|
- if method == "help":
|
|
|
- raise Error("Did you mean '%s --help'?" % self.name)
|
|
|
+ if method == 'help':
|
|
|
+ raise Error("Did you mean '%s --help'?' % self.name)
|
|
|
if method not in self.choices:
|
|
|
- raise Error("Unknown %s method %s" % (self.name, method))
|
|
|
+ raise Error('Unknown %s method %s' % (self.name, method))
|
|
|
|
|
|
- destination = kwargs.get("destination")
|
|
|
- timeout = kwargs.get("timeout") or self.choices[method][0]
|
|
|
+ destination = kwargs.get('destination')
|
|
|
+ timeout = kwargs.get('timeout') or self.choices[method][0]
|
|
|
if destination and isinstance(destination, basestring):
|
|
|
- destination = map(str.strip, destination.split(","))
|
|
|
+ destination = map(str.strip, destination.split(','))
|
|
|
|
|
|
try:
|
|
|
handler = getattr(self, method)
|
|
@@ -500,15 +486,15 @@ class _RemoteControl(Command):
|
|
|
timeout=timeout, destination=destination,
|
|
|
callback=self.say_remote_command_reply)
|
|
|
if not replies:
|
|
|
- raise Error("No nodes replied within time constraint.",
|
|
|
+ raise Error('No nodes replied within time constraint.',
|
|
|
status=EX_UNAVAILABLE)
|
|
|
return replies
|
|
|
|
|
|
- def say(self, direction, title, body=""):
|
|
|
+ def say(self, direction, title, body=''):
|
|
|
c = self.colored
|
|
|
- if direction == "<-" and self.quiet:
|
|
|
+ if direction == '<-' and self.quiet:
|
|
|
return
|
|
|
- dirstr = not self.quiet and c.bold(c.white(direction), " ") or ""
|
|
|
+ dirstr = not self.quiet and c.bold(c.white(direction), ' ') or ''
|
|
|
self.out(c.reset(dirstr, title))
|
|
|
if body and self.show_body:
|
|
|
self.out(body)
|
|
@@ -526,17 +512,17 @@ class inspect(_RemoteControl):
|
|
|
celery inspect revoked -d w1.e.com,w2.e.com
|
|
|
|
|
|
"""
|
|
|
- name = "inspect"
|
|
|
+ name = 'inspect'
|
|
|
choices = {
|
|
|
- "active": (1.0, "dump active tasks (being processed)"),
|
|
|
- "active_queues": (1.0, "dump queues being consumed from"),
|
|
|
- "scheduled": (1.0, "dump scheduled tasks (eta/countdown/retry)"),
|
|
|
- "reserved": (1.0, "dump reserved tasks (waiting to be processed)"),
|
|
|
- "stats": (1.0, "dump worker statistics"),
|
|
|
- "revoked": (1.0, "dump of revoked task ids"),
|
|
|
- "registered": (1.0, "dump of registered tasks"),
|
|
|
- "ping": (0.2, "ping worker(s)"),
|
|
|
- "report": (1.0, "get bugreport info")
|
|
|
+ 'active': (1.0, 'dump active tasks (being processed)'),
|
|
|
+ 'active_queues': (1.0, 'dump queues being consumed from'),
|
|
|
+ 'scheduled': (1.0, 'dump scheduled tasks (eta/countdown/retry)'),
|
|
|
+ 'reserved': (1.0, 'dump reserved tasks (waiting to be processed)'),
|
|
|
+ 'stats': (1.0, 'dump worker statistics'),
|
|
|
+ 'revoked': (1.0, 'dump of revoked task ids'),
|
|
|
+ 'registered': (1.0, 'dump of registered tasks'),
|
|
|
+ 'ping': (0.2, 'ping worker(s)'),
|
|
|
+ 'report': (1.0, 'get bugreport info')
|
|
|
}
|
|
|
|
|
|
def call(self, method, *args, **options):
|
|
@@ -562,19 +548,19 @@ class control(_RemoteControl):
|
|
|
celery control -d w1.e.com add_consumer queue exchange direct rkey
|
|
|
|
|
|
"""
|
|
|
- name = "control"
|
|
|
+ name = 'control'
|
|
|
choices = {
|
|
|
- "enable_events": (1.0, "tell worker(s) to enable events"),
|
|
|
- "disable_events": (1.0, "tell worker(s) to disable events"),
|
|
|
- "add_consumer": (1.0, "tell worker(s) to start consuming a queue"),
|
|
|
- "cancel_consumer": (1.0, "tell worker(s) to stop consuming a queue"),
|
|
|
- "rate_limit": (1.0,
|
|
|
- "tell worker(s) to modify the rate limit for a task type"),
|
|
|
- "time_limit": (1.0,
|
|
|
- "tell worker(s) to modify the time limit for a task type."),
|
|
|
- "autoscale": (1.0, "change autoscale settings"),
|
|
|
- "pool_grow": (1.0, "start more pool processes"),
|
|
|
- "pool_shrink": (1.0, "use less pool processes"),
|
|
|
+ 'enable_events': (1.0, 'tell worker(s) to enable events'),
|
|
|
+ 'disable_events': (1.0, 'tell worker(s) to disable events'),
|
|
|
+ 'add_consumer': (1.0, 'tell worker(s) to start consuming a queue'),
|
|
|
+ 'cancel_consumer': (1.0, 'tell worker(s) to stop consuming a queue'),
|
|
|
+ 'rate_limit': (1.0,
|
|
|
+ 'tell worker(s) to modify the rate limit for a task type'),
|
|
|
+ 'time_limit': (1.0,
|
|
|
+ 'tell worker(s) to modify the time limit for a task type.'),
|
|
|
+ 'autoscale': (1.0, 'change autoscale settings'),
|
|
|
+ 'pool_grow': (1.0, 'start more pool processes'),
|
|
|
+ 'pool_shrink': (1.0, 'use less pool processes'),
|
|
|
}
|
|
|
|
|
|
def call(self, method, *args, **options):
|
|
@@ -601,7 +587,7 @@ class control(_RemoteControl):
|
|
|
return self.call(method, task_name, soft, hard, **kwargs)
|
|
|
|
|
|
def add_consumer(self, method, queue, exchange=None,
|
|
|
- exchange_type="direct", routing_key=None, **kwargs):
|
|
|
+ exchange_type='direct', routing_key=None, **kwargs):
|
|
|
"""<queue> [exchange [type [routing_key]]]"""
|
|
|
return self.call(method, queue, exchange,
|
|
|
exchange_type, routing_key, **kwargs)
|
|
@@ -618,16 +604,16 @@ class status(Command):
|
|
|
|
|
|
def run(self, *args, **kwargs):
|
|
|
replies = inspect(app=self.app,
|
|
|
- no_color=kwargs.get("no_color", False),
|
|
|
+ no_color=kwargs.get('no_color', False),
|
|
|
stdout=self.stdout, stderr=self.stderr) \
|
|
|
- .run("ping", **dict(kwargs, quiet=True, show_body=False))
|
|
|
+ .run('ping', **dict(kwargs, quiet=True, show_body=False))
|
|
|
if not replies:
|
|
|
- raise Error("No nodes replied within time constraint",
|
|
|
+ raise Error('No nodes replied within time constraint',
|
|
|
status=EX_UNAVAILABLE)
|
|
|
nodecount = len(replies)
|
|
|
- if not kwargs.get("quiet", False):
|
|
|
- self.out("\n%s %s online." % (nodecount,
|
|
|
- text.pluralize(nodecount, "node")))
|
|
|
+ if not kwargs.get('quiet', False):
|
|
|
+ self.out('\n%s %s online.' % (nodecount,
|
|
|
+ text.pluralize(nodecount, 'node')))
|
|
|
status = command(status)
|
|
|
|
|
|
|
|
@@ -643,15 +629,15 @@ class migrate(Command):
|
|
|
a backup of the tasks before you continue.
|
|
|
"""
|
|
|
def usage(self, command):
|
|
|
- return "%%prog %s <source_url> <dest_url>" % (command, )
|
|
|
+ return '%%prog %s <source_url> <dest_url>' % (command, )
|
|
|
|
|
|
def on_migrate_task(self, state, body, message):
|
|
|
- self.out("Migrating task %s/%s: %s[%s]" % (
|
|
|
- state.count, state.strtotal, body["task"], body["id"]))
|
|
|
+ self.out('Migrating task %s/%s: %s[%s]' % (
|
|
|
+ state.count, state.strtotal, body['task'], body['id']))
|
|
|
|
|
|
def run(self, *args, **kwargs):
|
|
|
if len(args) != 2:
|
|
|
- return self.show_help("migrate")
|
|
|
+ return self.show_help('migrate')
|
|
|
from kombu import BrokerConnection
|
|
|
from celery.contrib.migrate import migrate_tasks
|
|
|
|
|
@@ -683,46 +669,46 @@ class shell(Command): # pragma: no cover
|
|
|
<AsyncResult: 537b48c7-d6d3-427a-a24a-d1b4414035be>
|
|
|
"""
|
|
|
option_list = Command.option_list + (
|
|
|
- Option("--ipython", "-I",
|
|
|
- action="store_true", dest="force_ipython",
|
|
|
- help="force iPython."),
|
|
|
- Option("--bpython", "-B",
|
|
|
- action="store_true", dest="force_bpython",
|
|
|
- help="force bpython."),
|
|
|
- Option("--python", "-P",
|
|
|
- action="store_true", dest="force_python",
|
|
|
- help="force default Python shell."),
|
|
|
- Option("--without-tasks", "-T", action="store_true",
|
|
|
+ Option('--ipython', '-I',
|
|
|
+ action='store_true', dest='force_ipython',
|
|
|
+ help='force iPython.'),
|
|
|
+ Option('--bpython', '-B',
|
|
|
+ action='store_true', dest='force_bpython',
|
|
|
+ help='force bpython.'),
|
|
|
+ Option('--python', '-P',
|
|
|
+ action='store_true', dest='force_python',
|
|
|
+ help='force default Python shell.'),
|
|
|
+ Option('--without-tasks', '-T', action='store_true',
|
|
|
help="don't add tasks to locals."),
|
|
|
- Option("--eventlet", action="store_true",
|
|
|
- help="use eventlet."),
|
|
|
- Option("--gevent", action="store_true", help="use gevent."),
|
|
|
+ Option('--eventlet', action='store_true',
|
|
|
+ help='use eventlet.'),
|
|
|
+ Option('--gevent', action='store_true', help='use gevent.'),
|
|
|
)
|
|
|
|
|
|
def run(self, force_ipython=False, force_bpython=False,
|
|
|
force_python=False, without_tasks=False, eventlet=False,
|
|
|
gevent=False, **kwargs):
|
|
|
if eventlet:
|
|
|
- import_module("celery.concurrency.eventlet")
|
|
|
+ import_module('celery.concurrency.eventlet')
|
|
|
if gevent:
|
|
|
- import_module("celery.concurrency.gevent")
|
|
|
+ import_module('celery.concurrency.gevent')
|
|
|
import celery
|
|
|
import celery.task.base
|
|
|
self.app.loader.import_default_modules()
|
|
|
- self.locals = {"celery": self.app,
|
|
|
- "Task": celery.Task,
|
|
|
- "chord": celery.chord,
|
|
|
- "group": celery.group,
|
|
|
- "chain": celery.chain,
|
|
|
- "chunks": celery.chunks,
|
|
|
- "xmap": celery.xmap,
|
|
|
- "xstarmap": celery.xstarmap,
|
|
|
- "subtask": celery.subtask}
|
|
|
+ self.locals = {'celery': self.app,
|
|
|
+ 'Task': celery.Task,
|
|
|
+ 'chord': celery.chord,
|
|
|
+ 'group': celery.group,
|
|
|
+ 'chain': celery.chain,
|
|
|
+ 'chunks': celery.chunks,
|
|
|
+ 'xmap': celery.xmap,
|
|
|
+ 'xstarmap': celery.xstarmap,
|
|
|
+ 'subtask': celery.subtask}
|
|
|
|
|
|
if not without_tasks:
|
|
|
self.locals.update(dict((task.__name__, task)
|
|
|
for task in self.app.tasks.itervalues()
|
|
|
- if not task.name.startswith("celery.")))
|
|
|
+ if not task.name.startswith('celery.')))
|
|
|
|
|
|
if force_python:
|
|
|
return self.invoke_fallback_shell()
|
|
@@ -755,7 +741,7 @@ class shell(Command): # pragma: no cover
|
|
|
import rlcompleter
|
|
|
readline.set_completer(
|
|
|
rlcompleter.Completer(self.locals).complete)
|
|
|
- readline.parse_and_bind("tab:complete")
|
|
|
+ readline.parse_and_bind('tab:complete')
|
|
|
code.interact(local=self.locals)
|
|
|
|
|
|
def invoke_ipython_shell(self):
|
|
@@ -777,12 +763,12 @@ class help(Command):
|
|
|
"""Show help screen and exit."""
|
|
|
|
|
|
def usage(self, command):
|
|
|
- return "%%prog <command> [options] %s" % (self.args, )
|
|
|
+ return '%%prog <command> [options] %s' % (self.args, )
|
|
|
|
|
|
def run(self, *args, **kwargs):
|
|
|
self.parser.print_help()
|
|
|
- self.out(HELP % {"prog_name": self.prog_name,
|
|
|
- "commands": CeleryCommand.list_commands()})
|
|
|
+ self.out(HELP % {'prog_name': self.prog_name,
|
|
|
+ 'commands': CeleryCommand.list_commands()})
|
|
|
|
|
|
return EX_USAGE
|
|
|
help = command(help)
|
|
@@ -800,26 +786,26 @@ report = command(report)
|
|
|
class CeleryCommand(BaseCommand):
|
|
|
commands = commands
|
|
|
enable_config_from_cmdline = True
|
|
|
- prog_name = "celery"
|
|
|
+ prog_name = 'celery'
|
|
|
|
|
|
def execute(self, command, argv=None):
|
|
|
try:
|
|
|
cls = self.commands[command]
|
|
|
except KeyError:
|
|
|
- cls, argv = self.commands["help"], ["help"]
|
|
|
- cls = self.commands.get(command) or self.commands["help"]
|
|
|
+ cls, argv = self.commands['help'], ['help']
|
|
|
+ cls = self.commands.get(command) or self.commands['help']
|
|
|
try:
|
|
|
return cls(app=self.app).run_from_argv(self.prog_name, argv)
|
|
|
except Error:
|
|
|
- return self.execute("help", argv)
|
|
|
+ return self.execute('help', argv)
|
|
|
|
|
|
def remove_options_at_beginning(self, argv, index=0):
|
|
|
if argv:
|
|
|
while index < len(argv):
|
|
|
value = argv[index]
|
|
|
- if value.startswith("--"):
|
|
|
+ if value.startswith('--'):
|
|
|
pass
|
|
|
- elif value.startswith("-"):
|
|
|
+ elif value.startswith('-'):
|
|
|
index += 1
|
|
|
else:
|
|
|
return argv[index:]
|
|
@@ -833,7 +819,7 @@ class CeleryCommand(BaseCommand):
|
|
|
try:
|
|
|
command = argv[0]
|
|
|
except IndexError:
|
|
|
- command, argv = "help", ["help"]
|
|
|
+ command, argv = 'help', ['help']
|
|
|
return self.execute(command, argv)
|
|
|
|
|
|
def execute_from_commandline(self, argv=None):
|
|
@@ -848,11 +834,11 @@ class CeleryCommand(BaseCommand):
|
|
|
colored = term.colored().names[color] if color else lambda x: x
|
|
|
obj = self.commands[command]
|
|
|
if obj.leaf:
|
|
|
- return '|' + text.indent("celery %s" % colored(command), indent)
|
|
|
+ return '|' + text.indent('celery %s' % colored(command), indent)
|
|
|
return text.join([
|
|
|
- " ",
|
|
|
- '|' + text.indent("celery %s --help" % colored(command), indent),
|
|
|
- obj.list_commands(indent, "celery %s" % command, colored),
|
|
|
+ ' ',
|
|
|
+ '|' + text.indent('celery %s --help' % colored(command), indent),
|
|
|
+ obj.list_commands(indent, 'celery %s' % command, colored),
|
|
|
])
|
|
|
|
|
|
@classmethod
|
|
@@ -861,12 +847,12 @@ class CeleryCommand(BaseCommand):
|
|
|
ret = []
|
|
|
for cls, commands, color in command_classes:
|
|
|
ret.extend([
|
|
|
- text.indent("+ %s: " % white(cls), indent),
|
|
|
- "\n".join(self.get_command_info(command, indent + 4, color)
|
|
|
+ text.indent('+ %s: ' % white(cls), indent),
|
|
|
+ '\n'.join(self.get_command_info(command, indent + 4, color)
|
|
|
for command in commands),
|
|
|
- ""
|
|
|
+ ''
|
|
|
])
|
|
|
- return "\n".join(ret).strip()
|
|
|
+ return '\n'.join(ret).strip()
|
|
|
|
|
|
|
|
|
def determine_exit_status(ret):
|
|
@@ -879,10 +865,10 @@ def main():
|
|
|
# Fix for setuptools generated scripts, so that it will
|
|
|
# work with multiprocessing fork emulation.
|
|
|
# (see multiprocessing.forking.get_preparation_data())
|
|
|
- if __name__ != "__main__": # pragma: no cover
|
|
|
- sys.modules["__main__"] = sys.modules[__name__]
|
|
|
+ if __name__ != '__main__': # pragma: no cover
|
|
|
+ sys.modules['__main__'] = sys.modules[__name__]
|
|
|
freeze_support()
|
|
|
CeleryCommand().execute_from_commandline()
|
|
|
|
|
|
-if __name__ == "__main__": # pragma: no cover
|
|
|
+if __name__ == '__main__': # pragma: no cover
|
|
|
main()
|