test_worker_control.py 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. import socket
  2. import unittest2 as unittest
  3. from celery.task.builtins import PingTask
  4. from celery.utils import gen_unique_id
  5. from celery.worker import control
  6. from celery.worker.revoke import revoked
  7. from celery.registry import tasks
  8. hostname = socket.gethostname()
  9. class TestControlPanel(unittest.TestCase):
  10. def setUp(self):
  11. self.panel = self.create_panel(listener=object())
  12. def create_panel(self, **kwargs):
  13. return control.ControlDispatch(hostname=hostname, **kwargs)
  14. def test_shutdown(self):
  15. self.assertRaises(SystemExit, self.panel.execute, "shutdown")
  16. def test_dump_tasks(self):
  17. self.panel.execute("dump_tasks")
  18. def test_rate_limit(self):
  19. class Listener(object):
  20. class ReadyQueue(object):
  21. fresh = False
  22. def refresh(self):
  23. self.fresh = True
  24. def __init__(self):
  25. self.ready_queue = self.ReadyQueue()
  26. listener = Listener()
  27. panel = self.create_panel(listener=listener)
  28. task = tasks[PingTask.name]
  29. old_rate_limit = task.rate_limit
  30. try:
  31. panel.execute("rate_limit", kwargs=dict(task_name=task.name,
  32. rate_limit="100/m"))
  33. self.assertEqual(task.rate_limit, "100/m")
  34. self.assertTrue(listener.ready_queue.fresh)
  35. listener.ready_queue.fresh = False
  36. panel.execute("rate_limit", kwargs=dict(task_name=task.name,
  37. rate_limit=0))
  38. self.assertEqual(task.rate_limit, 0)
  39. self.assertTrue(listener.ready_queue.fresh)
  40. finally:
  41. task.rate_limit = old_rate_limit
  42. def test_rate_limit_nonexistant_task(self):
  43. self.panel.execute("rate_limit", kwargs={
  44. "task_name": "xxxx.does.not.exist",
  45. "rate_limit": "1000/s"})
  46. def test_unexposed_command(self):
  47. self.panel.execute("foo", kwargs={})
  48. def test_revoke(self):
  49. uuid = gen_unique_id()
  50. m = {"command": "revoke",
  51. "destination": hostname,
  52. "task_id": uuid}
  53. self.panel.dispatch_from_message(m)
  54. self.assertIn(uuid, revoked)
  55. m = {"command": "revoke",
  56. "destination": "does.not.exist",
  57. "task_id": uuid + "xxx"}
  58. self.panel.dispatch_from_message(m)
  59. self.assertNotIn(uuid + "xxx", revoked)