test_supervisor.py 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
import unittest

from celery.supervisor import OFASupervisor
from celery.supervisor import TimeoutError, MaxRestartsExceededError
  4. def target_one(x, y, z):
  5. return x * y * z
  6. class MockProcess(object):
  7. _started = False
  8. _stopped = False
  9. _terminated = False
  10. _joined = False
  11. alive = True
  12. timeout_on_is_alive = False
  13. def __init__(self, target, args, kwargs):
  14. self.target = target
  15. self.args = args
  16. self.kwargs = kwargs
  17. def start(self):
  18. self._stopped = False
  19. self._started = True
  20. def stop(self):
  21. self._stopped = True
  22. self._started = False
  23. def terminate(self):
  24. self._terminated = False
  25. def is_alive(self):
  26. if self._started and self.alive:
  27. if self.timeout_on_is_alive:
  28. raise TimeoutError("Supervised: timed out.")
  29. return True
  30. return False
  31. def join(self, timeout=None):
  32. self._joined = True
  33. class TestOFASupervisor(unittest.TestCase):
  34. def test_init(self):
  35. s = OFASupervisor(target=target_one, args=[2, 4, 8], kwargs={})
  36. s.Process = MockProcess
  37. def test_start(self):
  38. MockProcess.alive = False
  39. s = OFASupervisor(target=target_one, args=[2, 4, 8], kwargs={},
  40. max_restart_freq=0, max_restart_freq_time=0)
  41. s.Process = MockProcess
  42. self.assertRaises(MaxRestartsExceededError, s.start)
  43. MockProcess.alive = True
  44. def test_start_is_alive_timeout(self):
  45. MockProcess.alive = True
  46. MockProcess.timeout_on_is_alive = True
  47. s = OFASupervisor(target=target_one, args=[2, 4, 8], kwargs={},
  48. max_restart_freq=0, max_restart_freq_time=0)
  49. s.Process = MockProcess
  50. self.assertRaises(MaxRestartsExceededError, s.start)
  51. MockProcess.timeout_on_is_alive = False