snapshot.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.events.snapshot
  4. ~~~~~~~~~~~~~~~~~~~~~~
  5. Consuming the events as a stream is not always suitable
  6. so this module implements a system to take snapshots of the
  7. state of a cluster at regular intervals. There is a full
  8. implementation of this writing the snapshots to a database
  9. in :mod:`djcelery.snapshots` in the `django-celery` distribution.
  10. :copyright: (c) 2009 - 2012 by Ask Solem.
  11. :license: BSD, see LICENSE for more details.
  12. """
  13. from __future__ import absolute_import
  14. import atexit
  15. from kombu.utils.limits import TokenBucket
  16. from celery import platforms
  17. from celery.app import app_or_default
  18. from celery.utils import timer2
  19. from celery.utils.dispatch import Signal
  20. from celery.utils.imports import instantiate
  21. from celery.utils.log import get_logger
  22. from celery.utils.timeutils import rate
  23. logger = get_logger("celery.evcam")
  24. class Polaroid(object):
  25. timer = timer2
  26. shutter_signal = Signal(providing_args=("state", ))
  27. cleanup_signal = Signal()
  28. clear_after = False
  29. _tref = None
  30. _ctref = None
  31. def __init__(self, state, freq=1.0, maxrate=None,
  32. cleanup_freq=3600.0, timer=None, app=None):
  33. self.app = app_or_default(app)
  34. self.state = state
  35. self.freq = freq
  36. self.cleanup_freq = cleanup_freq
  37. self.timer = timer or self.timer
  38. self.logger = logger
  39. self.maxrate = maxrate and TokenBucket(rate(maxrate))
  40. def install(self):
  41. self._tref = self.timer.apply_interval(self.freq * 1000.0,
  42. self.capture)
  43. self._ctref = self.timer.apply_interval(self.cleanup_freq * 1000.0,
  44. self.cleanup)
  45. def on_shutter(self, state):
  46. pass
  47. def on_cleanup(self):
  48. pass
  49. def cleanup(self):
  50. logger.debug("Cleanup: Running...")
  51. self.cleanup_signal.send(None)
  52. self.on_cleanup()
  53. def shutter(self):
  54. if self.maxrate is None or self.maxrate.can_consume():
  55. logger.debug("Shutter: %s", self.state)
  56. self.shutter_signal.send(self.state)
  57. self.on_shutter(self.state)
  58. def capture(self):
  59. self.state.freeze_while(self.shutter, clear_after=self.clear_after)
  60. def cancel(self):
  61. if self._tref:
  62. self._tref() # flush all received events.
  63. self._tref.cancel()
  64. if self._ctref:
  65. self._ctref.cancel()
  66. def __enter__(self):
  67. self.install()
  68. return self
  69. def __exit__(self, *exc_info):
  70. self.cancel()
  71. def evcam(camera, freq=1.0, maxrate=None, loglevel=0,
  72. logfile=None, pidfile=None, timer=None, app=None):
  73. app = app_or_default(app)
  74. if pidfile:
  75. platforms.create_pidlock(pidfile)
  76. app.log.setup_logging_subsystem(loglevel, logfile)
  77. logger.info(
  78. "-> evcam: Taking snapshots with %s (every %s secs.)\n" % (
  79. camera, freq))
  80. state = app.events.State()
  81. cam = instantiate(camera, state, app=app, freq=freq,
  82. maxrate=maxrate, timer=timer)
  83. cam.install()
  84. conn = app.broker_connection()
  85. recv = app.events.Receiver(conn, handlers={"*": state.event})
  86. try:
  87. try:
  88. recv.capture(limit=None)
  89. except KeyboardInterrupt:
  90. raise SystemExit
  91. finally:
  92. cam.cancel()
  93. conn.close()