|
@@ -66,10 +66,13 @@ class Error(Exception):
|
|
|
return self.reason
|
|
|
|
|
|
|
|
|
-def command(fun, name=None, sortpri=0):
|
|
|
- commands[name or fun.__name__] = fun
|
|
|
- fun.sortpri = sortpri
|
|
|
- return fun
|
|
|
+def command(*args, **kwargs):
|
|
|
+
|
|
|
+ def _register(fun):
|
|
|
+ commands[kwargs.get('name') or fun.__name__] = fun
|
|
|
+ return fun
|
|
|
+
|
|
|
+ return _register(args[0]) if args else _register
|
|
|
|
|
|
|
|
|
def load_extension_commands(namespace='celery.commands'):
|
|
@@ -216,6 +219,7 @@ class Delegate(Command):
|
|
|
return self.target.run(*args, **kwargs)
|
|
|
|
|
|
|
|
|
+@command
|
|
|
class multi(Command):
|
|
|
"""Start multiple worker instances."""
|
|
|
|
|
@@ -225,9 +229,9 @@ class multi(Command):
|
|
|
def run_from_argv(self, prog_name, argv):
|
|
|
from celery.bin.celeryd_multi import MultiTool
|
|
|
return MultiTool().execute_from_commandline(argv, prog_name)
|
|
|
-multi = command(multi)
|
|
|
|
|
|
|
|
|
+@command
|
|
|
class worker(Delegate):
|
|
|
"""Start worker instance.
|
|
|
|
|
@@ -242,9 +246,9 @@ class worker(Delegate):
|
|
|
celery worker --autoscale=10,0
|
|
|
"""
|
|
|
Command = 'celery.bin.celeryd:WorkerCommand'
|
|
|
-worker = command(worker, sortpri=01)
|
|
|
|
|
|
|
|
|
+@command
|
|
|
class events(Delegate):
|
|
|
"""Event-stream utilities.
|
|
|
|
|
@@ -265,9 +269,9 @@ class events(Delegate):
|
|
|
celery events -C mod.attr -F 1.0 --detach --maxrate=100/m -l info
|
|
|
"""
|
|
|
Command = 'celery.bin.celeryev:EvCommand'
|
|
|
-events = command(events, sortpri=10)
|
|
|
|
|
|
|
|
|
+@command
|
|
|
class beat(Delegate):
|
|
|
"""Start the celerybeat periodic task scheduler.
|
|
|
|
|
@@ -279,9 +283,9 @@ class beat(Delegate):
|
|
|
|
|
|
"""
|
|
|
Command = 'celery.bin.celerybeat:BeatCommand'
|
|
|
-beat = command(beat, sortpri=20)
|
|
|
|
|
|
|
|
|
+@command
|
|
|
class amqp(Delegate):
|
|
|
"""AMQP Administration Shell.
|
|
|
|
|
@@ -300,9 +304,9 @@ class amqp(Delegate):
|
|
|
|
|
|
"""
|
|
|
Command = 'celery.bin.camqadm:AMQPAdminCommand'
|
|
|
-amqp = command(amqp, sortpri=30)
|
|
|
|
|
|
|
|
|
+@command(name='list')
|
|
|
class list_(Command):
|
|
|
"""Get info from broker.
|
|
|
|
|
@@ -337,9 +341,9 @@ class list_(Command):
|
|
|
with self.app.connection() as conn:
|
|
|
self.app.amqp.TaskConsumer(conn).declare()
|
|
|
topics[what](conn.manager)
|
|
|
-list_ = command(list_, 'list')
|
|
|
|
|
|
|
|
|
+@command
|
|
|
class call(Command):
|
|
|
"""Call a task by name.
|
|
|
|
|
@@ -393,9 +397,9 @@ class call(Command):
|
|
|
eta=maybe_iso8601(kw.get('eta')),
|
|
|
expires=expires)
|
|
|
self.out(res.id)
|
|
|
-call = command(call)
|
|
|
|
|
|
|
|
|
+@command
|
|
|
class purge(Command):
|
|
|
"""Erase all messages from all known task queues.
|
|
|
|
|
@@ -413,9 +417,9 @@ class purge(Command):
|
|
|
mnum=messages, qnum=queues,
|
|
|
messages=text.pluralize(messages, 'message'),
|
|
|
queues=text.pluralize(queues, 'queue')))
|
|
|
-purge = command(purge)
|
|
|
|
|
|
|
|
|
+@command
|
|
|
class result(Command):
|
|
|
"""Gives the return value for a given task id.
|
|
|
|
|
@@ -446,7 +450,6 @@ class result(Command):
|
|
|
else:
|
|
|
value = result.get()
|
|
|
self.out(self.prettify(value)[1])
|
|
|
-result = command(result)
|
|
|
|
|
|
|
|
|
class _RemoteControl(Command):
|
|
@@ -540,6 +543,7 @@ class _RemoteControl(Command):
|
|
|
self.out(body)
|
|
|
|
|
|
|
|
|
+@command
|
|
|
class inspect(_RemoteControl):
|
|
|
"""Inspect the worker at runtime.
|
|
|
|
|
@@ -568,9 +572,9 @@ class inspect(_RemoteControl):
|
|
|
def call(self, method, *args, **options):
|
|
|
i = self.app.control.inspect(**options)
|
|
|
return getattr(i, method)(*args)
|
|
|
-inspect = command(inspect)
|
|
|
|
|
|
|
|
|
+@command
|
|
|
class control(_RemoteControl):
|
|
|
"""Workers remote control.
|
|
|
|
|
@@ -635,9 +639,9 @@ class control(_RemoteControl):
|
|
|
def cancel_consumer(self, method, queue, **kwargs):
|
|
|
"""<queue>"""
|
|
|
return self.call(method, queue, reply=True, **kwargs)
|
|
|
-control = command(control)
|
|
|
|
|
|
|
|
|
+@command
|
|
|
class status(Command):
|
|
|
"""Show list of workers that are online."""
|
|
|
option_list = inspect.option_list
|
|
@@ -655,9 +659,9 @@ class status(Command):
|
|
|
if not kwargs.get('quiet', False):
|
|
|
self.out('\n{0} {1} online.'.format(
|
|
|
nodecount, text.pluralize(nodecount, 'node')))
|
|
|
-status = command(status)
|
|
|
|
|
|
|
|
|
+@command
|
|
|
class migrate(Command):
|
|
|
"""Migrate tasks from one broker to another.
|
|
|
|
|
@@ -699,9 +703,9 @@ class migrate(Command):
|
|
|
Connection(args[1]),
|
|
|
callback=self.on_migrate_task,
|
|
|
**kwargs)
|
|
|
-migrate = command(migrate)
|
|
|
|
|
|
|
|
|
+@command
|
|
|
class shell(Command): # pragma: no cover
|
|
|
"""Start shell session with convenient access to celery symbols.
|
|
|
|
|
@@ -811,9 +815,8 @@ class shell(Command): # pragma: no cover
|
|
|
import bpython
|
|
|
bpython.embed(self.locals)
|
|
|
|
|
|
-shell = command(shell)
|
|
|
-
|
|
|
|
|
|
+@command
|
|
|
class help(Command):
|
|
|
"""Show help screen and exit."""
|
|
|
|
|
@@ -826,16 +829,15 @@ class help(Command):
|
|
|
commands=CeleryCommand.list_commands()))
|
|
|
|
|
|
return EX_USAGE
|
|
|
-help = command(help)
|
|
|
|
|
|
|
|
|
+@command
|
|
|
class report(Command):
|
|
|
"""Shows information useful to include in bugreports."""
|
|
|
|
|
|
def run(self, *args, **kwargs):
|
|
|
self.out(self.app.bugreport())
|
|
|
return EX_OK
|
|
|
-report = command(report)
|
|
|
|
|
|
|
|
|
class CeleryCommand(BaseCommand):
|