# test_worker_heartbeat.py -- unit tests for celery.worker.heartbeat.Heart
import unittest

from celery.worker.heartbeat import Heart


  3. class MockDispatcher(object):
  4. def __init__(self):
  5. self.sent = []
  6. def send(self, msg):
  7. self.sent.append(msg)
  8. class MockDispatcherRaising(object):
  9. def send(self, msg):
  10. if msg == "worker-offline":
  11. raise Exception("foo")
  12. class TestHeart(unittest.TestCase):
  13. def test_run(self):
  14. eventer = MockDispatcher()
  15. heart = Heart(eventer, interval=1)
  16. heart._shutdown.set()
  17. heart.run()
  18. self.assertTrue(heart._state == "RUN")
  19. self.assertTrue("worker-online" in eventer.sent)
  20. self.assertTrue("worker-heartbeat" in eventer.sent)
  21. self.assertTrue("worker-offline" in eventer.sent)
  22. self.assertTrue(heart._stopped.isSet())
  23. heart.stop()
  24. heart.stop()
  25. self.assertTrue(heart._state == "CLOSE")
  26. heart = Heart(eventer, interval=0.00001)
  27. heart._shutdown.set()
  28. for i in range(10):
  29. heart.run()
  30. def test_run_stopped_is_set_even_if_send_breaks(self):
  31. eventer = MockDispatcherRaising()
  32. heart = Heart(eventer, interval=1)
  33. heart._shutdown.set()
  34. self.assertRaises(Exception, heart.run)
  35. self.assertTrue(heart._stopped.isSet())