Browse Source

Adds unittests for celery.events (celery now has 88% coverage in total)

Ask Solem 15 years ago
parent
commit
837710589b
2 changed files with 76 additions and 2 deletions
  1. 3 2
      celery/events.py
  2. 73 0
      celery/tests/test_events.py

+ 3 - 2
celery/events.py

@@ -29,9 +29,10 @@ class EventDispatcher(object):
 
     """
 
-    def __init__(self, connection, hostname=None, enabled=True):
+    def __init__(self, connection, hostname=None, enabled=True,
+            publisher=None):
         self.connection = connection
-        self.publisher = EventPublisher(self.connection)
+        self.publisher = publisher or EventPublisher(self.connection)
         self.hostname = hostname or socket.gethostname()
         self.enabled = enabled
         self._lock = threading.Lock()

+ 73 - 0
celery/tests/test_events.py

@@ -0,0 +1,73 @@
+import unittest
+
+from celery import events
+
+
+class MockPublisher(object):
+
+    def __init__(self, *args, **kwargs):
+        self.sent = []
+
+    def send(self, msg, *args, **kwargs):
+        self.sent.append(msg)
+
+    def close(self):
+        pass
+
+    def has_event(self, kind):
+        for event in self.sent:
+            if event["type"] == kind:
+                return event
+        return False
+
+class TestEvent(unittest.TestCase):
+
+    def test_constructor(self):
+        event = events.Event("world war II")
+        self.assertEquals(event["type"], "world war II")
+        self.assertTrue(event["timestamp"])
+
+
+class TestEventDispatcher(unittest.TestCase):
+
+    def test_send(self):
+        publisher = MockPublisher()
+        eventer = events.EventDispatcher(object(), publisher=publisher)
+
+        eventer.publisher = publisher
+        eventer.enabled = True
+        eventer.send("World War II", ended=True)
+        self.assertTrue(publisher.has_event("World War II"))
+
+class TestEventReceiver(unittest.TestCase):
+
+    def test_process(self):
+
+        message = {"type": "world-war"}
+
+        got_event = [False]
+
+        def my_handler(event):
+            got_event[0] = True
+
+        r = events.EventReceiver(object(), handlers={
+                                    "world-war": my_handler})
+        r._receive(message, object())
+        self.assertTrue(got_event[0])
+
+    def test_catch_all_event(self):
+
+        message = {"type": "world-war"}
+
+        got_event = [False]
+
+        def my_handler(event):
+            got_event[0] = True
+
+        r = events.EventReceiver(object())
+        events.EventReceiver.handlers["*"] = my_handler
+        try:
+            r._receive(message, object())
+            self.assertTrue(got_event[0])
+        finally:
+            events.EventReceiver.handlers = {}