|
@@ -349,6 +349,7 @@ class Consumer(object):
|
|
|
self.init_callback(self)
|
|
|
|
|
|
while self._state != CLOSE:
|
|
|
+ self.maybe_shutdown()
|
|
|
try:
|
|
|
self.reset_connection()
|
|
|
self.consume_messages()
|
|
@@ -687,7 +688,8 @@ class Consumer(object):
|
|
|
return conn
|
|
|
|
|
|
return conn.ensure_connection(_error_handler,
|
|
|
- self.app.conf.BROKER_CONNECTION_MAX_RETRIES)
|
|
|
+ self.app.conf.BROKER_CONNECTION_MAX_RETRIES,
|
|
|
+ callback=self.maybe_shutdown)
|
|
|
|
|
|
def stop(self):
|
|
|
"""Stop consuming.
|
|
@@ -705,6 +707,12 @@ class Consumer(object):
|
|
|
def close(self):
|
|
|
self._state = CLOSE
|
|
|
|
|
|
+ def maybe_shutdown(self):
|
|
|
+ if state.should_stop:
|
|
|
+ raise SystemExit()
|
|
|
+ elif state.should_terminate:
|
|
|
+ raise SystemTerminate()
|
|
|
+
|
|
|
@property
|
|
|
def info(self):
|
|
|
"""Returns information about this consumer instance
|
|
@@ -728,10 +736,7 @@ class BlockingConsumer(Consumer):
|
|
|
debug("Ready to accept tasks!")
|
|
|
|
|
|
while self._state != CLOSE and self.connection:
|
|
|
- if state.should_stop:
|
|
|
- raise SystemExit()
|
|
|
- elif state.should_terminate:
|
|
|
- raise SystemTerminate()
|
|
|
+ self.maybe_shutdown()
|
|
|
if self.qos.prev != self.qos.value: # pragma: no cover
|
|
|
self.qos.update()
|
|
|
try:
|