浏览代码

This is not the way to do it and it's incomplete. however have to switch branches now.

Ask Solem 16 年之前
父节点
当前提交
1dbc164191
共有 2 个文件被更改,包括 47 次插入13 次删除
  1. 2 3
      bin/celeryctl
  2. 45 10
      celery/bin/celeryctl.py

+ 2 - 3
bin/celeryctl

@@ -1,10 +1,9 @@
 #!/usr/bin/env python
 #!/usr/bin/env python
 import sys
 import sys
-from celery.bin.celeryctl import celery_control, parse_options
+from celery.bin.celeryctl import execute_from_command_line
 
 
 if __name__ == "__main__":
 if __name__ == "__main__":
-    options, values = parse_options(sys.argv[1:])
-    celery_control(*values, **dict(options))
+	execute_from_command_line(sys.argv)
 
 
     
     
 
 

+ 45 - 10
celery/bin/celeryctl.py

@@ -5,18 +5,53 @@ django_project_dir = os.environ.get("DJANGO_PROJECT_DIR")
 if django_project_dir:
 if django_project_dir:
     sys.path.append(django_project_dir)
     sys.path.append(django_project_dir)
 from django.conf import settings
 from django.conf import settings
-import optparse
+from optparse import OptionParser, make_option
+from django.core.management.base import BaseCommand
+from django.core.management import LaxOptionParser, ManagementUtility
 
 
-OPTION_LIST = ()
 
 
+class DiscardCommand(BaseCommand):
+    """Discard all messages waiting in the task queue."""
+    option_list = BaseCommand.option_list + (
+            make_option('--error-empty', action="store_true",
+            help="Exit with errorcode if the queue is empty."),
+    )
+    help = "Discard all messages waiting in the task queue."
 
 
-def parse_options(arguments):
-    """Parse the available options to ``celeryctl``."""
-    parser = optparse.OptionParser(option_list=OPTION_LIST)
-    options, values = parser.parse_args(arguments)
-    return options, values
+    def handle(self, *args, **kwargs):
+        from celery.task import discard_all
+        error_empty = kwargs.get("error_empty", False)
+        discarded = discard_all()
+        if not discarded:
+            sys.stderr.write("No tasks deleted: the queue is empty.\n")
+            if error_empty:
+                sys.exit(255)
+        else:
+            print("Deleted %d task(s) from the queue." % discarded)
 
 
 
 
-def celery_control(*args, **kwargs):
-    print("ARGS: %s" % args)
-    print("KWARGS: %s" % kwargs)
+COMMANDS = {
+    "discard": DiscardCommand(),
+}
+
+class CeleryUtility(ManagementUtility):
+
+    def __init__(self, *args, **kwargs):
+        super(CeleryUtility, self).__init__(*args, **kwargs)
+        self.is_manage_py = "manage" in self.prog_name
+        if self.is_manage_py:
+            self.prog_name = "%s celeryctl" % self.prog_name
+
+    def fetch_command(self, subcommand):
+        """Tries to fetch the given subcommand."""
+        if subcommand not in COMMANDS:
+            sys.stderr.write(
+                "Unknown command: %r\nType '%s help' for usage\n" % (
+                    subcommand, self.prog_name))
+            sys.exit(1)
+        return COMMANDS[subcommand]
+
+
+def execute_from_command_line(argv=None):
+    utility = CeleryUtility(argv)
+    utility.execute()