|
@@ -50,8 +50,12 @@ class AMQPListener(object):
|
|
def start(self):
|
|
def start(self):
|
|
"""Start processing AMQP messages."""
|
|
"""Start processing AMQP messages."""
|
|
task_consumer = self.reset_connection()
|
|
task_consumer = self.reset_connection()
|
|
|
|
+
|
|
|
|
+ self.logger.debug("AMQPListener: Starting message consumer...")
|
|
it = task_consumer.iterconsume(limit=None)
|
|
it = task_consumer.iterconsume(limit=None)
|
|
|
|
|
|
|
|
+ self.logger.debug("AMQPListener: Ready to accept tasks!")
|
|
|
|
+
|
|
while True:
|
|
while True:
|
|
it.next()
|
|
it.next()
|
|
|
|
|
|
@@ -90,6 +94,8 @@ class AMQPListener(object):
|
|
self.task_consumer.close()
|
|
self.task_consumer.close()
|
|
self.task_consumer = None
|
|
self.task_consumer = None
|
|
if self.amqp_connection:
|
|
if self.amqp_connection:
|
|
|
|
+ self.logger.debug(
|
|
|
|
+ "AMQPListener: Closing connection to the broker...")
|
|
self.amqp_connection.close()
|
|
self.amqp_connection.close()
|
|
self.amqp_connection = None
|
|
self.amqp_connection = None
|
|
|
|
|
|
@@ -100,6 +106,8 @@ class AMQPListener(object):
|
|
Resets the task consumer in :attr:`task_consumer`.
|
|
Resets the task consumer in :attr:`task_consumer`.
|
|
|
|
|
|
"""
|
|
"""
|
|
|
|
+ self.logger.debug(
|
|
|
|
+ "AMQPListener: Re-establishing connection to the broker...")
|
|
self.close_connection()
|
|
self.close_connection()
|
|
self.amqp_connection = DjangoAMQPConnection()
|
|
self.amqp_connection = DjangoAMQPConnection()
|
|
self.task_consumer = TaskConsumer(connection=self.amqp_connection)
|
|
self.task_consumer = TaskConsumer(connection=self.amqp_connection)
|
|
@@ -184,6 +192,8 @@ class WorkController(object):
|
|
self.bucket_queue = Queue()
|
|
self.bucket_queue = Queue()
|
|
self.hold_queue = Queue()
|
|
self.hold_queue = Queue()
|
|
|
|
|
|
|
|
+ self.logger.debug("Instantiating thread components...")
|
|
|
|
+
|
|
# Threads+Pool
|
|
# Threads+Pool
|
|
self.periodic_work_controller = PeriodicWorkController(
|
|
self.periodic_work_controller = PeriodicWorkController(
|
|
self.bucket_queue,
|
|
self.bucket_queue,
|
|
@@ -206,7 +216,10 @@ class WorkController(object):
|
|
self._state = "RUN"
|
|
self._state = "RUN"
|
|
|
|
|
|
try:
|
|
try:
|
|
- [component.start() for component in self.components]
|
|
|
|
|
|
+ for component in self.components:
|
|
|
|
+ self.logger.debug("Starting thread %s..." % \
|
|
|
|
+ component.__class__.__name__)
|
|
|
|
+ component.start()
|
|
finally:
|
|
finally:
|
|
self.stop()
|
|
self.stop()
|
|
|
|
|