test_heartbeat.py 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. from case import Mock
  2. from celery.worker.heartbeat import Heart
  3. class MockDispatcher:
  4. heart = None
  5. next_iter = 0
  6. def __init__(self):
  7. self.sent = []
  8. self.on_enabled = set()
  9. self.on_disabled = set()
  10. self.enabled = True
  11. def send(self, msg, **_fields):
  12. self.sent.append(msg)
  13. if self.heart:
  14. if self.next_iter > 10:
  15. self.heart._shutdown.set()
  16. self.next_iter += 1
  17. class MockTimer:
  18. def call_repeatedly(self, secs, fun, args=(), kwargs={}):
  19. class entry(tuple):
  20. canceled = False
  21. def cancel(self):
  22. self.canceled = True
  23. return entry((secs, fun, args, kwargs))
  24. def cancel(self, entry):
  25. entry.cancel()
  26. class test_Heart:
  27. def test_start_stop(self):
  28. timer = MockTimer()
  29. eventer = MockDispatcher()
  30. h = Heart(timer, eventer, interval=1)
  31. h.start()
  32. assert h.tref
  33. h.stop()
  34. assert h.tref is None
  35. h.stop()
  36. def test_send_sends_signal(self):
  37. h = Heart(MockTimer(), MockDispatcher(), interval=1)
  38. h._send_sent_signal = None
  39. h._send('worker-heartbeat')
  40. h._send_sent_signal = Mock(name='send_sent_signal')
  41. h._send('worker')
  42. h._send_sent_signal.assert_called_with(sender=h)
  43. def test_start_when_disabled(self):
  44. timer = MockTimer()
  45. eventer = MockDispatcher()
  46. eventer.enabled = False
  47. h = Heart(timer, eventer)
  48. h.start()
  49. assert not h.tref
  50. def test_stop_when_disabled(self):
  51. timer = MockTimer()
  52. eventer = MockDispatcher()
  53. eventer.enabled = False
  54. h = Heart(timer, eventer)
  55. h.stop()