|
@@ -143,21 +143,21 @@ class Blueprint(object):
|
|
|
|
|
|
def send_all(self, parent, method,
|
|
|
description=None, reverse=True, propagate=True, args=()):
|
|
|
- description = description or method.capitalize()
|
|
|
+ description = description or method.replace('_', ' ')
|
|
|
steps = reversed(parent.steps) if reverse else parent.steps
|
|
|
for step in steps:
|
|
|
if step:
|
|
|
- self._debug('%s %s...',
|
|
|
- description.capitalize(), step.alias)
|
|
|
fun = getattr(step, method, None)
|
|
|
- if fun:
|
|
|
+ if fun is not None:
|
|
|
+ self._debug('%s %s...',
|
|
|
+ description.capitalize(), step.alias)
|
|
|
try:
|
|
|
fun(parent, *args)
|
|
|
except Exception as exc:
|
|
|
if propagate:
|
|
|
raise
|
|
|
logger.error(
|
|
|
- 'Error while %s %s: %r',
|
|
|
+ 'Error on %s %s: %r',
|
|
|
description, step.alias, exc, exc_info=1,
|
|
|
)
|
|
|
|
|
@@ -405,11 +405,17 @@ class ConsumerStep(StartStopStep):
|
|
|
consumer.consume()
|
|
|
|
|
|
def stop(self, c):
|
|
|
+ self._close(c, True)
|
|
|
+
|
|
|
+ def shutdown(self, c):
|
|
|
+ self._close(c, False)
|
|
|
+
|
|
|
+ def _close(self, c, cancel_consumers=True):
|
|
|
channels = set()
|
|
|
for consumer in self.consumers or []:
|
|
|
- ignore_errors(c.connection, consumer.cancel)
|
|
|
+ if cancel_consumers:
|
|
|
+ ignore_errors(c.connection, consumer.cancel)
|
|
|
if consumer.channel:
|
|
|
channels.add(consumer.channel)
|
|
|
for channel in channels:
|
|
|
ignore_errors(c.connection, channel.close)
|
|
|
- shutdown = stop
|