test_heartbeat.py 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. from __future__ import absolute_import
  2. from celery.worker.heartbeat import Heart
  3. from celery.tests.utils import Case, sleepdeprived
  class MockDispatcher(object):
      """Event dispatcher stand-in that records every message it is sent.

      When a heart is attached through ``heart``, each ``send`` is counted
      and the heart's ``_shutdown`` event is set on every send made after
      the count has passed ten.
      """

      # Class-level defaults; ``send`` shadows ``next_iter`` with a
      # per-instance value the first time it increments it.
      next_iter = 0
      heart = None

      def __init__(self):
          self.enabled = True
          self.on_disabled = set()
          self.on_enabled = set()
          self.sent = []

      def send(self, msg, **_fields):
          """Record *msg*; extra keyword fields are accepted and ignored."""
          self.sent.append(msg)
          if not self.heart:
              # No heart attached: nothing to count or shut down.
              return
          if self.next_iter > 10:
              self.heart._shutdown.set()
          self.next_iter += 1
  class MockDispatcherRaising(object):
      """Event dispatcher stand-in whose ``send`` fails for 'worker-offline'."""

      def send(self, msg, **_fields):
          """Raise ``Exception('foo')`` when *msg* is ``'worker-offline'``.

          Every other message is silently ignored.  Extra keyword fields are
          accepted and discarded, mirroring ``MockDispatcher.send``, so both
          mocks can be called the same way.
          """
          if msg == 'worker-offline':
              raise Exception('foo')
  class MockTimer(object):
      """Timer stand-in that returns cancellable entries, scheduling nothing."""

      def apply_interval(self, msecs, fun, args=(), kwargs=None):
          """Return an entry describing the requested repeating call.

          Nothing is scheduled or invoked.  The entry is the 4-tuple
          ``(msecs, fun, args, kwargs)``; the same values are also readable
          as the attributes ``msecs``, ``fun``, ``args`` and ``kwargs``.
          Its ``cancelled`` flag starts false and is set by ``cancel()``.
          """
          # A fresh dict per call avoids sharing one mutable default between
          # every entry created without explicit kwargs.
          if kwargs is None:
              kwargs = {}

          class entry(tuple):
              cancelled = False

              # Read-only named views of the tuple slots, so callers can
              # write ``tref.fun`` as well as unpack or index the entry.
              msecs = property(lambda self: self[0])
              fun = property(lambda self: self[1])
              args = property(lambda self: self[2])
              kwargs = property(lambda self: self[3])

              def cancel(self):
                  self.cancelled = True

          return entry((msecs, fun, args, kwargs))

      def cancel(self, entry):
          """Cancel *entry* by delegating to its own ``cancel()`` method."""
          entry.cancel()
  class test_Heart(Case):
      """Tests for ``Heart`` driven by ``MockTimer`` and ``MockDispatcher``."""

      def test_stop(self):
          # start() must leave a timer reference behind; stop() must clear it.
          timer = MockTimer()
          eventer = MockDispatcher()
          h = Heart(timer, eventer, interval=1)
          h.start()
          self.assertTrue(h.tref)
          h.stop()
          self.assertIsNone(h.tref)
          # A second stop with no active timer reference must not raise.
          h.stop()

      @sleepdeprived
      def test_run_manages_cycle(self):
          # start() must register the dispatcher's send with the timer at the
          # interval converted to milliseconds; stop() must cancel the entry.
          eventer = MockDispatcher()
          heart = Heart(MockTimer(), eventer, interval=0.1)
          eventer.heart = heart
          heart.start()
          # Assert on the values unpacked from the entry itself; this relies
          # only on the timer entry being a (msecs, fun, args, kwargs) tuple.
          msecs, fun, args, kwargs = tref = heart.tref
          self.assertEqual(msecs, 0.1 * 1000)
          self.assertEqual(fun, eventer.send)
          self.assertTrue(args)
          self.assertTrue(kwargs)
          heart.stop()
          self.assertTrue(tref.cancelled)