Browse Source

Only use anon exchange for direct exchanges

Ask Solem 9 years ago
parent
commit
6946fb74b6
1 changed files with 10 additions and 2 deletions
  1. 10 2
      celery/app/amqp.py

+ 10 - 2
celery/app/amqp.py

@@ -476,7 +476,7 @@ class AMQP(object):
                               retry=None, retry_policy=None,
                               serializer=None, delivery_mode=None,
                               compression=None, declare=None,
-                              headers=None, **kwargs):
+                              headers=None, exchange_type=None, **kwargs):
             retry = default_retry if retry is None else retry
             headers2, properties, body, sent_event = message
             if headers:
@@ -492,13 +492,21 @@ class AMQP(object):
                     qname, queue = queue, queues[queue]
                 else:
                     qname = queue.name
+
             if delivery_mode is None:
                 try:
                     delivery_mode = queue.exchange.delivery_mode
                 except AttributeError:
                     pass
                 delivery_mode = delivery_mode or default_delivery_mode
-            if not exchange and not routing_key:
+
+            if exchange_type is None:
+                try:
+                    exchange_type = queue.exchange.type
+                except AttributeError:
+                    exchange_type = 'direct'
+
+            if not exchange and not routing_key and exchange_type == 'direct':
                 exchange, routing_key = '', qname
             else:
                 exchange = exchange or queue.exchange.name or default_exchange