snapshot.py 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. from celery.utils import timer2
  2. from celery import conf
  3. from celery import log
  4. from celery.datastructures import TokenBucket
  5. from celery.events import EventReceiver
  6. from celery.events.state import State
  7. from celery.messaging import establish_connection
  8. from celery.utils import instantiate
  9. from celery.utils.dispatch import Signal
  10. from celery.utils.timeutils import rate
  11. class Polaroid(object):
  12. shutter_signal = Signal(providing_args=("state", ))
  13. cleanup_signal = Signal()
  14. _tref = None
  15. def __init__(self, state, freq=1.0, maxrate=None,
  16. cleanup_freq=3600.0, logger=None):
  17. self.state = state
  18. self.freq = freq
  19. self.cleanup_freq = cleanup_freq
  20. self.logger = logger or log.get_default_logger(name="celery.cam")
  21. self.maxrate = maxrate and TokenBucket(rate(maxrate))
  22. def install(self):
  23. self._tref = timer2.apply_interval(self.freq * 1000.0,
  24. self.capture)
  25. self._ctref = timer2.apply_interval(self.cleanup_freq * 1000.0,
  26. self.cleanup)
  27. def on_shutter(self, state):
  28. pass
  29. def on_cleanup(self):
  30. pass
  31. def cleanup(self):
  32. self.debug("Cleanup: Running...")
  33. self.cleanup_signal.send(None)
  34. self.on_cleanup()
  35. def debug(self, msg):
  36. if self.logger:
  37. self.logger.debug(msg)
  38. def shutter(self):
  39. if self.maxrate is None or self.maxrate.can_consume():
  40. self.debug("Shutter: %s" % (self.state, ))
  41. self.shutter_signal.send(self.state)
  42. self.on_shutter(self.state)
  43. self.state.clear()
  44. def capture(self):
  45. return self.state.freeze_while(self.shutter)
  46. def cancel(self):
  47. if self._tref:
  48. self._tref()
  49. self._tref.cancel()
  50. if self._ctref:
  51. self._ctref.cancel()
  52. def __enter__(self):
  53. self.install()
  54. return self
  55. def __exit__(self, *exc_info):
  56. self.cancel()
  57. def evcam(camera, freq=1.0, maxrate=None, loglevel=0,
  58. logfile=None):
  59. if not isinstance(loglevel, int):
  60. loglevel = conf.LOG_LEVELS[loglevel.upper()]
  61. logger = log.setup_logger(loglevel=loglevel,
  62. logfile=logfile,
  63. name="celery.evcam")
  64. logger.info(
  65. "-> evcam: Taking snapshots with %s (every %s secs.)\n" % (
  66. camera, freq))
  67. state = State()
  68. cam = instantiate(camera, state,
  69. freq=freq, maxrate=maxrate, logger=logger)
  70. cam.install()
  71. conn = establish_connection()
  72. recv = EventReceiver(conn, handlers={"*": state.event})
  73. try:
  74. try:
  75. recv.capture(limit=None)
  76. except KeyboardInterrupt:
  77. raise SystemExit
  78. finally:
  79. cam.cancel()
  80. conn.close()