Explorar o código

BroadcastConsumer: Needed unique queue name to be broadcast, so queue name generated using the current hostname.

Ask Solem %!s(int64=15) %!d(string=hai) anos
pai
achega
1da4812289
Modificáronse 2 ficheiros con 8 adicións e 1 borrados
  1. 6 0
      celery/messaging.py
  2. 2 1
      celery/worker/listener.py

+ 6 - 0
celery/messaging.py

@@ -3,6 +3,7 @@
 Sending and Receiving Messages
 Sending and Receiving Messages
 
 
 """
 """
+import socket
 
 
 from carrot.connection import DjangoBrokerConnection, AMQPConnectionException
 from carrot.connection import DjangoBrokerConnection, AMQPConnectionException
 from carrot.messaging import Publisher, Consumer, ConsumerSet
 from carrot.messaging import Publisher, Consumer, ConsumerSet
@@ -98,6 +99,11 @@ class BroadcastConsumer(Consumer):
     exchange_type = conf.BROADCAST_EXCHANGE_TYPE
     exchange_type = conf.BROADCAST_EXCHANGE_TYPE
     no_ack = True
     no_ack = True
 
 
+    def __init__(self, *args, **kwargs):
+        hostname = kwargs.pop("hostname", None) or socket.gethostname()
+        self.queue = "%s_%s" % (self.queue, hostname)
+        super(BroadcastConsumer, self).__init__(*args, **kwargs)
+
 
 
 def establish_connection(connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
 def establish_connection(connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
     """Establish a connection to the message broker."""
     """Establish a connection to the message broker."""

+ 2 - 1
celery/worker/listener.py

@@ -178,7 +178,8 @@ class CarrotListener(object):
         self.connection = self._open_connection()
         self.connection = self._open_connection()
         self.logger.debug("CarrotListener: Connection Established.")
         self.logger.debug("CarrotListener: Connection Established.")
         self.task_consumer = get_consumer_set(connection=self.connection)
         self.task_consumer = get_consumer_set(connection=self.connection)
-        self.broadcast_consumer = BroadcastConsumer(self.connection)
+        self.broadcast_consumer = BroadcastConsumer(self.connection,
+                                                    hostname=self.hostname)
         self.task_consumer.register_callback(self.receive_message)
         self.task_consumer.register_callback(self.receive_message)
         self.event_dispatcher = EventDispatcher(self.connection,
         self.event_dispatcher = EventDispatcher(self.connection,
                                                 enabled=self.send_events)
                                                 enabled=self.send_events)