|
@@ -4,6 +4,7 @@ from celery import log
|
|
|
from celery.registry import tasks
|
|
|
from celery.worker.revoke import revoked
|
|
|
|
|
|
+TASK_INFO_FIELDS = ("exchange", "routing_key", "rate_limit")
|
|
|
|
|
|
def expose(fun):
|
|
|
"""Expose method as a celery worker control command, allowed to be called
|
|
@@ -56,6 +57,25 @@ class Control(object):
|
|
|
self.logger.critical("Got shutdown from remote.")
|
|
|
raise SystemExit
|
|
|
|
|
|
+ @expose
|
|
|
+ def dump_tasks(self, **kwargs):
|
|
|
+ from celery import registry
|
|
|
+
|
|
|
+ def _extract_info(task):
|
|
|
+ fields = dict((field, str(getattr(task, field, None)))
|
|
|
+ for field in TASK_INFO_FIELDS
|
|
|
+ if getattr(task, field, None) is not None)
|
|
|
+ info = map("=".join, fields.items())
|
|
|
+ if not info:
|
|
|
+ return "\t%s" % task.name
|
|
|
+ return "\t%s [%s]" % (task.name, " ".join(info))
|
|
|
+
|
|
|
+ tasks = sorted(registry.tasks.keys())
|
|
|
+ tasks = [registry.tasks[task] for task in tasks]
|
|
|
+
|
|
|
+ self.logger.warn("* Dump of currently registered tasks:\n%s" % (
|
|
|
+ "\n".join(map(_extract_info, tasks))))
|
|
|
+
|
|
|
|
|
|
class ControlDispatch(object):
|
|
|
"""Execute worker control panel commands."""
|