|
@@ -5,7 +5,7 @@ import socket
|
|
from mock import Mock
|
|
from mock import Mock
|
|
|
|
|
|
from celery import Celery
|
|
from celery import Celery
|
|
-from celery import events
|
|
|
|
|
|
+from celery.events import Event
|
|
from celery.tests.case import AppCase
|
|
from celery.tests.case import AppCase
|
|
|
|
|
|
|
|
|
|
@@ -33,7 +33,7 @@ class MockProducer(object):
|
|
class test_Event(AppCase):
|
|
class test_Event(AppCase):
|
|
|
|
|
|
def test_constructor(self):
|
|
def test_constructor(self):
|
|
- event = events.Event('world war II')
|
|
|
|
|
|
+ event = Event('world war II')
|
|
self.assertEqual(event['type'], 'world war II')
|
|
self.assertEqual(event['type'], 'world war II')
|
|
self.assertTrue(event['timestamp'])
|
|
self.assertTrue(event['timestamp'])
|
|
|
|
|
|
@@ -57,6 +57,16 @@ class test_EventDispatcher(AppCase):
|
|
dispatcher = app.events.Dispatcher(conn, enabled=False)
|
|
dispatcher = app.events.Dispatcher(conn, enabled=False)
|
|
self.assertEqual(dispatcher.exchange.type, 'topic')
|
|
self.assertEqual(dispatcher.exchange.type, 'topic')
|
|
|
|
|
|
|
|
+ def test_takes_channel_connection(self):
|
|
|
|
+ x = self.app.events.Dispatcher(channel=Mock())
|
|
|
|
+ self.assertIs(x.connection, x.channel.connection.client)
|
|
|
|
+
|
|
|
|
+ def test_sql_transports_disabled(self):
|
|
|
|
+ conn = Mock()
|
|
|
|
+ conn.transport.driver_type = 'sql'
|
|
|
|
+ x = self.app.events.Dispatcher(connection=conn)
|
|
|
|
+ self.assertFalse(x.enabled)
|
|
|
|
+
|
|
def test_send(self):
|
|
def test_send(self):
|
|
producer = MockProducer()
|
|
producer = MockProducer()
|
|
producer.connection = self.app.connection()
|
|
producer.connection = self.app.connection()
|
|
@@ -163,9 +173,11 @@ class test_EventReceiver(AppCase):
|
|
|
|
|
|
connection = Mock()
|
|
connection = Mock()
|
|
connection.transport_cls = 'memory'
|
|
connection.transport_cls = 'memory'
|
|
- r = events.EventReceiver(connection,
|
|
|
|
- handlers={'world-war': my_handler},
|
|
|
|
- node_id='celery.tests')
|
|
|
|
|
|
+ r = self.app.events.Receiver(
|
|
|
|
+ connection,
|
|
|
|
+ handlers={'world-war': my_handler},
|
|
|
|
+ node_id='celery.tests',
|
|
|
|
+ )
|
|
r._receive(message, object())
|
|
r._receive(message, object())
|
|
self.assertTrue(got_event[0])
|
|
self.assertTrue(got_event[0])
|
|
|
|
|
|
@@ -180,7 +192,7 @@ class test_EventReceiver(AppCase):
|
|
|
|
|
|
connection = Mock()
|
|
connection = Mock()
|
|
connection.transport_cls = 'memory'
|
|
connection.transport_cls = 'memory'
|
|
- r = events.EventReceiver(connection, node_id='celery.tests')
|
|
|
|
|
|
+ r = self.app.events.Receiver(connection, node_id='celery.tests')
|
|
r.handlers['*'] = my_handler
|
|
r.handlers['*'] = my_handler
|
|
r._receive(message, object())
|
|
r._receive(message, object())
|
|
self.assertTrue(got_event[0])
|
|
self.assertTrue(got_event[0])
|
|
@@ -199,6 +211,20 @@ class test_EventReceiver(AppCase):
|
|
finally:
|
|
finally:
|
|
connection.close()
|
|
connection.close()
|
|
|
|
|
|
|
|
+ def test_event_from_message_localize_disabled(self):
|
|
|
|
+ r = self.app.events.Receiver(Mock(), node_id='celery.tests')
|
|
|
|
+ r.adjust_clock = Mock()
|
|
|
|
+ ts_adjust = Mock()
|
|
|
|
+
|
|
|
|
+ e = r.event_from_message(
|
|
|
|
+ {'type': 'worker-online', 'clock': 313},
|
|
|
|
+ localize=False,
|
|
|
|
+ adjust_timestamp=ts_adjust,
|
|
|
|
+ )
|
|
|
|
+ self.assertFalse(ts_adjust.called)
|
|
|
|
+ r.adjust_clock.assert_called_with(313)
|
|
|
|
+
|
|
|
|
+
|
|
def test_itercapture_limit(self):
|
|
def test_itercapture_limit(self):
|
|
connection = self.app.connection()
|
|
connection = self.app.connection()
|
|
channel = connection.channel()
|
|
channel = connection.channel()
|
|
@@ -208,12 +234,14 @@ class test_EventReceiver(AppCase):
|
|
def handler(event):
|
|
def handler(event):
|
|
events_received[0] += 1
|
|
events_received[0] += 1
|
|
|
|
|
|
- producer = self.app.events.Dispatcher(connection,
|
|
|
|
- enabled=True,
|
|
|
|
- channel=channel)
|
|
|
|
- r = self.app.events.Receiver(connection,
|
|
|
|
- handlers={'*': handler},
|
|
|
|
- node_id='celery.tests')
|
|
|
|
|
|
+ producer = self.app.events.Dispatcher(
|
|
|
|
+ connection, enabled=True, channel=channel,
|
|
|
|
+ )
|
|
|
|
+ r = self.app.events.Receiver(
|
|
|
|
+ connection,
|
|
|
|
+ handlers={'*': handler},
|
|
|
|
+ node_id='celery.tests',
|
|
|
|
+ )
|
|
evs = ['ev1', 'ev2', 'ev3', 'ev4', 'ev5']
|
|
evs = ['ev1', 'ev2', 'ev3', 'ev4', 'ev5']
|
|
for ev in evs:
|
|
for ev in evs:
|
|
producer.send(ev)
|
|
producer.send(ev)
|