|
@@ -402,18 +402,18 @@ class Consumer(object):
|
|
Retries establishing the connection if the
|
|
Retries establishing the connection if the
|
|
:setting:`broker_connection_retry` setting is enabled
|
|
:setting:`broker_connection_retry` setting is enabled
|
|
"""
|
|
"""
|
|
- return self.connection_for_read(heartbeat=self.amqheartbeat)
|
|
|
|
|
|
+ conn = self.connection_for_read(heartbeat=self.amqheartbeat)
|
|
|
|
+ if self.hub:
|
|
|
|
+ conn.transport.register_with_event_loop(conn.connection, self.hub)
|
|
|
|
+ return conn
|
|
|
|
|
|
def connection_for_read(self, heartbeat=None):
|
|
def connection_for_read(self, heartbeat=None):
|
|
return self.ensure_connected(
|
|
return self.ensure_connected(
|
|
self.app.connection_for_read(heartbeat=heartbeat))
|
|
self.app.connection_for_read(heartbeat=heartbeat))
|
|
|
|
|
|
def connection_for_write(self, heartbeat=None):
|
|
def connection_for_write(self, heartbeat=None):
|
|
- conn = self.ensure_connected(
|
|
|
|
|
|
+ return self.ensure_connected(
|
|
self.app.connection_for_write(heartbeat=heartbeat))
|
|
self.app.connection_for_write(heartbeat=heartbeat))
|
|
- if self.hub:
|
|
|
|
- conn.transport.register_with_event_loop(conn.connection, self.hub)
|
|
|
|
- return conn
|
|
|
|
|
|
|
|
def ensure_connected(self, conn):
|
|
def ensure_connected(self, conn):
|
|
# Callback called for each retry while the connection
|
|
# Callback called for each retry while the connection
|