|
@@ -95,8 +95,15 @@ class TaskDaemon(object):
|
|
|
self.queue_wakeup_after
|
|
|
self.logger = setup_logger(loglevel, logfile)
|
|
|
self.pool = multiprocessing.Pool(self.concurrency)
|
|
|
+ self.reset_connection()
|
|
|
+
|
|
|
+ def reset_connection(self):
|
|
|
self.task_consumer = TaskConsumer(connection=DjangoAMQPConnection())
|
|
|
|
|
|
+ def connection_diagnostics(self):
|
|
|
+ if not self.task_consumer.channel.connection:
|
|
|
+ self.reset_connection()
|
|
|
+
|
|
|
def fetch_next_task(self):
|
|
|
message = self.task_consumer.fetch()
|
|
|
if message is None: # No messages waiting.
|
|
@@ -136,13 +143,16 @@ class TaskDaemon(object):
|
|
|
"""The worker server's main loop."""
|
|
|
results = ProcessQueue(self.concurrency, logger=self.logger,
|
|
|
done_msg="Task %(name)s[%(id)s] processed: %(return_value)s")
|
|
|
- ev_msg_waiting = EventTimer(lambda: self.logger.info(
|
|
|
- "Waiting for queue..."),
|
|
|
- self.empty_msg_emit_every)
|
|
|
- ev_run_periodic_tasks = EventTimer(self.run_periodic_tasks, 1)
|
|
|
+ log_wait = lambda: self.logger.info("Waiting for queue...")
|
|
|
+ ev_msg_waiting = EventTimer(log_wait, self.empty_msg_emit_every)
|
|
|
+ events = [
|
|
|
+ EventTimer(self.run_periodic_tasks, 1),
|
|
|
+ EventTimer(self.connection_diagnostics, 3),
|
|
|
+ EventTimer(self.reset_connection, 60 * 5),
|
|
|
+ ]
|
|
|
|
|
|
while True:
|
|
|
- ev_run_periodic_tasks.tick()
|
|
|
+ [event.tick() for event in events]
|
|
|
try:
|
|
|
result, task_name, task_id = self.execute_next_task()
|
|
|
except ValueError:
|