@@ -24,7 +24,7 @@ HELP = """
---- -- - - --------- -- - -------------- --- ------------
-TIP: Type '%(prog_name)s <command> --help' for help using a specific command.
+Type '%(prog_name)s <command> --help' for help using a specific command.
commands = {}
@@ -165,6 +165,10 @@ class Command(BaseCommand):
if body and self.show_body:
+ @property
+ def description(self):
+ return self.__doc__
class Delegate(Command):
@@ -192,26 +196,87 @@ def create_delegate(name, Command):
class worker(Delegate):
+ """Start worker instance.
+ Examples:
+ celery worker --app=proj -l info
+ celery worker -A proj -l info -Q hipri,lopri
+ celery worker -A proj --concurrency=4
+ celery worker -A proj --concurrency=1000 -P eventlet
+ celery worker --autoscale=10,0
+ """
Command = "celery.bin.celeryd:WorkerCommand"
worker = command(worker, sortpri=01)
class events(Delegate):
- Command = "celery.bin.celeryd:WorkerCommand"
+ """Event-stream utilities.
+ Commands:
+ celery events --app=proj
+ start graphical monitor (requires curses)
+ celery events -d --app=proj
+ dump events to screen.
+ celery events -b amqp://
+ celery events -C <camera> [options]
+ run snapshot camera.
+ Examples:
+ celery events
+ celery events -d
+ celery events -C mod.attr -F 1.0 --detach --maxrate=100/m -l info
+ """
+ Command = "celery.bin.celeryev:EvCommand"
events = command(events, sortpri=10)
class beat(Delegate):
+ """Start the celerybeat periodic task scheduler.
+ Examples:
+ celery beat -l info
+ celery beat -s /var/run/celerybeat/schedule --detach
+ celery beat -S djcelery.schedulers.DatabaseScheduler
+ """
Command = "celery.bin.celerybeat:BeatCommand"
beat = command(beat, sortpri=20)
class amqp(Delegate):
+ """AMQP Administration Shell.
+ Also works for non-amqp transports.
+ Examples::
+ celery amqp
+ start shell mode
+ celery amqp help
+ show list of commands
+ celery amqp exchange.delete name
+ celery amqp queue.delete queue
+ celery amqp queue.delete queue yes yes
+ """
Command = "celery.bin.camqadm:AMQPAdminCommand"
amqp = command(amqp, sortpri=30)
class list_(Command):
+ """Get info from broker.
+ Examples:
+ celery list bindings
+ NOTE: For RabbitMQ the management plugin is required.
+ """
args = "[bindings]"
def list_bindings(self, management):
@@ -242,17 +307,25 @@ list_ = command(list_, "list")
class apply(Command):
+ """Apply a task by name.
+ Examples:
+ celery apply tasks.add --args='[2, 2]'
+ celery apply tasks.add --args='[2, 2]' --countdown=10
+ """
args = "<task_name>"
option_list = Command.option_list + (
- Option("--args", "-a"),
- Option("--kwargs", "-k"),
- Option("--eta"),
- Option("--countdown", type="int"),
- Option("--expires"),
- Option("--serializer", default="json"),
- Option("--queue"),
- Option("--exchange"),
- Option("--routing-key"),
+ Option("--args", "-a", help="positional arguments (json)."),
+ Option("--kwargs", "-k", help="keyword arguments (json)."),
+ Option("--eta", help="scheduled time (ISO-8601)."),
+ Option("--countdown", type="float",
+ help="eta in seconds from now (float/int)."),
+ Option("--expires", help="expiry time (ISO-8601/float/int)."),
+ Option("--serializer", default="json", help="defaults to json."),
+ Option("--queue", help="custom queue name."),
+ Option("--exchange", help="custom exchange name."),
+ Option("--routing-key", help="custom routing key."),
def run(self, name, *_, **kw):
@@ -290,7 +363,11 @@ apply = command(apply)
class purge(Command):
+ """Erase all messages from all known task queues.
+ WARNING: There is no undo operation for this command.
+ """
def run(self, *args, **kwargs):
queues = len(self.app.amqp.queues.keys())
messages_removed = self.app.control.purge()
@@ -305,19 +382,35 @@ purge = command(purge)
class result(Command):
+ """Gives the return value for a given task id.
+ Examples:
+ celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500
+ celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500 -t tasks.add
+ celery result 8f511516-e2f5-4da4-9d2f-0fb83a86e500 --traceback
+ """
args = "<task_id>"
option_list = Command.option_list + (
- Option("--task", "-t"),
+ Option("--task", "-t", help="name of task (if custom backend)"),
+ Option("--traceback", action="store_true",
+ help="show traceback instead"),
def run(self, task_id, *args, **kwargs):
result_cls = self.app.AsyncResult
task = kwargs.get("task")
+ traceback = kwargs.get("traceback", False)
if task:
result_cls = self.app.tasks[task].AsyncResult
result = result_cls(task_id)
- self.out(self.prettify(result.get())[1])
+ if traceback:
+ value = result.traceback
+ else:
+ value = result.get()
+ self.out(self.prettify(value)[1])
result = command(result)
@@ -332,27 +425,37 @@ class _RemoteControl(Command):
help="Comma separated list of destination node names."))
- def get_command_info(self, command, indent=0, prefix="", color=None):
+ def get_command_info(self, command, indent=0, prefix="", color=None,
+ help=False):
+ if help:
+ help = '|' + text.indent(self.choices[command][1], indent + 4)
+ else:
+ help = None
# see if it uses args.
meth = getattr(self, command)
- return "|" + text.indent("%s%s %s" % (
- prefix, color(command), meth.__doc__), indent)
+ return text.join([
+ '|' + text.indent("%s%s %s" % (prefix, color(command),
+ meth.__doc__), indent), help,
+ ])
except AttributeError:
- return "|" + text.indent(prefix + str(color(command)), indent)
+ return text.join([
+ "|" + text.indent(prefix + str(color(command)), indent), help,
+ ])
- def list_commands(self, indent=0, prefix="", color=None):
+ def list_commands(self, indent=0, prefix="", color=None, help=False):
color = color if color else lambda x: x
prefix = prefix + " " if prefix else ""
- return "\n".join(self.get_command_info(c, indent, prefix, color)
+ return "\n".join(self.get_command_info(c, indent, prefix, color, help)
for c in sorted(self.choices))
def epilog(self):
return "\n".join([
- self.list_commands()
+ self.list_commands(indent=4, help=True)
def usage(self, command):
@@ -375,7 +478,7 @@ class _RemoteControl(Command):
raise Error("Unknown %s method %s" % (self.name, method))
destination = kwargs.get("destination")
- timeout = kwargs.get("timeout") or self.choices[method]
+ timeout = kwargs.get("timeout") or self.choices[method][0]
if destination and isinstance(destination, basestring):
destination = map(str.strip, destination.split(","))
@@ -403,16 +506,29 @@ class _RemoteControl(Command):
class inspect(_RemoteControl):
+ """Inspect the worker at runtime.
+ Availability: RabbitMQ (amqp), Redis, and MongoDB transports.
+ Examples:
+ celery inspect active --timeout=5
+ celery inspect scheduled -d worker1.example.com
+ celery inspect revoked -d w1.e.com,w2.e.com
+ """
name = "inspect"
- choices = {"active": 1.0,
- "active_queues": 1.0,
- "scheduled": 1.0,
- "reserved": 1.0,
- "stats": 1.0,
- "revoked": 1.0,
- "registered": 1.0,
- "ping": 0.2,
- "report": 1.0}
+ choices = {
+ "active": (1.0, "dump active tasks (being processed)"),
+ "active_queues": (1.0, "dump queues being consumed from"),
+ "scheduled": (1.0, "dump scheduled tasks (eta/countdown/retry)"),
+ "reserved": (1.0, "dump reserved tasks (waiting to be processed)"),
+ "stats": (1.0, "dump worker statistics"),
+ "revoked": (1.0, "dump of revoked task ids"),
+ "registered": (1.0, "dump of registered tasks"),
+ "ping": (0.2, "ping worker(s)"),
+ "report": (1.0, "get bugreport info")
+ }
def call(self, method, *args, **options):
i = self.app.control.inspect(**options)
@@ -421,16 +537,36 @@ inspect = command(inspect)
class control(_RemoteControl):
+ """Workers remote control.
+ Availability: RabbitMQ (amqp), Redis, and MongoDB transports.
+ Examples:
+ celery control enable_events --timeout=5
+ celery control -d worker1.example.com enable_events
+ celery control -d w1.e.com,w2.e.com enable_events
+ celery control -d w1.e.com add_consumer queue_name
+ celery control -d w1.e.com cancel_consumer queue_name
+ celery control -d w1.e.com add_consumer queue exchange direct rkey
+ """
name = "control"
- choices = {"enable_events": 1.0,
- "disable_events": 1.0,
- "add_consumer": 1.0,
- "cancel_consumer": 1.0,
- "rate_limit": 1.0,
- "time_limit": 1.0,
- "autoscale": 1.0,
- "pool_grow": 1.0,
- "pool_shrink": 1.0}
+ choices = {
+ "enable_events": (1.0, "tell worker(s) to enable events"),
+ "disable_events": (1.0, "tell worker(s) to disable events"),
+ "add_consumer": (1.0, "tell worker(s) to start consuming a queue"),
+ "cancel_consumer": (1.0, "tell worker(s) to stop consuming a queue"),
+ "rate_limit": (1.0,
+ "tell worker(s) to modify the rate limit for a task type"),
+ "time_limit": (1.0,
+ "tell worker(s) to modify the time limit for a task type."),
+ "autoscale": (1.0, "change autoscale settings"),
+ "pool_grow": (1.0, "start more pool processes"),
+ "pool_shrink": (1.0, "use less pool processes"),
+ }
def call(self, method, *args, **options):
return getattr(self.app.control, method)(*args, reply=True, **options)
@@ -448,7 +584,7 @@ class control(_RemoteControl):
return self.call(method, max, min, **kwargs)
def rate_limit(self, method, task_name, rate_limit, **kwargs):
- """<task_name> <rate_limit (e.g. 5/s | 5/m | 5/h)>"""
+ """<task_name> <rate_limit> (e.g. 5/s | 5/m | 5/h)>"""
return self.call(method, task_name, rate_limit, **kwargs)
def time_limit(self, method, task_name, soft, hard=None, **kwargs):
@@ -468,6 +604,7 @@ control = command(control)
class status(Command):
+ """Show list of workers that are online."""
option_list = inspect.option_list
def run(self, *args, **kwargs):
@@ -486,7 +623,16 @@ status = command(status)
class migrate(Command):
+ """Migrate tasks from one broker to another.
+ Examples:
+ celery migrate redis://localhost amqp://guest@localhost//
+ celery migrate django:// redis://localhost
+ NOTE: This command is experimental, make sure you have
+ a backup of the tasks before you continue.
+ """
def usage(self, command):
return "%%prog %s <source_url> <dest_url>" % (command, )
@@ -507,21 +653,41 @@ migrate = command(migrate)
class shell(Command): # pragma: no cover
+ """Start shell session with convenient access to celery symbols.
+ The following symbols will be added to the main globals:
+ - celery: the current application.
+ - chord, group, chain, chunks, xmap, xstarmap
+ subtask, Task
+ - all registered tasks.
+ Example Session:
+ $ celery shell
+ >>> celery
+ <Celery default:0x1012d9fd0>
+ >>> add
+ <@task: tasks.add>
+ >>> add.delay(2, 2)
+ <AsyncResult: 537b48c7-d6d3-427a-a24a-d1b4414035be>
+ """
option_list = Command.option_list + (
Option("--ipython", "-I",
action="store_true", dest="force_ipython",
- help="Force IPython."),
+ help="force iPython."),
Option("--bpython", "-B",
action="store_true", dest="force_bpython",
- help="Force bpython."),
+ help="force bpython."),
Option("--python", "-P",
action="store_true", dest="force_python",
- help="Force default Python shell."),
+ help="force default Python shell."),
Option("--without-tasks", "-T", action="store_true",
- help="Don't add tasks to locals."),
+ help="don't add tasks to locals."),
Option("--eventlet", action="store_true",
- help="Use eventlet."),
- Option("--gevent", action="store_true", help="Use gevent."),
+ help="use eventlet."),
+ Option("--gevent", action="store_true", help="use gevent."),
def run(self, force_ipython=False, force_bpython=False,
@@ -599,6 +765,7 @@ shell = command(shell)
class help(Command):
+ """Show help screen and exit."""
def usage(self, command):
return "%%prog <command> [options] %s" % (self.args, )
@@ -613,6 +780,7 @@ help = command(help)
class report(Command):
+ """Shows information useful to include in bugreports."""
def run(self, *args, **kwargs):
@@ -671,8 +839,12 @@ class CeleryCommand(BaseCommand):
colored = term.colored().names[color] if color else lambda x: x
obj = self.commands[command]
if obj.leaf:
- return "|" + text.indent("celery %s" % colored(command), indent)
- return obj.list_commands(indent, "celery %s" % command, colored)
+ return '|' + text.indent("celery %s" % colored(command), indent)
+ return text.join([
+ " ",
+ '|' + text.indent("celery %s --help" % colored(command), indent),
+ obj.list_commands(indent, "celery %s" % command, colored),
+ ])
def list_commands(self, indent=0):