Browse Source

97% coverage for celery.worker.control.builtins

Ask Solem 15 years ago
parent
commit
4fb9f12866
2 changed files with 115 additions and 11 deletions
  1. 115 4
      celery/tests/test_worker_control.py
  2. 0 7
      celery/worker/control/registry.py

+ 115 - 4
celery/tests/test_worker_control.py

@@ -1,25 +1,107 @@
 import socket
 import unittest2 as unittest
 
+from celery import conf
+from celery.decorators import task
+from celery.registry import tasks
 from celery.task.builtins import PingTask
 from celery.utils import gen_unique_id
 from celery.worker import control
+from celery.worker.buckets import FastQueue
 from celery.worker.revoke import revoked
-from celery.registry import tasks
+from celery.worker.scheduler import Scheduler
 
 hostname = socket.gethostname()
 
 
-class TestControlPanel(unittest.TestCase):
+@task(rate_limit=200) # for extra info in dump_tasks
+def mytask():
+    pass
+
+
+class Dispatcher(object):
+
+    def __init__(self, *args, **kwargs):
+        self.sent = []
+
+    def enable(self):
+        self.enabled = True
+
+    def disable(self):
+        self.enabled = False
+
+    def send(self, event):
+        self.sent.append(event)
+
+
+class Listener(object):
+
+    def __init__(self):
+        self.ready_queue = FastQueue()
+        self.ready_queue.put("the quick brown fox")
+        self.eta_schedule = Scheduler(self.ready_queue)
+        self.event_dispatcher = Dispatcher()
+
+
+class test_ControlPanel(unittest.TestCase):
 
     def setUp(self):
-        self.panel = self.create_panel(listener=object())
+        self.panel = self.create_panel(listener=Listener())
 
     def create_panel(self, **kwargs):
         return control.ControlDispatch(hostname=hostname, **kwargs)
 
+    def test_disable_events(self):
+        listener = Listener()
+        panel = self.create_panel(listener=listener)
+        panel.execute("disable_events")
+        self.assertEqual(listener.event_dispatcher.enabled, False)
+        self.assertIn("worker-offline", listener.event_dispatcher.sent)
+
+    def test_enable_events(self):
+        listener = Listener()
+        panel = self.create_panel(listener=listener)
+        panel.execute("enable_events")
+        self.assertEqual(listener.event_dispatcher.enabled, True)
+        self.assertIn("worker-online", listener.event_dispatcher.sent)
+
     def test_dump_tasks(self):
-        self.panel.execute("dump_tasks")
+        tasks = "\n".join(self.panel.execute("dump_tasks"))
+        self.assertIn("mytask", tasks)
+        self.assertIn("rate_limit=200", tasks)
+
+    def test_dump_schedule(self):
+        listener = Listener()
+        panel = self.create_panel(listener=listener)
+        self.assertFalse(panel.execute("dump_schedule"))
+        listener.eta_schedule.enter("foo", eta=100)
+        self.assertTrue(panel.execute("dump_schedule"))
+
+    def test_dump_reserved(self):
+        listener = Listener()
+        panel = self.create_panel(listener=listener)
+        tasks = "\n".join(panel.execute("dump_reserved"))
+        self.assertIn("the quick brown fox", tasks)
+        listener.ready_queue = FastQueue()
+        tasks = "\n".join(panel.execute("dump_reserved"))
+        self.assertFalse(tasks)
+
+    def test_dump_reserved(self):
+        self.panel.execute("dump_reserved")
+
+    def test_rate_limit_when_disabled(self):
+        conf.DISABLE_RATE_LIMITS = True
+        try:
+            e = self.panel.execute("rate_limit", kwargs=dict(
+                 task_name=mytask.name, rate_limit="100/m"))
+            self.assertIn("rate limits disabled", e.get("error"))
+        finally:
+            conf.DISABLE_RATE_LIMITS = False
+
+    def test_rate_limit_invalid_rate_limit_string(self):
+        e = self.panel.execute("rate_limit", kwargs=dict(
+            task_name="tasks.add", rate_limit="x1240301#%!"))
+        self.assertIn("Invalid rate limit string", e.get("error"))
 
     def test_rate_limit(self):
 
@@ -60,6 +142,24 @@ class TestControlPanel(unittest.TestCase):
     def test_unexposed_command(self):
         self.panel.execute("foo", kwargs={})
 
+    def test_revoke_with_name(self):
+        uuid = gen_unique_id()
+        m = {"command": "revoke",
+             "destination": hostname,
+             "task_id": uuid,
+             "task_name": mytask.name}
+        self.panel.dispatch_from_message(m)
+        self.assertIn(uuid, revoked)
+
+    def test_revoke_with_name_not_in_registry(self):
+        uuid = gen_unique_id()
+        m = {"command": "revoke",
+             "destination": hostname,
+             "task_id": uuid,
+             "task_name": "xxxxxxxxx33333333388888"}
+        self.panel.dispatch_from_message(m)
+        self.assertIn(uuid, revoked)
+
     def test_revoke(self):
         uuid = gen_unique_id()
         m = {"command": "revoke",
@@ -73,3 +173,14 @@ class TestControlPanel(unittest.TestCase):
              "task_id": uuid + "xxx"}
         self.panel.dispatch_from_message(m)
         self.assertNotIn(uuid + "xxx", revoked)
+
+    def test_ping(self):
+        m = {"command": "ping",
+             "destination": hostname}
+        r = self.panel.dispatch_from_message(m)
+        self.assertEqual(r, "pong")
+
+    def test_shutdown(self):
+        m = {"command": "shutdown",
+             "destination": hostname}
+        self.assertRaises(SystemExit, self.panel.dispatch_from_message, m)

+ 0 - 7
celery/worker/control/registry.py

@@ -13,10 +13,3 @@ class Panel(UserDict):
     def register(cls, method, name=None):
         cls.data[name or method.__name__] = method
         return method
-
-    @classmethod
-    def unregister(cls, name_or_method):
-        name = name_or_method
-        if not isinstance(name_or_method, basestring):
-            name = name_or_method.__name__
-        cls.data.pop(name)