Selaa lähdekoodia

[Commands] Implements -Q and -X options for celery purge (Issue #2321)

Ask Solem 9 vuotta sitten
vanhempi
commit
6f89517a01

+ 29 - 11
celery/bin/celery.py

@@ -270,11 +270,12 @@ from importlib import import_module
 from kombu.utils import json
 
 from celery.app import defaults
-from celery.five import string_t, values
+from celery.five import keys, string_t, values
 from celery.platforms import EX_OK, EX_FAILURE, EX_UNAVAILABLE, EX_USAGE
 from celery.utils import term
 from celery.utils import text
 from celery.utils.functional import pass1
+from celery.utils.text import str_to_list
 from celery.utils.timeutils import maybe_iso8601
 
 # Cannot use relative imports here due to a Windows issue (#1111).
@@ -462,25 +463,42 @@ class purge(Command):
     option_list = Command.option_list + (
         Option('--force', '-f', action='store_true',
                help='Do not prompt for verification'),
+        Option('--queues', '-Q', default=[],
+               help='Comma separated list of queue names to purge.'),
+        Option('--exclude-queues', '-X', default=[],
+               help='Comma separated list of queues names not to purge.')
     )
 
-    def run(self, force=False, **kwargs):
-        names = list(sorted(self.app.amqp.queues.keys()))
+    def run(self, force=False, queues=None, exclude_queues=None, **kwargs):
+        queues = set(str_to_list(queues or []))
+        exclude = set(str_to_list(exclude_queues or []))
+        names = (queues or set(keys(self.app.amqp.queues))) - exclude
         qnum = len(names)
-        if not force:
-            self.out(self.warn_prelude.format(
-                warning=self.colored.red('WARNING'),
-                queues=text.pluralize(qnum, 'queue'), names=', '.join(names),
-            ))
-            if self.ask(self.warn_prompt, ('yes', 'no'), 'no') != 'yes':
-                return
-        messages = self.app.control.purge()
+
+        messages = None
+        if names:
+            if not force:
+                self.out(self.warn_prelude.format(
+                    warning=self.colored.red('WARNING'),
+                    queues=text.pluralize(qnum, 'queue'),
+                    names=', '.join(sorted(names)),
+                ))
+                if self.ask(self.warn_prompt, ('yes', 'no'), 'no') != 'yes':
+                    return
+            with self.app.connection_for_write() as conn:
+                messages = sum(self._purge(conn, queue) for queue in names)
         fmt = self.fmt_purged if messages else self.fmt_empty
         self.out(fmt.format(
             mnum=messages, qnum=qnum,
             messages=text.pluralize(messages, 'message'),
             queues=text.pluralize(qnum, 'queue')))
 
+    def _purge(self, conn, queue):
+        try:
+            return conn.default_channel.queue_purge(queue) or 0
+        except conn.channel_errors:
+            return 0
+
 
 class result(Command):
     """Gives the return value for a given task id.

+ 4 - 4
celery/tests/bin/test_celery.py

@@ -169,15 +169,15 @@ class test_call(AppCase):
 
 class test_purge(AppCase):
 
-    @patch('celery.app.control.Control.purge')
-    def test_run(self, purge_):
+    def test_run(self):
         out = WhateverIO()
         a = purge(app=self.app, stdout=out)
-        purge_.return_value = 0
+        a._purge = Mock(name='_purge')
+        a._purge.return_value = 0
         a.run(force=True)
         self.assertIn('No messages purged', out.getvalue())
 
-        purge_.return_value = 100
+        a._purge.return_value = 100
         a.run(force=True)
         self.assertIn('100 messages', out.getvalue())
 

+ 10 - 2
celery/utils/text.py

@@ -12,9 +12,17 @@ from textwrap import fill
 
 from pprint import pformat
 
+from celery.five import string_t
+
 __all__ = ['dedent_initial', 'dedent', 'fill_paragraphs', 'join',
            'ensure_2lines', 'abbr', 'abbrtask', 'indent', 'truncate',
-           'pluralize', 'pretty']
+           'pluralize', 'pretty', 'str_to_list']
+
+
+def str_to_list(s):
+    if isinstance(s, string_t):
+        return s.split(',')
+    return s
 
 
 def dedent_initial(s, n=4):
@@ -76,7 +84,7 @@ def truncate_bytes(s, maxlen=128, suffix=b'...'):
 
 
 def pluralize(n, text, suffix='s'):
-    if n > 1:
+    if n != 1:
         return text + suffix
     return text
 

+ 2 - 7
celery/worker/__init__.py

@@ -30,11 +30,12 @@ from celery import signals
 from celery.exceptions import (
     ImproperlyConfigured, WorkerTerminate, TaskRevokedError,
 )
-from celery.five import python_2_unicode_compatible, string_t, values
+from celery.five import python_2_unicode_compatible, values
 from celery.platforms import EX_FAILURE, create_pidlock
 from celery.utils import default_nodename, worker_direct
 from celery.utils.imports import reload_from_cwd
 from celery.utils.log import mlevel, worker_logger as logger
+from celery.utils.text import str_to_list
 from celery.utils.threads import default_socket_timeout
 
 from . import state
@@ -58,12 +59,6 @@ defined in the `task_queues` setting.
 """
 
 
-def str_to_list(s):
-    if isinstance(s, string_t):
-        return s.split(',')
-    return s
-
-
 @python_2_unicode_compatible
 class WorkController(object):
     """Unmanaged worker instance."""

+ 15 - 0
docs/userguide/monitoring.rst

@@ -73,6 +73,9 @@ Commands
 
 * **purge**: Purge messages from all configured task queues.
 
+    This command will remove all messages from queues configured in
+    the :setting:`CELERY_QUEUES` setting:
+
     .. warning::
         There is no undo for this operation, and messages will
         be permanently deleted!
@@ -82,6 +85,18 @@ Commands
         $ celery -A proj purge
 
 
+    You can also specify the queues to purge using the `-Q` option:
+
+    .. code-block:: console
+
+        $ celery -A proj purge -Q celery,foo,bar
+
+    and exclude queues from being purged using the `-X` option:
+
+    .. code-block:: console
+
+        $ celery -A proj purge -X celery
+
 * **inspect active**: List active tasks
 
     .. code-block:: console