celerybeat.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. #!/usr/bin/env python
  2. """celerybeat
  3. .. program:: celerybeat
  4. .. cmdoption:: -s, --schedule
  5. Path to the schedule database. Defaults to ``celerybeat-schedule``.
  6. The extension ".db" will be appended to the filename.
  7. .. cmdoption:: -f, --logfile
  8. Path to log file. If no logfile is specified, ``stderr`` is used.
  9. .. cmdoption:: -l, --loglevel
  10. Logging level, choose between ``DEBUG``, ``INFO``, ``WARNING``,
  11. ``ERROR``, ``CRITICAL``, or ``FATAL``.
  12. """
  13. import sys
  14. import optparse
  15. import traceback
  16. import celery
  17. from celery import conf
  18. from celery import platform
  19. from celery.log import emergency_error
  20. from celery.beat import ClockService
  21. from celery.utils import info
# Template for the banner built by ``Beat.startup_info``. It is filled in
# with the keys ``conninfo``, ``schedule``, ``logfile`` and ``loglevel``;
# ``.strip()`` removes the newlines that surround the text in the literal.
# NOTE(review): any leading whitespace on the bullet lines could not be
# recovered from this copy of the file -- confirm against the upstream source.
STARTUP_INFO_FMT = """
Configuration ->
. broker -> %(conninfo)s
. schedule -> %(schedule)s
. logfile -> %(logfile)s@%(loglevel)s
""".strip()
  28. OPTION_LIST = (
  29. optparse.make_option('-s', '--schedule',
  30. default=conf.CELERYBEAT_SCHEDULE_FILENAME,
  31. action="store", dest="schedule",
  32. help="Path to the schedule database. The extension \
  33. '.db' will be appended to the filename. Default: %s" % (
  34. conf.CELERYBEAT_SCHEDULE_FILENAME)),
  35. optparse.make_option('-f', '--logfile', default=conf.CELERYBEAT_LOG_FILE,
  36. action="store", dest="logfile",
  37. help="Path to log file."),
  38. optparse.make_option('-l', '--loglevel',
  39. default=conf.CELERYBEAT_LOG_LEVEL,
  40. action="store", dest="loglevel",
  41. help="Loglevel. One of DEBUG/INFO/WARNING/ERROR/CRITICAL."),
  42. )
  43. class Beat(object):
  44. ClockService = ClockService
  45. def __init__(self, loglevel=conf.CELERYBEAT_LOG_LEVEL,
  46. logfile=conf.CELERYBEAT_LOG_FILE,
  47. schedule=conf.CELERYBEAT_SCHEDULE_FILENAME, **kwargs):
  48. """Starts the celerybeat task scheduler."""
  49. self.loglevel = loglevel
  50. self.logfile = logfile
  51. self.schedule = schedule
  52. # Setup logging
  53. if not isinstance(self.loglevel, int):
  54. self.loglevel = conf.LOG_LEVELS[self.loglevel.upper()]
  55. def run(self):
  56. print("celerybeat %s is starting." % celery.__version__)
  57. self.init_loader()
  58. print(self.startup_info())
  59. self.set_process_title()
  60. print("celerybeat has started.")
  61. self.start_scheduler()
  62. def start_scheduler(self):
  63. from celery.log import setup_logger
  64. logger = setup_logger(self.loglevel, self.logfile, name="celery.beat")
  65. beat = self.ClockService(logger,
  66. schedule_filename=self.schedule)
  67. try:
  68. self.install_sync_handler(beat)
  69. beat.start()
  70. except Exception, exc:
  71. emergency_error(self.logfile,
  72. "celerybeat raised exception %s: %s\n%s" % (
  73. exc.__class__, exc, traceback.format_exc()))
  74. def init_loader(self):
  75. # Run the worker init handler.
  76. # (Usually imports task modules and such.)
  77. from celery.loaders import current_loader
  78. current_loader().init_worker()
  79. def startup_info(self):
  80. return STARTUP_INFO_FMT % {
  81. "conninfo": info.format_broker_info(),
  82. "logfile": self.logfile or "@stderr",
  83. "loglevel": conf.LOG_LEVELS[self.loglevel],
  84. "schedule": self.schedule,
  85. }
  86. def set_process_title(self):
  87. arg_start = "manage" in sys.argv[0] and 2 or 1
  88. platform.set_process_title("celerybeat",
  89. info=" ".join(sys.argv[arg_start:]))
  90. def install_sync_handler(self, beat):
  91. """Install a ``SIGTERM`` + ``SIGINT`` handler that saves
  92. the celerybeat schedule."""
  93. def _sync(signum, frame):
  94. beat.sync()
  95. raise SystemExit()
  96. platform.install_signal_handler("SIGTERM", _sync)
  97. platform.install_signal_handler("SIGINT", _sync)
  98. def parse_options(arguments):
  99. """Parse the available options to ``celeryd``."""
  100. parser = optparse.OptionParser(option_list=OPTION_LIST)
  101. options, values = parser.parse_args(arguments)
  102. return options
  103. def run_celerybeat(**options):
  104. Beat(**options).run()
  105. def main():
  106. options = parse_options(sys.argv[1:])
  107. run_celerybeat(**vars(options))
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()