test_worker_heartbeat.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. from celery.worker.heartbeat import Heart
  2. from celery.tests.utils import unittest, sleepdeprived
  3. class MockDispatcher(object):
  4. heart = None
  5. next_iter = 0
  6. def __init__(self):
  7. self.sent = []
  8. def send(self, msg, **_fields):
  9. self.sent.append(msg)
  10. if self.heart:
  11. if self.next_iter > 10:
  12. self.heart._shutdown.set()
  13. self.next_iter += 1
  14. class MockDispatcherRaising(object):
  15. def send(self, msg):
  16. if msg == "worker-offline":
  17. raise Exception("foo")
  18. class MockHeart(Heart):
  19. _alive = True
  20. _joined = False
  21. def isAlive(self):
  22. return self._alive
  23. def join(self, timeout=None):
  24. self._joined = True
  25. class TestHeart(unittest.TestCase):
  26. def test_stop(self):
  27. eventer = MockDispatcher()
  28. h = MockHeart(eventer, interval=1)
  29. h._state = "RUN"
  30. h.stop()
  31. self.assertTrue(h._joined)
  32. h2 = MockHeart(eventer, interval=1)
  33. h2._alive = False
  34. h2._state = "RUN"
  35. h2.stop()
  36. self.assertFalse(h2._joined)
  37. def test_time_raises_TypeError(self):
  38. from celery.worker import heartbeat
  39. def raises_TypeError(exc):
  40. raise TypeError("x")
  41. prev_time, heartbeat.time = heartbeat.time, raises_TypeError
  42. try:
  43. eventer = MockDispatcher()
  44. heart = Heart(eventer, interval=0.1)
  45. heart.run()
  46. self.assertIn("worker-online", eventer.sent)
  47. self.assertNotIn("worker-heartbeat", eventer.sent)
  48. finally:
  49. heartbeat.time = prev_time
  50. @sleepdeprived
  51. def test_run_manages_cycle(self):
  52. eventer = MockDispatcher()
  53. heart = Heart(eventer, interval=0.1)
  54. eventer.heart = heart
  55. heart.run()
  56. self.assertEqual(heart._state, "RUN")
  57. self.assertTrue(heart._shutdown.isSet())
  58. heart._shutdown.clear()
  59. heart._stopped.clear()
  60. eventer.next_iter = 0
  61. heart.run()
  62. def test_run(self):
  63. eventer = MockDispatcher()
  64. heart = Heart(eventer, interval=1)
  65. heart._shutdown.set()
  66. heart.run()
  67. self.assertEqual(heart._state, "RUN")
  68. self.assertIn("worker-online", eventer.sent)
  69. self.assertIn("worker-heartbeat", eventer.sent)
  70. self.assertIn("worker-offline", eventer.sent)
  71. heart.stop()
  72. heart.stop()
  73. self.assertEqual(heart._state, "CLOSE")
  74. heart = Heart(eventer, interval=0.00001)
  75. heart._shutdown.set()
  76. for i in range(10):
  77. heart.run()
  78. def test_run_exception(self):
  79. eventer = MockDispatcherRaising()
  80. heart = Heart(eventer, interval=1)
  81. heart._shutdown.set()
  82. self.assertRaises(Exception, heart.run)