|
@@ -362,7 +362,7 @@ class Consumer(object):
|
|
|
next_step.format(when=humanize_seconds(interval, 'in', ' ')))
|
|
|
|
|
|
# remember that the connection is lazy, it won't establish
|
|
|
- # until it's needed.
|
|
|
+ # until needed.
|
|
|
if not self.app.conf.BROKER_CONNECTION_RETRY:
|
|
|
# retry disabled, just call connect directly.
|
|
|
conn.connect()
|
|
@@ -496,7 +496,7 @@ class Events(bootsteps.StartStopStep):
|
|
|
# Flush events sent while connection was down.
|
|
|
prev = c.event_dispatcher
|
|
|
dis = c.event_dispatcher = c.app.events.Dispatcher(
|
|
|
- c.connection, hostname=c.hostname,
|
|
|
+ c.connect(), hostname=c.hostname,
|
|
|
enabled=self.send_events, groups=self.groups,
|
|
|
)
|
|
|
if prev:
|
|
@@ -505,6 +505,7 @@ class Events(bootsteps.StartStopStep):
|
|
|
|
|
|
def stop(self, c):
|
|
|
if c.event_dispatcher:
|
|
|
+ ignore_errors(c, c.event_dispatcher.connection.close)
|
|
|
ignore_errors(c, c.event_dispatcher.close)
|
|
|
c.event_dispatcher = None
|
|
|
shutdown = stop
|