Browse Source

Connection leak problem fixed.

Ask Solem 16 years ago
parent
commit
b935d461b7
1 changed files with 24 additions and 7 deletions
  1. 24 7
      celery/worker.py

+ 24 - 7
celery/worker.py

@@ -108,19 +108,39 @@ class TaskDaemon(object):
                                     self.queue_wakeup_after
         self.logger = setup_logger(loglevel, logfile)
         self.pool = multiprocessing.Pool(self.concurrency)
+        self.task_consumer = None
         self.reset_connection()
 
     def reset_connection(self):
-        if hasattr(self, "task_consumer"):
-            self.task_consumer.close()
-        self.task_consumer = TaskConsumer(connection=DjangoAMQPConnection())
+        if self.task_consumer:
+            self.task_consumer.connection.close()
+        amqp_connection = DjangoAMQPConnection()
+        self.task_consumer = TaskConsumer(connection=amqp_connection)
 
     def connection_diagnostics(self):
         if not self.task_consumer.channel.connection:
+            self.logger.info(
+                    "AMQP Connection has died, restoring connection.")
             self.reset_connection()
 
-    def fetch_next_task(self):
+    def receive_message(self):
+        self.connection_diagnostics()
         message = self.task_consumer.fetch()
+        if message is not None:
+            message.ack()
+        return message
+
+    def receive_message_cc(self):
+        amqp_connection = DjangoAMQPConnection()
+        task_consumer = TaskConsumer(connection=amqp_connection)
+        message = task_consumer.fetch()
+        if message is not None:
+            message.ack()
+        amqp_connection.close()
+        return message
+
+    def fetch_next_task(self):
+        message = self.receive_message()
         if message is None: # No messages waiting.
             raise EmptyQueue()
 
@@ -141,7 +161,6 @@ class TaskDaemon(object):
                 error.__class__, error, traceback.format_exc()))
             return 
 
-        message.ack()
         return result, task.task_name, task.task_id
 
     def run_periodic_tasks(self):
@@ -167,8 +186,6 @@ class TaskDaemon(object):
         events = [
             EventTimer(self.run_periodic_tasks, 1),
             EventTimer(self.schedule_retry_tasks, 2),
-            EventTimer(self.connection_diagnostics, 3),
-            EventTimer(self.reset_connection, 60 * 5),
         ]
 
         while True: