Browse Source

The celer purge command now warns and promopts for confirmation (can be disabled using new -f option)

Ask Solem 11 years ago
parent
commit
6b94ceb4f1
3 changed files with 56 additions and 5 deletions
  1. 34 0
      celery/bin/base.py
  2. 22 4
      celery/bin/celery.py
  3. 0 1
      celery/exceptions.py

+ 34 - 0
celery/bin/base.py

@@ -89,6 +89,11 @@ from celery.utils import text
 from celery.utils import NODENAME_DEFAULT, nodesplit
 from celery.utils.imports import symbol_by_name, import_from_cwd
 
+try:
+    input = raw_input
+except NameError:
+    pass
+
 # always enable DeprecationWarnings, so our users can see them.
 for warning in (CDeprecationWarning, CPendingDeprecationWarning):
     warnings.simplefilter('once', warning, 0)
@@ -325,6 +330,34 @@ class Command(object):
             return os.path.expanduser(value)
         return value
 
+    def ask(self, q, choices, default=None):
+        """Prompt user to choose from a tuple of string values.
+
+        :param q: the question to ask (do not include questionark)
+        :param choice: tuple of possible choices, must be lowercase.
+        :param default: Default value if any.
+
+        If a default is not specified the question will be repeated
+        until the user gives a valid choice.
+
+        Matching is done case insensitively.
+
+        """
+        schoices = choices
+        if default is not None:
+            schoices = [c.upper() if c == default else c.lower()
+                        for c in choices]
+        schoices = '/'.join(schoices)
+
+        p = '{0} ({1})? '.format(q.capitalize(), schoices)
+        while 1:
+            val = input(p).lower()
+            if val in choices:
+                return val
+            elif default is not None:
+                break
+        return default
+
     def handle_argv(self, prog_name, argv, command=None):
         """Parse command-line arguments from ``argv`` and dispatch
         to :meth:`run`.
@@ -615,6 +648,7 @@ class Command(object):
         if self._colored is not None:
             self._colored.enabled = not self._no_color
 
+
 def daemon_options(default_pidfile=None, default_logfile=None):
     return (
         Option('-f', '--logfile', default=default_logfile),

+ 22 - 4
celery/bin/celery.py

@@ -196,17 +196,35 @@ class purge(Command):
     WARNING: There is no undo operation for this command.
 
     """
+    warn_prelude = (
+        '{warning}: This will remove all tasks from {queues}: {names}.\n'
+        '         There is no undo for this operation!\n\n'
+        '(to skip this prompt use the -f option)\n'
+    )
+    warn_prompt = 'Are you sure you want to delete all tasks'
     fmt_purged = 'Purged {mnum} {messages} from {qnum} known task {queues}.'
     fmt_empty = 'No messages purged from {qnum} {queues}'
+    option_list = Command.option_list + (
+        Option('--force', '-f', action='store_true',
+               help='Do not prompt for verification'),
+    )
 
-    def run(self, *args, **kwargs):
-        queues = len(self.app.amqp.queues)
+    def run(self, force=False, **kwargs):
+        names = list(sorted(self.app.amqp.queues.keys()))
+        qnum = len(names)
+        if not force:
+            self.out(self.warn_prelude.format(
+                warning=self.colored.red('WARNING'),
+                queues=text.pluralize(qnum, 'queue'), names=', '.join(names),
+            ))
+            if self.ask(self.warn_prompt, ('yes', 'no'), 'no') != 'yes':
+                return
         messages = self.app.control.purge()
         fmt = self.fmt_purged if messages else self.fmt_empty
         self.out(fmt.format(
-            mnum=messages, qnum=queues,
+            mnum=messages, qnum=qnum,
             messages=text.pluralize(messages, 'message'),
-            queues=text.pluralize(queues, 'queue')))
+            queues=text.pluralize(qnum, 'queue')))
 
 
 class result(Command):

+ 0 - 1
celery/exceptions.py

@@ -62,7 +62,6 @@ class WorkerShutdown(SystemExit):
     """Signals that the worker should perform a warm shutdown."""
 
 
-
 class QueueNotFound(KeyError):
     """Task routed to a queue not in CELERY_QUEUES."""