Browse Source

Fix for QoS when using RabbitMQ 3.3 or later. (Issue celery/kombu#339)

Ask Solem 11 years ago
parent
commit
ae04f684e9
1 changed files with 20 additions and 2 deletions
  1. 20 2
      celery/worker/consumer.py

+ 20 - 2
celery/worker/consumer.py

@@ -592,11 +592,29 @@ class Tasks(bootsteps.StartStopStep):
 
     def start(self, c):
         c.update_strategies()
+
+        # - RabbitMQ 3.3 completely redefines how basic_qos works..
+        # This will detect if the new qos smenatics is in effect,
+        # and if so make sure the 'apply_global' flag is set on qos updates.
+        qos_global = not (
+            c.connection.transport.qos_semantics_matches_spec(
+                c.connection.connection))
+
+        # set initial prefetch count
+        c.connection.default_channel.basic_qos(
+            0, c.initial_prefetch_count, qos_global,
+        )
+
         c.task_consumer = c.app.amqp.TaskConsumer(
             c.connection, on_decode_error=c.on_decode_error,
         )
-        c.qos = QoS(c.task_consumer.qos, c.initial_prefetch_count)
-        c.qos.update()  # set initial prefetch count
+
+        def set_prefetch_count(prefetch_count):
+            return c.task_consumer.qos(
+                prefetch_count=prefetch_count,
+                apply_global=qos_global,
+            )
+        c.qos = QoS(set_prefetch_count, c.initial_prefetch_count)
 
     def stop(self, c):
         if c.task_consumer: