beat.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. import socket
  2. import sys
  3. import traceback
  4. from celery import __version__
  5. from celery import beat
  6. from celery import platforms
  7. from celery.log import emergency_error
  8. from celery.utils import get_full_cls_name, info, LOG_LEVELS
  9. from celery.utils.info import humanize_seconds
  10. from celery.utils import term
  11. STARTUP_INFO_FMT = """
  12. Configuration ->
  13. . broker -> %(conninfo)s
  14. . loader -> %(loader)s
  15. . scheduler -> %(scheduler)s
  16. %(scheduler_info)s
  17. . logfile -> %(logfile)s@%(loglevel)s
  18. . maxinterval -> %(hmax_interval)s (%(max_interval)ss)
  19. """.strip()
  20. class Beat(object):
  21. Service = beat.Service
  22. def __init__(self, loglevel=None, logfile=None, schedule=None,
  23. max_interval=None, scheduler_cls=None, defaults=None,
  24. socket_timeout=30, redirect_stdouts=None,
  25. redirect_stdouts_level=None, **kwargs):
  26. """Starts the celerybeat task scheduler."""
  27. if defaults is None:
  28. from celery import conf as defaults
  29. self.defaults = defaults
  30. self.loglevel = loglevel or defaults.CELERYBEAT_LOG_LEVEL
  31. self.logfile = logfile or defaults.CELERYBEAT_LOG_FILE
  32. self.schedule = schedule or defaults.CELERYBEAT_SCHEDULE_FILENAME
  33. self.scheduler_cls = scheduler_cls or defaults.CELERYBEAT_SCHEDULER
  34. self.max_interval = max_interval
  35. self.socket_timeout = socket_timeout
  36. self.colored = term.colored(enabled=defaults.CELERYD_LOG_COLOR)
  37. self.redirect_stdouts = redirect_stdouts or defaults.REDIRECT_STDOUTS
  38. self.redirect_stdouts_level = (redirect_stdouts_level or
  39. defaults.REDIRECT_STDOUTS_LEVEL)
  40. if not isinstance(self.loglevel, int):
  41. self.loglevel = LOG_LEVELS[self.loglevel.upper()]
  42. def run(self):
  43. logger = self.setup_logging()
  44. print(str(self.colored.cyan(
  45. "celerybeat v%s is starting." % __version__)))
  46. self.init_loader()
  47. self.set_process_title()
  48. self.start_scheduler(logger)
  49. def setup_logging(self):
  50. from celery import log
  51. handled = log.setup_logging_subsystem(loglevel=self.loglevel,
  52. logfile=self.logfile)
  53. if not handled:
  54. logger = log.get_default_logger(name="celery.beat")
  55. if self.redirect_stdouts:
  56. log.redirect_stdouts_to_logger(logger,
  57. loglevel=self.redirect_stdouts_level)
  58. return logger
  59. def start_scheduler(self, logger=None):
  60. c = self.colored
  61. beat = self.Service(logger=logger,
  62. max_interval=self.max_interval,
  63. scheduler_cls=self.scheduler_cls,
  64. schedule_filename=self.schedule)
  65. print(str(c.blue("__ ", c.magenta("-"),
  66. c.blue(" ... __ "), c.magenta("-"),
  67. c.blue(" _\n"),
  68. c.reset(self.startup_info(beat)))))
  69. if self.socket_timeout:
  70. logger.debug("Setting default socket timeout to %r" % (
  71. self.socket_timeout))
  72. socket.setdefaulttimeout(self.socket_timeout)
  73. try:
  74. self.install_sync_handler(beat)
  75. beat.start()
  76. except Exception, exc:
  77. logger.critical("celerybeat raised exception: %r\n%s" % (
  78. exc, traceback.format_exc()),
  79. exc_info=sys.exc_info())
  80. def init_loader(self):
  81. # Run the worker init handler.
  82. # (Usually imports task modules and such.)
  83. from celery.loaders import current_loader
  84. self.loader = current_loader()
  85. self.loader.init_worker()
  86. def startup_info(self, beat):
  87. scheduler = beat.get_scheduler(lazy=True)
  88. return STARTUP_INFO_FMT % {
  89. "conninfo": info.format_broker_info(),
  90. "logfile": self.logfile or "[stderr]",
  91. "loglevel": LOG_LEVELS[self.loglevel],
  92. "loader": get_full_cls_name(self.loader.__class__),
  93. "scheduler": get_full_cls_name(scheduler.__class__),
  94. "scheduler_info": scheduler.info,
  95. "hmax_interval": humanize_seconds(beat.max_interval),
  96. "max_interval": beat.max_interval,
  97. }
  98. def set_process_title(self):
  99. arg_start = "manage" in sys.argv[0] and 2 or 1
  100. platforms.set_process_title("celerybeat",
  101. info=" ".join(sys.argv[arg_start:]))
  102. def install_sync_handler(self, beat):
  103. """Install a ``SIGTERM`` + ``SIGINT`` handler that saves
  104. the celerybeat schedule."""
  105. def _sync(signum, frame):
  106. beat.sync()
  107. raise SystemExit()
  108. platforms.install_signal_handler("SIGTERM", _sync)
  109. platforms.install_signal_handler("SIGINT", _sync)
  110. def run_celerybeat(*args, **kwargs):
  111. return Beat(*args, **kwargs).run()