Преглед на файлове

Keep all queues for routing purposes when selecting queue subset for consumng. Closes #319. Thanks to marcinn

Ask Solem преди 14 години
родител
ревизия
9b33d52043
променени са 4 файла, в които са добавени 23 реда и са изтрити 20 реда
  1. 17 8
      celery/app/amqp.py
  2. 2 3
      celery/apps/worker.py
  3. 3 6
      celery/worker/__init__.py
  4. 1 3
      celery/worker/consumer.py

+ 17 - 8
celery/app/amqp.py

@@ -55,6 +55,9 @@ class Queues(UserDict):
     :param queues: Initial mapping.
 
     """
+    #: If set, this is a subset of queues to consume from.
+    #: The rest of the queues are then used for routing only.
+    _consume_from = None
 
     def __init__(self, queues):
         self.data = {}
@@ -87,9 +90,12 @@ class Queues(UserDict):
 
     def format(self, indent=0, indent_first=True):
         """Format routing table into string for log dumps."""
+        queues = self
+        if self._consume_from is not None:
+            queues = self._consume_from
         info = [QUEUE_FORMAT.strip() % dict(
                     name=(name + ":").ljust(12), **config)
-                            for name, config in self.items()]
+                        for name, config in sorted(queues.items())]
         if indent_first:
             return textindent("\n".join(info), indent)
         return info[0] + "\n" + textindent("\n".join(info[1:]), indent)
@@ -116,8 +122,8 @@ class Queues(UserDict):
                     raise
                 options = self.options(queue, queue)
             acc[queue] = options
-        self.data.clear()
-        self.data.update(acc)
+        self._consume_from = acc
+        self.update(acc)
 
     @classmethod
     def with_defaults(cls, queues, default_exchange, default_exchange_type):
@@ -130,6 +136,12 @@ class Queues(UserDict):
             opts.setdefault("routing_key", opts.get("binding_key"))
         return cls(queues)
 
+    @property
+    def consume_from(self):
+        if self._consume_from is not None:
+            return self._consume_from
+        return self
+
 
 class TaskPublisher(messaging.Publisher):
     auto_declare = True
@@ -321,7 +333,8 @@ class AMQP(object):
     def get_task_consumer(self, connection, queues=None, **kwargs):
         """Return consumer configured to consume from all known task
         queues."""
-        return self.ConsumerSet(connection, from_dict=queues or self.queues,
+        return self.ConsumerSet(connection,
+                                from_dict=queues or self.queues.consume_from,
                                 **kwargs)
 
     def get_default_queue(self):
@@ -335,10 +348,6 @@ class AMQP(object):
         """Queue name⇒ declaration mapping."""
         return self.Queues(self.app.conf.CELERY_QUEUES)
 
-    @queues.setter
-    def queues(self, value):
-        return self.Queues(value)
-
     @property
     def routes(self):
         if self._rtable is None:

+ 2 - 3
celery/apps/worker.py

@@ -151,7 +151,6 @@ class Worker(object):
                     "automatically declare unknown queues you have to "
                     "enable CELERY_CREATE_MISSING_QUEUES" % (
                         self.use_queues, exc))
-        self.queues = self.app.amqp.queues
 
     def init_loader(self):
         self.loader = self.app.loader
@@ -194,6 +193,7 @@ class Worker(object):
         return ""
 
     def startup_info(self):
+        app = self.app
         concurrency = self.concurrency
         if self.autoscale:
             cmax, cmin = self.autoscale
@@ -208,7 +208,7 @@ class Worker(object):
             "celerybeat": self.run_clockservice and "ON" or "OFF",
             "events": self.events and "ON" or "OFF",
             "loader": get_full_cls_name(self.loader.__class__),
-            "queues": self.queues.format(indent=18, indent_first=False),
+            "queues": app.amqp.queues.format(indent=18, indent_first=False),
         }
 
     def run_worker(self):
@@ -226,7 +226,6 @@ class Worker(object):
                                 scheduler_cls=self.scheduler_cls,
                                 send_events=self.events,
                                 db=self.db,
-                                queues=self.queues,
                                 max_tasks_per_child=self.max_tasks_per_child,
                                 task_time_limit=self.task_time_limit,
                                 task_soft_time_limit=self.task_soft_time_limit,

+ 3 - 6
celery/worker/__init__.py

@@ -104,9 +104,9 @@ class WorkController(object):
             schedule_filename=None, task_time_limit=None,
             task_soft_time_limit=None, max_tasks_per_child=None,
             pool_putlocks=None, db=None, prefetch_multiplier=None,
-            eta_scheduler_precision=None, queues=None,
-            disable_rate_limits=None, autoscale=None,
-            autoscaler_cls=None, scheduler_cls=None, app=None):
+            eta_scheduler_precision=None, disable_rate_limits=None,
+            autoscale=None, autoscaler_cls=None, scheduler_cls=None,
+            app=None):
 
         self.app = app_or_default(app)
         conf = self.app.conf
@@ -151,8 +151,6 @@ class WorkController(object):
         self.db = db or conf.CELERYD_STATE_DB
         self.disable_rate_limits = disable_rate_limits or \
                                 conf.CELERY_DISABLE_RATE_LIMITS
-        self.queues = queues
-
         self._finalize = Finalize(self, self.stop, exitpriority=1)
         self._finalize_db = None
 
@@ -225,7 +223,6 @@ class WorkController(object):
                                     init_callback=self.ready_callback,
                                     initial_prefetch_count=prefetch_count,
                                     pool=self.pool,
-                                    queues=self.queues,
                                     app=self.app)
 
         # The order is important here;

+ 1 - 3
celery/worker/consumer.py

@@ -220,7 +220,7 @@ class Consumer(object):
 
     def __init__(self, ready_queue, eta_schedule, logger,
             init_callback=noop, send_events=False, hostname=None,
-            initial_prefetch_count=2, pool=None, queues=None, app=None):
+            initial_prefetch_count=2, pool=None, app=None):
         self.app = app_or_default(app)
         self.connection = None
         self.task_consumer = None
@@ -246,7 +246,6 @@ class Consumer(object):
         conninfo = self.app.broker_connection()
         self.connection_errors = conninfo.connection_errors
         self.channel_errors = conninfo.channel_errors
-        self.queues = queues
 
     def start(self):
         """Start the consumer.
@@ -456,7 +455,6 @@ class Consumer(object):
         self.connection = self._open_connection()
         self.logger.debug("Consumer: Connection Established.")
         self.task_consumer = self.app.amqp.get_task_consumer(self.connection,
-                                    queues=self.queues,
                                     on_decode_error=self.on_decode_error)
         # QoS: Reset prefetch window.
         self.qos = QoS(self.task_consumer,