beat.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. import logging
  2. import sys
  3. import traceback
  4. from celery import __version__
  5. from celery import beat
  6. from celery import platform
  7. from celery.app import app_or_default
  8. from celery.log import emergency_error
  9. from celery.utils import info, LOG_LEVELS
  10. STARTUP_INFO_FMT = """
  11. Configuration ->
  12. . broker -> %(conninfo)s
  13. . schedule -> %(schedule)s
  14. . logfile -> %(logfile)s@%(loglevel)s
  15. """.strip()
  16. class Beat(object):
  17. Service = beat.Service
  18. def __init__(self, loglevel=None, logfile=None, schedule=None,
  19. max_interval=None, scheduler_cls=None, app=None, **kwargs):
  20. """Starts the celerybeat task scheduler."""
  21. self.app = app = app_or_default(app)
  22. self.loglevel = loglevel or app.conf.CELERYBEAT_LOG_LEVEL
  23. self.logfile = logfile or app.conf.CELERYBEAT_LOG_FILE
  24. self.schedule = schedule or app.conf.CELERYBEAT_SCHEDULE_FILENAME
  25. self.scheduler_cls = scheduler_cls
  26. self.max_interval = max_interval
  27. if not isinstance(self.loglevel, int):
  28. self.loglevel = LOG_LEVELS[self.loglevel.upper()]
  29. def run(self):
  30. logger = self.setup_logging()
  31. print("celerybeat v%s is starting." % __version__)
  32. self.init_loader()
  33. print(self.startup_info())
  34. self.set_process_title()
  35. print("celerybeat has started.")
  36. self.start_scheduler(logger)
  37. def setup_logging(self):
  38. from celery import log
  39. handled = log.setup_logging_subsystem(loglevel=self.loglevel,
  40. logfile=self.logfile,
  41. app=self.app)
  42. if not handled:
  43. logger = log.get_default_logger(name="celery.beat")
  44. log.redirect_stdouts_to_logger(logger, loglevel=logging.WARNING)
  45. return logger
  46. def start_scheduler(self, logger=None):
  47. beat = self.Service(app=self.app,
  48. logger=logger,
  49. max_interval=self.max_interval,
  50. scheduler_cls=self.scheduler_cls,
  51. schedule_filename=self.schedule)
  52. try:
  53. self.install_sync_handler(beat)
  54. beat.start()
  55. except Exception, exc:
  56. emergency_error(self.logfile,
  57. "celerybeat raised exception %s: %s\n%s" % (
  58. exc.__class__, exc, traceback.format_exc()))
  59. def init_loader(self):
  60. # Run the worker init handler.
  61. # (Usually imports task modules and such.)
  62. from celery.loaders import current_loader
  63. current_loader().init_worker()
  64. def startup_info(self):
  65. return STARTUP_INFO_FMT % {
  66. "conninfo": info.format_broker_info(app=self.app),
  67. "logfile": self.logfile or "@stderr",
  68. "loglevel": LOG_LEVELS[self.loglevel],
  69. "schedule": self.schedule,
  70. }
  71. def set_process_title(self):
  72. arg_start = "manage" in sys.argv[0] and 2 or 1
  73. platform.set_process_title("celerybeat",
  74. info=" ".join(sys.argv[arg_start:]))
  75. def install_sync_handler(self, beat):
  76. """Install a ``SIGTERM`` + ``SIGINT`` handler that saves
  77. the celerybeat schedule."""
  78. def _sync(signum, frame):
  79. beat.sync()
  80. raise SystemExit()
  81. platform.install_signal_handler("SIGTERM", _sync)
  82. platform.install_signal_handler("SIGINT", _sync)
  83. def run_celerybeat(*args, **kwargs):
  84. return Beat(*args, **kwargs).run()