|
@@ -45,7 +45,11 @@ class Pidbox(object):
|
|
|
self.node.channel = c.connection.channel()
|
|
|
self.consumer = self.node.listen(callback=self.on_message)
|
|
|
|
|
|
+ def on_stop(self):
|
|
|
+ pass
|
|
|
+
|
|
|
def stop(self, c):
|
|
|
+ self.on_stop()
|
|
|
self.consumer = self._close_channel(c)
|
|
|
|
|
|
def reset(self):
|
|
@@ -58,6 +62,7 @@ class Pidbox(object):
|
|
|
ignore_errors(c, self.node.channel.close)
|
|
|
|
|
|
def shutdown(self, c):
|
|
|
+ self.on_stop()
|
|
|
if self.consumer:
|
|
|
debug('Cancelling broadcast consumer...')
|
|
|
ignore_errors(c, self.consumer.cancel)
|
|
@@ -72,13 +77,12 @@ class gPidbox(Pidbox):
|
|
|
def start(self, c):
|
|
|
c.pool.spawn_n(self.loop, c)
|
|
|
|
|
|
- def stop(self, c):
|
|
|
+ def on_stop(self):
|
|
|
if self._node_stopped:
|
|
|
self._node_shutdown.set()
|
|
|
debug('Waiting for broadcast thread to shutdown...')
|
|
|
self._node_stopped.wait()
|
|
|
self._node_stopped = self._node_shutdown = None
|
|
|
- super(gPidbox, self).stop(c)
|
|
|
|
|
|
def reset(self):
|
|
|
self._resets += 1
|