# test_worker_control.py (2.0 KB)
  1. import socket
  2. import unittest
  3. from celery.task.builtins import PingTask
  4. from celery.utils import gen_unique_id
  5. from celery.worker import control
  6. from celery.worker.revoke import revoked
  7. from celery.registry import tasks
  8. hostname = socket.gethostname()
  9. class TestControlPanel(unittest.TestCase):
  10. def setUp(self):
  11. self.panel = control.ControlDispatch(hostname=hostname)
  12. def test_shutdown(self):
  13. self.assertRaises(SystemExit, self.panel.execute, "shutdown")
  14. def test_dump_tasks(self):
  15. self.panel.execute("dump_tasks")
  16. def test_rate_limit(self):
  17. task = tasks[PingTask.name]
  18. old_rate_limit = task.rate_limit
  19. try:
  20. self.panel.execute("rate_limit", kwargs=dict(
  21. task_name=task.name,
  22. rate_limit="100/m"))
  23. self.assertEquals(task.rate_limit, "100/m")
  24. self.panel.execute("rate_limit", kwargs=dict(
  25. task_name=task.name,
  26. rate_limit=0))
  27. self.assertEquals(task.rate_limit, 0)
  28. finally:
  29. task.rate_limit = old_rate_limit
  30. def test_rate_limit_nonexistant_task(self):
  31. self.panel.execute("rate_limit", kwargs={
  32. "task_name": "xxxx.does.not.exist",
  33. "rate_limit": "1000/s"})
  34. def test_unexposed_command(self):
  35. self.panel.execute("foo", kwargs={})
  36. def test_revoke(self):
  37. uuid = gen_unique_id()
  38. m = {"command": "revoke",
  39. "destination": hostname,
  40. "task_id": uuid}
  41. self.panel.dispatch_from_message(m)
  42. self.assertTrue(uuid in revoked)
  43. m = {"command": "revoke",
  44. "destination": "does.not.exist",
  45. "task_id": uuid + "xxx"}
  46. self.panel.dispatch_from_message(m)
  47. self.assertTrue(uuid + "xxx" not in revoked)