|
@@ -5,6 +5,7 @@ import socket
|
|
|
|
|
|
from mock import Mock
|
|
|
|
|
|
+from celery import Celery
|
|
|
from celery import events
|
|
|
from celery.tests.utils import AppCase
|
|
|
|
|
@@ -40,10 +41,29 @@ class test_Event(AppCase):
|
|
|
|
|
|
class test_EventDispatcher(AppCase):
|
|
|
|
|
|
+ def test_redis_uses_fanout_exchange(self):
|
|
|
+ with Celery(set_as_current=False) as app:
|
|
|
+ app.connection = Mock()
|
|
|
+ conn = app.connection.return_value = Mock()
|
|
|
+ conn.transport.driver_type = 'redis'
|
|
|
+
|
|
|
+ dispatcher = app.events.Dispatcher(conn, enabled=False)
|
|
|
+ self.assertEqual(dispatcher.exchange.type, 'fanout')
|
|
|
+
|
|
|
+ def test_others_use_topic_exchange(self):
|
|
|
+ with Celery(set_as_current=False) as app:
|
|
|
+ app.connection = Mock()
|
|
|
+ conn = app.connection.return_value = Mock()
|
|
|
+ conn.transport.driver_type = 'amqp'
|
|
|
+ dispatcher = app.events.Dispatcher(conn, enabled=False)
|
|
|
+ self.assertEqual(dispatcher.exchange.type, 'topic')
|
|
|
+
|
|
|
def test_send(self):
|
|
|
producer = MockProducer()
|
|
|
producer.connection = self.app.connection()
|
|
|
- eventer = self.app.events.Dispatcher(object(), enabled=False,
|
|
|
+ connection = Mock()
|
|
|
+ connection.transport.driver_type = 'amqp'
|
|
|
+ eventer = self.app.events.Dispatcher(connection, enabled=False,
|
|
|
buffer_while_offline=False)
|
|
|
eventer.producer = producer
|
|
|
eventer.enabled = True
|