Browse Source

Addded celeryctl purge command

Ask Solem 14 years ago
parent
commit
2683fc6b7c
1 changed files with 22 additions and 1 deletions
  1. 22 1
      celery/bin/celeryctl.py

+ 22 - 1
celery/bin/celeryctl.py

@@ -7,7 +7,7 @@ from textwrap import wrap
 from anyjson import deserialize
 
 from celery import __version__
-from celery.app import app_or_default
+from celery.app import app_or_default, current_app
 from celery.bin.base import Command as CeleryCommand
 from celery.utils import term
 
@@ -148,6 +148,27 @@ class apply(Command):
 apply = command(apply)
 
 
+def pluralize(n, text, suffix='s'):
+    if n > 1:
+        return text + suffix
+    return text
+
+
+class purge(Command):
+
+    def run(self, *args, **kwargs):
+        app = current_app()
+        queues = len(app.amqp.queues.keys())
+        messages_removed = app.control.discard_all()
+        if messages_removed:
+            self.out("Purged %s %s from %s known task %s." % (
+                messages_removed, pluralize(messages_removed, "message"),
+                queues, pluralize(queues, "queue")))
+        else:
+            self.out("No messages purged from %s known %s" % (
+                queues, pluralize(queues, "queue")))
+purge = command(purge)
+
 class result(Command):
     args = "<task_id>"
     option_list = Command.option_list + (