Browse Source

100% Coverage for celery.worker.control.*

Ask Solem 14 years ago
parent
commit
7fa7d78a2a

+ 1 - 1
celery/tests/test_events.py

@@ -138,7 +138,7 @@ class TestEventReceiver(unittest.TestCase):
         connection = self.app.broker_connection()
         try:
             r = self.app.events.Receiver(connection, node_id="celery.tests")
-            it = r.itercapture(timeout=0.0001)
+            it = r.itercapture(timeout=0.0001, wakeup=False)
             consumer = it.next()
             self.assertTrue(consumer.queues)
             self.assertEqual(consumer.callbacks[0], r._receive)

+ 105 - 5
celery/tests/test_worker_control.py

@@ -54,6 +54,13 @@ class Consumer(object):
         self.app = app_or_default()
         self.event_dispatcher = Dispatcher()
 
+        from celery.concurrency.base import BasePool
+        self.pool = BasePool(10)
+
+    @property
+    def info(self):
+        return {"xyz": "XYZ"}
+
 
 class test_ControlPanel(unittest.TestCase):
 
@@ -63,6 +70,7 @@ class test_ControlPanel(unittest.TestCase):
 
     def create_state(self, **kwargs):
         kwargs.setdefault("logger", self.app.log.get_default_logger())
+        kwargs.setdefault("app", self.app)
         return AttributeDict(kwargs)
 
     def create_panel(self, **kwargs):
@@ -70,6 +78,15 @@ class test_ControlPanel(unittest.TestCase):
                                              state=self.create_state(**kwargs),
                                              handlers=Panel.data)
 
+    def test_enable_events(self):
+        consumer = Consumer()
+        panel = self.create_panel(consumer=consumer)
+        consumer.event_dispatcher.enabled = False
+        panel.handle("enable_events")
+        self.assertEqual(consumer.event_dispatcher.enabled, True)
+        self.assertIn("worker-online", consumer.event_dispatcher.sent)
+        self.assertTrue(panel.handle("enable_events")["ok"])
+
     def test_disable_events(self):
         consumer = Consumer()
         panel = self.create_panel(consumer=consumer)
@@ -77,20 +94,103 @@ class test_ControlPanel(unittest.TestCase):
         panel.handle("disable_events")
         self.assertEqual(consumer.event_dispatcher.enabled, False)
         self.assertIn("worker-offline", consumer.event_dispatcher.sent)
+        self.assertTrue(panel.handle("disable_events")["ok"])
 
-    def test_enable_events(self):
+    def test_heartbeat(self):
         consumer = Consumer()
         panel = self.create_panel(consumer=consumer)
-        consumer.event_dispatcher.enabled = False
-        panel.handle("enable_events")
-        self.assertEqual(consumer.event_dispatcher.enabled, True)
-        self.assertIn("worker-online", consumer.event_dispatcher.sent)
+        consumer.event_dispatcher.enabled = True
+        panel.handle("heartbeat")
+        self.assertIn("worker-heartbeat", consumer.event_dispatcher.sent)
 
     def test_dump_tasks(self):
         info = "\n".join(self.panel.handle("dump_tasks"))
         self.assertIn("mytask", info)
         self.assertIn("rate_limit=200", info)
 
+    def test_stats(self):
+        from celery.worker import state
+        prev_count, state.total_count = state.total_count, 100
+        try:
+            self.assertDictContainsSubset({"total": 100,
+                                           "consumer": {"xyz": "XYZ"}},
+                                          self.panel.handle("stats"))
+        finally:
+            state.total_count = prev_count
+
+    def test_active(self):
+        from celery.worker import state
+        from celery.worker.job import TaskRequest
+        from celery.task import PingTask
+
+        r = TaskRequest(PingTask.name, "do re mi", (), {})
+        state.active_requests.add(r)
+        try:
+            self.assertTrue(self.panel.handle("dump_active"))
+        finally:
+            state.active_requests.discard(r)
+
+    def test_pool_grow(self):
+
+        class MockPool(object):
+
+            def __init__(self, size=1):
+                self.size = size
+
+            def grow(self, n=1):
+                self.size += n
+
+            def shrink(self, n=1):
+                self.size -= n
+
+        consumer = Consumer()
+        consumer.pool = MockPool()
+        panel = self.create_panel(consumer=consumer)
+
+        panel.handle("pool_grow")
+        self.assertEqual(consumer.pool.size, 2)
+        panel.handle("pool_shrink")
+        self.assertEqual(consumer.pool.size, 1)
+
+    def test_add__cancel_consumer(self):
+
+        class MockConsumer(object):
+            queues = []
+            cancelled = []
+            consuming = False
+
+            def add_consumer_from_dict(self, **declaration):
+                self.queues.append(declaration["queue"])
+
+            def consume(self):
+                self.consuming = True
+
+            def cancel_by_queue(self, queue):
+                self.cancelled.append(queue)
+
+        consumer = Consumer()
+        consumer.task_consumer = MockConsumer()
+        panel = self.create_panel(consumer=consumer)
+
+        panel.handle("add_consumer", {"queue": "MyQueue"})
+        self.assertIn("MyQueue", consumer.task_consumer.queues)
+        self.assertTrue(consumer.task_consumer.consuming)
+        panel.handle("cancel_consumer", {"queue": "MyQueue"})
+        self.assertIn("MyQueue", consumer.task_consumer.cancelled)
+
+
+    def test_revoked(self):
+        from celery.worker import state
+        state.revoked.clear()
+        state.revoked.add("a1")
+        state.revoked.add("a2")
+
+        try:
+            self.assertListEqual(self.panel.handle("dump_revoked"),
+                                 ["a1", "a2"])
+        finally:
+            state.revoked.clear()
+
     def test_dump_schedule(self):
         consumer = Consumer()
         panel = self.create_panel(consumer=consumer)

+ 39 - 0
celery/tests/test_worker_heartbeat.py

@@ -4,12 +4,18 @@ from celery.worker.heartbeat import Heart
 
 
 class MockDispatcher(object):
+    shutdown_event = None
+    next_iter = False
 
     def __init__(self):
         self.sent = []
 
     def send(self, msg):
         self.sent.append(msg)
+        if self.shutdown_event:
+            if self.next_iter:
+                self.shutdown_event.set()
+            self.next_iter = True
 
 
 class MockDispatcherRaising(object):
@@ -19,8 +25,41 @@ class MockDispatcherRaising(object):
             raise Exception("foo")
 
 
+class MockHeart(Heart):
+    _alive = True
+    _joined = False
+
+    def isAlive(self):
+        return self._alive
+
+    def join(self, timeout=None):
+        self._joined = True
+
+
 class TestHeart(unittest.TestCase):
 
+    def test_stop(self):
+        eventer = MockDispatcher()
+        h = MockHeart(eventer, interval=1)
+        h._state = "RUN"
+        h.stop()
+        self.assertTrue(h._joined)
+
+        h2 = MockHeart(eventer, interval=1)
+        h2._alive = False
+        h2._state = "RUN"
+        h2.stop()
+        self.assertFalse(h2._joined)
+
+    def test_run_manages_cycle(self):
+        eventer = MockDispatcher()
+        heart = Heart(eventer, interval=1)
+        eventer.shutdown_event = heart._shutdown
+        heart.run()
+        self.assertEqual(heart._state, "RUN")
+        self.assertTrue(heart._shutdown.isSet())
+        self.assertTrue(heart._stopped.isSet())
+
     def test_run(self):
         eventer = MockDispatcher()
 

+ 2 - 2
celery/worker/control/builtins.py

@@ -167,13 +167,13 @@ def ping(panel, **kwargs):
 
 @Panel.register
 def pool_grow(panel, n=1, **kwargs):
-    panel.listener.pool.grow(n)
+    panel.consumer.pool.grow(n)
     return {"ok": "spawned worker processes"}
 
 
 @Panel.register
 def pool_shrink(panel, n=1, **kwargs):
-    panel.listener.pool.shrink(n)
+    panel.consumer.pool.shrink(n)
     return {"ok": "terminated worker processes"}