# test_heartbeat.py (1.8 KB)
  1. from __future__ import absolute_import, unicode_literals
  2. from case import Mock
  3. from celery.worker.heartbeat import Heart
  4. class MockDispatcher(object):
  5. heart = None
  6. next_iter = 0
  7. def __init__(self):
  8. self.sent = []
  9. self.on_enabled = set()
  10. self.on_disabled = set()
  11. self.enabled = True
  12. def send(self, msg, **_fields):
  13. self.sent.append(msg)
  14. if self.heart:
  15. if self.next_iter > 10:
  16. self.heart._shutdown.set()
  17. self.next_iter += 1
  18. class MockTimer(object):
  19. def call_repeatedly(self, secs, fun, args=(), kwargs={}):
  20. class entry(tuple):
  21. canceled = False
  22. def cancel(self):
  23. self.canceled = True
  24. return entry((secs, fun, args, kwargs))
  25. def cancel(self, entry):
  26. entry.cancel()
  27. class test_Heart:
  28. def test_start_stop(self):
  29. timer = MockTimer()
  30. eventer = MockDispatcher()
  31. h = Heart(timer, eventer, interval=1)
  32. h.start()
  33. assert h.tref
  34. h.stop()
  35. assert h.tref is None
  36. h.stop()
  37. def test_send_sends_signal(self):
  38. h = Heart(MockTimer(), MockDispatcher(), interval=1)
  39. h._send_sent_signal = None
  40. h._send('worker-heartbeat')
  41. h._send_sent_signal = Mock(name='send_sent_signal')
  42. h._send('worker')
  43. h._send_sent_signal.assert_called_with(sender=h)
  44. def test_start_when_disabled(self):
  45. timer = MockTimer()
  46. eventer = MockDispatcher()
  47. eventer.enabled = False
  48. h = Heart(timer, eventer)
  49. h.start()
  50. assert not h.tref
  51. def test_stop_when_disabled(self):
  52. timer = MockTimer()
  53. eventer = MockDispatcher()
  54. eventer.enabled = False
  55. h = Heart(timer, eventer)
  56. h.stop()