Browse Source

Worker: Adds -X option to deselect queues. Closes #1399

Ask Solem 11 years ago
parent
commit
cfeb181d97

+ 25 - 10
celery/app/amqp.py

@@ -16,6 +16,7 @@ from kombu.common import Broadcast
 from kombu.pools import ProducerPool
 from kombu.utils import cached_property, uuid
 from kombu.utils.encoding import safe_repr
+from kombu.utils.functional import maybe_list
 
 from celery import signals
 from celery.five import items, string_t
@@ -141,20 +142,34 @@ class Queues(dict):
             self._consume_from[q.name] = q
         return q
 
-    def select_subset(self, wanted):
+    def select(self, include):
         """Sets :attr:`consume_from` by selecting a subset of the
         currently defined queues.
 
-        :param wanted: List of wanted queue names.
+        :param include: Names of queues to consume from.
+                        Can be iterable or string.
         """
-        if wanted:
-            self._consume_from = dict((name, self[name]) for name in wanted)
-
-    def select_remove(self, queue):
-        if self._consume_from is None:
-            self.select_subset(k for k in self if k != queue)
-        else:
-            self._consume_from.pop(queue, None)
+        if include:
+            self._consume_from = dict((name, self[name])
+                                      for name in maybe_list(include))
+    select_subset = select  # XXX compat
+
+    def deselect(self, exclude):
+        """Deselect queues so that they will not be consumed from.
+
+        :param exclude: Names of queues to avoid consuming from.
+                        Can be iterable or string.
+
+        """
+        if exclude:
+            exclude = maybe_list(exclude)
+            if self._consume_from is None:
+                # using selection
+                return self.select(k for k in self if k not in exclude)
+            # using all queues
+            for queue in exclude:
+                self._consume_from.pop(queue, None)
+    select_remove = deselect  # XXX compat
 
     def new_missing(self, name):
         return Queue(name, self.autoexchange(name), name)

+ 1 - 1
celery/app/base.py

@@ -378,7 +378,7 @@ class Celery(object):
             )
 
     def select_queues(self, queues=None):
-        return self.amqp.queues.select_subset(queues)
+        return self.amqp.queues.select(queues)
 
     def either(self, default_key, *values):
         """Fallback to the value of a configuration key if none of the

+ 1 - 0
celery/bin/worker.py

@@ -214,6 +214,7 @@ class worker(Command):
             Option('--maxtasksperchild', dest='max_tasks_per_child',
                    default=conf.CELERYD_MAX_TASKS_PER_CHILD, type='int'),
             Option('--queues', '-Q', default=[]),
+            Option('--exclude-queues', '-X', default=[]),
             Option('--include', '-I', default=[]),
             Option('--autoscale'),
             Option('--autoreload', action='store_true'),

+ 4 - 4
celery/tests/app/test_amqp.py

@@ -172,14 +172,14 @@ class test_Queues(AppCase):
 
     def test_select_add(self):
         q = Queues()
-        q.select_subset(['foo', 'bar'])
+        q.select(['foo', 'bar'])
         q.select_add('baz')
         self.assertItemsEqual(keys(q._consume_from), ['foo', 'bar', 'baz'])
 
-    def test_select_remove(self):
+    def test_deselect(self):
         q = Queues()
-        q.select_subset(['foo', 'bar'])
-        q.select_remove('bar')
+        q.select(['foo', 'bar'])
+        q.deselect('bar')
         self.assertItemsEqual(keys(q._consume_from), ['foo'])
 
     def test_with_ha_policy_compat(self):

+ 26 - 11
celery/worker/__init__.py

@@ -41,7 +41,7 @@ from . import state
 
 __all__ = ['WorkController', 'default_nodename']
 
-UNKNOWN_QUEUE = """\
+SELECT_UNKNOWN_QUEUE = """\
 Trying to select queue subset of {0!r}, but queue {1} is not
 defined in the CELERY_QUEUES setting.
 
@@ -49,6 +49,17 @@ If you want to automatically declare unknown queues you can
 enable the CELERY_CREATE_MISSING_QUEUES setting.
 """
 
+DESELECT_UNKNOWN_QUEUE = """\
+Trying to deselect queue subset of {0!r}, but queue {1} is not
+defined in the CELERY_QUEUES setting.
+"""
+
+
+def str_to_list(s):
+    if isinstance(s, string_t):
+        return s.split(',')
+    return s
+
 
 def default_nodename(hostname):
     name, host = nodesplit(hostname or '')
@@ -94,9 +105,10 @@ class WorkController(object):
         self.setup_instance(**self.prepare_args(**kwargs))
 
     def setup_instance(self, queues=None, ready_callback=None, pidfile=None,
-                       include=None, use_eventloop=None, **kwargs):
+                       include=None, use_eventloop=None, exclude_queues=None,
+                       **kwargs):
         self.pidfile = pidfile
-        self.setup_queues(queues)
+        self.setup_queues(queues, exclude_queues)
         self.setup_includes(include)
 
         # Set default concurrency
@@ -156,15 +168,19 @@ class WorkController(object):
         if self.pidlock:
             self.pidlock.release()
 
-    def setup_queues(self, queues):
-        if isinstance(queues, string_t):
-            queues = queues.split(',')
-        self.queues = queues
+    def setup_queues(self, include, exclude):
+        include = str_to_list(include)
+        exclude = str_to_list(exclude)
+        try:
+            self.app.amqp.queues.select(include)
+        except KeyError as exc:
+            raise ImproperlyConfigured(
+                SELECT_UNKNOWN_QUEUE.format(include, exc))
         try:
-            self.app.select_queues(queues)
+            self.app.amqp.queues.deselect(exclude)
         except KeyError as exc:
             raise ImproperlyConfigured(
-                UNKNOWN_QUEUE.format(queues, exc))
+                DESELECT_UNKNOWN_QUEUE.format(exclude, exc))
         if self.app.conf.CELERY_WORKER_DIRECT:
             self.app.amqp.queues.select_add(worker_direct(self.hostname))
 
@@ -173,8 +189,7 @@ class WorkController(object):
         # ensure all task modules are imported in case an execv happens.
         inc = self.app.conf.CELERY_INCLUDE
         if includes:
-            if isinstance(includes, string_t):
-                includes = includes.split(',')
+            includes = str_to_list(includes)
             inc = self.app.conf.CELERY_INCLUDE = tuple(inc) + tuple(includes)
         self.include = includes
         task_modules = set(task.__class__.__module__

+ 1 - 1
celery/worker/consumer.py

@@ -358,7 +358,7 @@ class Consumer(object):
             info('Started consuming from %r', queue)
 
     def cancel_task_queue(self, queue):
-        self.app.amqp.queues.select_remove(queue)
+        self.app.amqp.queues.deselect(queue)
         self.task_consumer.cancel_by_queue(queue)
 
     def apply_eta_task(self, task):