Browse Source

Declare undeclared exchanges passed to TaskPublisher.delay_task. Requires carrot 0.10.6

Ask Solem 14 years ago
parent
commit
acfb985db0
3 changed files with 10 additions and 7 deletions
  1. 8 5
      celery/app/amqp.py
  2. 1 1
      contrib/requirements/default.txt
  3. 1 1
      setup.py

+ 8 - 5
celery/app/amqp.py

@@ -64,11 +64,14 @@ class TaskPublisher(messaging.Publisher):
             message_data["taskset"] = taskset_id
 
         # FIXME (carrot Publisher.send needs to accept exchange argument)
-        if exchange:
-            self.exchange = exchange
-        if exchange_type:
-            self.exchange_type = exchange_type
-        self.send(message_data, **extract_msg_options(kwargs))
+        if exchange and exchange not in _exchanges_declared:
+            exchange_type = exchange_type or self.exchange_type
+            self.backend.exchange_declare(exchange=exchange,
+                                          exchange_type=exchange_type,
+                                          durable=self.durable,
+                                          auto_delete=self.auto_delete)
+        self.send(message_data, exchange=exchange,
+                  **extract_msg_options(kwargs))
         signals.task_sent.send(sender=task_name, **message_data)
 
         return task_id

+ 1 - 1
contrib/requirements/default.txt

@@ -1,5 +1,5 @@
 python-dateutil
 sqlalchemy
 anyjson
-carrot>=0.10.5
+carrot>=0.10.6
 pyparsing

+ 1 - 1
setup.py

@@ -37,7 +37,7 @@ except ImportError:
 install_requires.extend([
     "python-dateutil",
     "anyjson",
-    "carrot>=0.10.5",
+    "carrot>=0.10.6",
     "pyparsing",
 ])