snapshot.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. """
  2. celery.events.snapshot
  3. ======================
  4. Consuming the events as a stream is not always suitable,
  5. so this module implements a system to take snapshots of the
  6. state of a cluster. A full implementation that writes the
  7. snapshots to a database is available in ``django-celery``.
  8. """
  9. from __future__ import absolute_import
  10. import atexit
  11. from .. import platforms
  12. from ..app import app_or_default
  13. from ..datastructures import TokenBucket
  14. from ..utils import timer2, instantiate, LOG_LEVELS
  15. from ..utils.dispatch import Signal
  16. from ..utils.timeutils import rate
  17. __all__ = ["Polaroid", "evcam"]
  18. class Polaroid(object):
  19. timer = timer2
  20. shutter_signal = Signal(providing_args=("state", ))
  21. cleanup_signal = Signal()
  22. clear_after = False
  23. _tref = None
  24. _ctref = None
  25. def __init__(self, state, freq=1.0, maxrate=None,
  26. cleanup_freq=3600.0, logger=None, timer=None, app=None):
  27. self.app = app_or_default(app)
  28. self.state = state
  29. self.freq = freq
  30. self.cleanup_freq = cleanup_freq
  31. self.timer = timer or self.timer
  32. self.logger = logger or \
  33. self.app.log.get_default_logger(name="celery.cam")
  34. self.maxrate = maxrate and TokenBucket(rate(maxrate))
  35. def install(self):
  36. self._tref = self.timer.apply_interval(self.freq * 1000.0,
  37. self.capture)
  38. self._ctref = self.timer.apply_interval(self.cleanup_freq * 1000.0,
  39. self.cleanup)
  40. def on_shutter(self, state):
  41. pass
  42. def on_cleanup(self):
  43. pass
  44. def cleanup(self):
  45. self.logger.debug("Cleanup: Running...")
  46. self.cleanup_signal.send(None)
  47. self.on_cleanup()
  48. def shutter(self):
  49. if self.maxrate is None or self.maxrate.can_consume():
  50. self.logger.debug("Shutter: %s", self.state)
  51. self.shutter_signal.send(self.state)
  52. self.on_shutter(self.state)
  53. def capture(self):
  54. self.state.freeze_while(self.shutter, clear_after=self.clear_after)
  55. def cancel(self):
  56. if self._tref:
  57. self._tref() # flush all received events.
  58. self._tref.cancel()
  59. if self._ctref:
  60. self._ctref.cancel()
  61. def __enter__(self):
  62. self.install()
  63. return self
  64. def __exit__(self, *exc_info):
  65. self.cancel()
  66. def evcam(camera, freq=1.0, maxrate=None, loglevel=0,
  67. logfile=None, pidfile=None, timer=None, app=None):
  68. app = app_or_default(app)
  69. if pidfile:
  70. pidlock = platforms.create_pidlock(pidfile).acquire()
  71. atexit.register(pidlock.release)
  72. if not isinstance(loglevel, int):
  73. loglevel = LOG_LEVELS[loglevel.upper()]
  74. logger = app.log.setup_logger(loglevel=loglevel,
  75. logfile=logfile,
  76. name="celery.evcam")
  77. logger.info(
  78. "-> evcam: Taking snapshots with %s (every %s secs.)\n" % (
  79. camera, freq))
  80. state = app.events.State()
  81. cam = instantiate(camera, state, app=app,
  82. freq=freq, maxrate=maxrate, logger=logger,
  83. timer=timer)
  84. cam.install()
  85. conn = app.broker_connection()
  86. recv = app.events.Receiver(conn, handlers={"*": state.event})
  87. try:
  88. try:
  89. recv.capture(limit=None)
  90. except KeyboardInterrupt:
  91. raise SystemExit
  92. finally:
  93. cam.cancel()
  94. conn.close()