Przeglądaj źródła

Restore behavior so Broadcast queues work. (#3934)

* Restore behavior so Broadcast queues work.

* Add unit test.

* Add additional unit tests.
Patrick Cloke 7 lat temu
rodzic
commit
767e6b9943
2 zmienionych plików z 36 dodań i 1 usunięć
  1. 1 1
      celery/app/amqp.py
  2. 35 0
      t/unit/app/test_amqp.py

+ 1 - 1
celery/app/amqp.py

@@ -521,7 +521,7 @@ class AMQP(object):
                     exchange_type = 'direct'
 
             # convert to anon-exchange, when exchange not set and direct ex.
-            if not exchange or not routing_key and exchange_type == 'direct':
+            if (not exchange or not routing_key) and exchange_type == 'direct':
                     exchange, routing_key = '', qname
             elif exchange is None:
                 # not topic exchange, and exchange not undefined

+ 35 - 0
t/unit/app/test_amqp.py

@@ -264,6 +264,41 @@ class test_AMQP:
         assert kwargs['routing_key'] == 'foo'
         assert kwargs['exchange'] == ''
 
+    def test_send_task_message__broadcast_without_exchange(self):
+        from kombu.common import Broadcast
+        evd = Mock(name='evd')
+        self.app.amqp.send_task_message(
+            Mock(), 'foo', self.simple_message, retry=False,
+            routing_key='xyz', queue=Broadcast('abc'),
+            event_dispatcher=evd,
+        )
+        evd.publish.assert_called()
+        event = evd.publish.call_args[0][1]
+        assert event['routing_key'] == 'xyz'
+        assert event['exchange'] == 'abc'
+
+    def test_send_event_exchange_direct_with_exchange(self):
+        prod = Mock(name='prod')
+        self.app.amqp.send_task_message(
+            prod, 'foo', self.simple_message_no_sent_event, queue='bar',
+            retry=False, exchange_type='direct', exchange='xyz',
+        )
+        prod.publish.assert_called()
+        pub = prod.publish.call_args[1]
+        assert pub['routing_key'] == 'bar'
+        assert pub['exchange'] == ''
+
+    def test_send_event_exchange_direct_with_routing_key(self):
+        prod = Mock(name='prod')
+        self.app.amqp.send_task_message(
+            prod, 'foo', self.simple_message_no_sent_event, queue='bar',
+            retry=False, exchange_type='direct', routing_key='xyb',
+        )
+        prod.publish.assert_called()
+        pub = prod.publish.call_args[1]
+        assert pub['routing_key'] == 'bar'
+        assert pub['exchange'] == ''
+
     def test_send_event_exchange_string(self):
         evd = Mock(name='evd')
         self.app.amqp.send_task_message(