|
@@ -368,10 +368,13 @@ class Consumer(object):
|
|
|
conn.connect()
|
|
|
return conn
|
|
|
|
|
|
- return conn.ensure_connection(
|
|
|
+ conn = conn.ensure_connection(
|
|
|
_error_handler, self.app.conf.BROKER_CONNECTION_MAX_RETRIES,
|
|
|
callback=maybe_shutdown,
|
|
|
)
|
|
|
+ if self.hub:
|
|
|
+ conn.transport.register_with_event_loop(conn.connection, self.hub)
|
|
|
+ return conn
|
|
|
|
|
|
def add_task_queue(self, queue, exchange=None, exchange_type=None,
|
|
|
routing_key=None, **options):
|