Browse Source

Always close AMQP connection.

Ask Solem 16 years ago
parent
commit
0e3fb9c4c0
1 changed files with 12 additions and 7 deletions
  1. 12 7
      celery/task.py

+ 12 - 7
celery/task.py

@@ -23,9 +23,10 @@ def delay_task(task_name, *args, **kwargs):
         raise tasks.NotRegistered(
                 "Task with name %s not registered in the task registry." % (
                     task_name))
-    publisher = TaskPublisher(connection=DjangoAMQPConnection())
+    amqp_connection = DjangoAMQPConnection()
+    publisher = TaskPublisher(connection=amqp_connection)
     task_id = publisher.delay_task(task_name, *args, **kwargs)
-    publisher.close()
+    amqp_connection.close()
     return AsyncResult(task_id)
 
 
@@ -38,9 +39,10 @@ def discard_all():
     Returns the number of tasks discarded.
 
     """
-    consumer = TaskConsumer(connection=DjangoAMQPConnection())
+    amqp_connection = DjangoAMQPConnection()
+    consumer = TaskConsumer(connection=amqp_connection)
     discarded_count = consumer.discard_all()
-    consumer.close()
+    amqp_connection.close()
     return discarded_count
 
 
@@ -129,7 +131,9 @@ class Task(object):
         return TaskConsumer(connection=DjangoAMQPConnection())
 
     def requeue(self, task_id, args, kwargs):
-        self.get_publisher().requeue_task(self.name, task_id, args, kwargs)
+        publisher = self.get_publisher()
+        publisher.requeue_task(self.name, task_id, args, kwargs)
+        publisher.connection.close()
 
     def retry(self, task_id, args, kwargs):
         retry_queue.put(self.name, task_id, args, kwargs)
@@ -189,7 +193,8 @@ class TaskSet(object):
             True
         """
         taskset_id = str(uuid.uuid4())
-        publisher = TaskPublisher(connection=DjangoAMQPConnection())
+        amqp_connection = DjangoAMQPConnection()
+        publisher = TaskPublisher(connection=amqp_connection)
         subtask_ids = []
         for arg, kwarg in self.arguments:
             subtask_id = publisher.delay_task_in_set(task_name=self.task_name,
@@ -197,7 +202,7 @@ class TaskSet(object):
                                                      task_args=arg,
                                                      task_kwargs=kwarg)
             subtask_ids.append(subtask_id) 
-        publisher.close()
+        amqp_connection.close()
         return taskset_id, subtask_ids
 
     def iterate(self):