Kaynağa Gözat

if using amqplib the listener now uses the new "wait-on-multiple-channel" feature in carrot, so it can receive broadcast messages as quickly as possible.

Ask Solem 15 yıl önce
ebeveyn
işleme
01c3f17e57
1 değiştirilmiş dosya ile 15 ekleme ve 2 silme
  1. 15 2
      celery/worker/listener.py

+ 15 - 2
celery/worker/listener.py

@@ -81,7 +81,7 @@ class CarrotListener(object):
         task_consumer = self.task_consumer
 
         self.logger.debug("CarrotListener: Starting message consumer...")
-        wait_for_message = task_consumer.iterconsume(limit=None).next
+        wait_for_message = self._detect_wait_method()(limit=None).next
         self.logger.debug("CarrotListener: Ready to accept tasks!")
 
         prev_pcount = None
@@ -172,7 +172,6 @@ class CarrotListener(object):
         self.logger.debug("CarrotListener: Connection Established.")
         self.task_consumer = get_consumer_set(connection=self.amqp_connection)
         self.broadcast_consumer = BroadcastConsumer(self.amqp_connection)
-        self.task_consumer.add_consumer(self.broadcast_consumer)
         self.task_consumer.register_callback(self.receive_message)
         self.event_dispatcher = EventDispatcher(self.amqp_connection,
                                                 enabled=self.send_events)
@@ -181,6 +180,20 @@ class CarrotListener(object):
 
         self._state = RUN
 
+    def _amqplib_iterconsume(self, **kwargs):
+        while 1:
+            yield self.amqp_connection.connection.wait_any()
+
+    def _detect_wait_method(self):
+        if hasattr(self.amqp_connection.connection, "wait_any"):
+            self.broadcast_consumer.register_callback(self.receive_message)
+            self.task_consumer.iterconsume()
+            self.broadcast_consumer.iterconsume()
+            return self._amqplib_iterconsume
+        else:
+            self.task_consumer.add_consumer(self.broadcast_consumer)
+            return self.task_consumer.iterconsume
+
     def _open_connection(self):
         """Retries connecting to the AMQP broker over time.