Browse Source

100% coverage for celery.worker.control

Ask Solem 15 years ago
parent
commit
1519112879
2 changed files with 30 additions and 3 deletions
  1. 26 0
      celery/tests/test_worker_control.py
  2. 4 3
      celery/worker/control/__init__.py

+ 26 - 0
celery/tests/test_worker_control.py

@@ -184,3 +184,29 @@ class test_ControlPanel(unittest.TestCase):
         m = {"command": "shutdown",
              "destination": hostname}
         self.assertRaises(SystemExit, self.panel.dispatch_from_message, m)
+
+
+    def test_panel_reply(self):
+
+        replies = []
+
+        class MockReplyPublisher(object):
+
+            def __init__(self, *args, **kwargs):
+                pass
+
+            def send(self, reply, **kwargs):
+                replies.append(reply)
+
+            def close(self):
+                pass
+
+        class _Dispatch(control.ControlDispatch):
+            ReplyPublisher = MockReplyPublisher
+
+        panel = _Dispatch(hostname, listener=Listener())
+
+        r = panel.execute("ping", reply_to={"exchange": "x",
+                                            "routing_key": "x"})
+        self.assertEqual(r, "pong")
+        self.assertDictEqual(replies[0], {panel.hostname: "pong"})

+ 4 - 3
celery/worker/control/__init__.py

@@ -7,18 +7,19 @@ from celery.worker.control import builtins
 
 class ControlDispatch(object):
     """Execute worker control panel commands."""
-    panel_cls = Panel
+    Panel = Panel
+    ReplyPublisher = ControlReplyPublisher
 
     def __init__(self, logger=None, hostname=None, listener=None):
         self.logger = logger or log.get_default_logger()
         self.hostname = hostname
         self.listener = listener
-        self.panel = self.panel_cls(self.logger, self.listener, self.hostname)
+        self.panel = self.Panel(self.logger, self.listener, self.hostname)
 
     @with_connection
     def reply(self, data, exchange, routing_key, connection=None,
             connect_timeout=None):
-        crq = ControlReplyPublisher(connection, exchange=exchange)
+        crq = self.ReplyPublisher(connection, exchange=exchange)
         try:
             crq.send(data, routing_key=routing_key)
         finally: