test_worker_heartbeat.py 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. from celery.tests.utils import unittest
  2. from celery.worker.heartbeat import Heart
  3. class MockDispatcher(object):
  4. shutdown_event = None
  5. next_iter = False
  6. def __init__(self):
  7. self.sent = []
  8. def send(self, msg):
  9. self.sent.append(msg)
  10. if self.shutdown_event:
  11. if self.next_iter:
  12. self.shutdown_event.set()
  13. self.next_iter = True
  14. class MockDispatcherRaising(object):
  15. def send(self, msg):
  16. if msg == "worker-offline":
  17. raise Exception("foo")
  18. class MockHeart(Heart):
  19. _alive = True
  20. _joined = False
  21. def isAlive(self):
  22. return self._alive
  23. def join(self, timeout=None):
  24. self._joined = True
  25. class TestHeart(unittest.TestCase):
  26. def test_stop(self):
  27. eventer = MockDispatcher()
  28. h = MockHeart(eventer, interval=1)
  29. h._state = "RUN"
  30. h.stop()
  31. self.assertTrue(h._joined)
  32. h2 = MockHeart(eventer, interval=1)
  33. h2._alive = False
  34. h2._state = "RUN"
  35. h2.stop()
  36. self.assertFalse(h2._joined)
  37. def test_run_manages_cycle(self):
  38. eventer = MockDispatcher()
  39. heart = Heart(eventer, interval=1)
  40. eventer.shutdown_event = heart._shutdown
  41. heart.run()
  42. self.assertEqual(heart._state, "RUN")
  43. self.assertTrue(heart._shutdown.isSet())
  44. self.assertTrue(heart._stopped.isSet())
  45. def test_run(self):
  46. eventer = MockDispatcher()
  47. heart = Heart(eventer, interval=1)
  48. heart._shutdown.set()
  49. heart.run()
  50. self.assertEqual(heart._state, "RUN")
  51. self.assertIn("worker-online", eventer.sent)
  52. self.assertIn("worker-heartbeat", eventer.sent)
  53. self.assertIn("worker-offline", eventer.sent)
  54. self.assertTrue(heart._stopped.isSet())
  55. heart.stop()
  56. heart.stop()
  57. self.assertEqual(heart._state, "CLOSE")
  58. heart = Heart(eventer, interval=0.00001)
  59. heart._shutdown.set()
  60. for i in range(10):
  61. heart.run()
  62. def test_run_stopped_is_set_even_if_send_breaks(self):
  63. eventer = MockDispatcherRaising()
  64. heart = Heart(eventer, interval=1)
  65. heart._shutdown.set()
  66. self.assertRaises(Exception, heart.run)
  67. self.assertTrue(heart._stopped.isSet())