Browse Source

Optimization: Only declare a task exchange once for every process.

Ask Solem 15 years ago
parent
commit
635f711179
1 changed files with 5 additions and 0 deletions
  1. 5 0
      celery/messaging.py

+ 5 - 0
celery/messaging.py

@@ -24,6 +24,7 @@ extract_msg_options = lambda d: dict(zip(MSG_OPTIONS, get_msg_options(d)))
 default_queue = conf.routing_table[conf.DEFAULT_QUEUE]
 default_queue = conf.routing_table[conf.DEFAULT_QUEUE]
 
 
 _queues_declared = False
 _queues_declared = False
+_exchanges_declared = {}
 
 
 
 
 class TaskPublisher(Publisher):
 class TaskPublisher(Publisher):
@@ -32,6 +33,7 @@ class TaskPublisher(Publisher):
     exchange_type = default_queue["exchange_type"]
     exchange_type = default_queue["exchange_type"]
     routing_key = conf.DEFAULT_ROUTING_KEY
     routing_key = conf.DEFAULT_ROUTING_KEY
     serializer = conf.TASK_SERIALIZER
     serializer = conf.TASK_SERIALIZER
+    auto_declare = False
 
 
     def __init__(self, *args, **kwargs):
     def __init__(self, *args, **kwargs):
         super(TaskPublisher, self).__init__(*args, **kwargs)
         super(TaskPublisher, self).__init__(*args, **kwargs)
@@ -42,6 +44,9 @@ class TaskPublisher(Publisher):
             consumers = get_consumer_set(self.connection)
             consumers = get_consumer_set(self.connection)
             consumers.close()
             consumers.close()
             _queues_declared = True
             _queues_declared = True
+        if self.exchange not in _exchanges_declared:
+            self.declare()
+            _exchanges_declared[self.exchange] = True
 
 
     def delay_task(self, task_name, task_args=None, task_kwargs=None,
     def delay_task(self, task_name, task_args=None, task_kwargs=None,
             countdown=None, eta=None, task_id=None, taskset_id=None, **kwargs):
             countdown=None, eta=None, task_id=None, taskset_id=None, **kwargs):