beat.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import
  3. import atexit
  4. import socket
  5. import sys
  6. from celery import __version__, platforms, beat
  7. from celery.app import app_or_default
  8. from celery.app.abstract import configurated, from_config
  9. from celery.utils.imports import qualname
  10. from celery.utils.log import LOG_LEVELS, get_logger
  11. from celery.utils.timeutils import humanize_seconds
# Template for the configuration summary shown at startup.  It is filled in
# by ``Beat.startup_info`` with a mapping that provides the keys conninfo,
# loader, scheduler, scheduler_info, logfile, loglevel, hmax_interval and
# max_interval.  The surrounding newlines are removed by ``.strip()``.
# NOTE(review): leading indentation inside the template appears to have been
# collapsed in this copy of the file -- confirm against the upstream source.
STARTUP_INFO_FMT = """
Configuration ->
. broker -> %(conninfo)s
. loader -> %(loader)s
. scheduler -> %(scheduler)s
%(scheduler_info)s
. logfile -> %(logfile)s@%(loglevel)s
. maxinterval -> %(hmax_interval)s (%(max_interval)ss)
""".strip()

# Module-level logger; used below for stdout redirection and for the
# debug/critical messages emitted while starting the scheduler.
logger = get_logger("celery.beat")
  22. class Beat(configurated):
  23. Service = beat.Service
  24. app = None
  25. loglevel = from_config("log_level")
  26. logfile = from_config("log_file")
  27. schedule = from_config("schedule_filename")
  28. scheduler_cls = from_config("scheduler")
  29. redirect_stdouts = from_config()
  30. redirect_stdouts_level = from_config()
  31. def __init__(self, max_interval=None, app=None,
  32. socket_timeout=30, pidfile=None, **kwargs):
  33. """Starts the celerybeat task scheduler."""
  34. self.app = app = app_or_default(app or self.app)
  35. self.setup_defaults(kwargs, namespace="celerybeat")
  36. self.max_interval = max_interval
  37. self.socket_timeout = socket_timeout
  38. self.colored = app.log.colored(self.logfile)
  39. self.pidfile = pidfile
  40. if not isinstance(self.loglevel, int):
  41. self.loglevel = LOG_LEVELS[self.loglevel.upper()]
  42. def run(self):
  43. self.setup_logging()
  44. print(str(self.colored.cyan(
  45. "celerybeat v%s is starting." % __version__)))
  46. self.init_loader()
  47. self.set_process_title()
  48. self.start_scheduler()
  49. def setup_logging(self):
  50. handled = self.app.log.setup_logging_subsystem(loglevel=self.loglevel,
  51. logfile=self.logfile)
  52. if self.redirect_stdouts and not handled:
  53. self.app.log.redirect_stdouts_to_logger(logger,
  54. loglevel=self.redirect_stdouts_level)
  55. def start_scheduler(self):
  56. c = self.colored
  57. if self.pidfile:
  58. pidlock = platforms.create_pidlock(self.pidfile).acquire()
  59. atexit.register(pidlock.release)
  60. beat = self.Service(app=self.app,
  61. max_interval=self.max_interval,
  62. scheduler_cls=self.scheduler_cls,
  63. schedule_filename=self.schedule)
  64. print(str(c.blue("__ ", c.magenta("-"),
  65. c.blue(" ... __ "), c.magenta("-"),
  66. c.blue(" _\n"),
  67. c.reset(self.startup_info(beat)))))
  68. if self.socket_timeout:
  69. logger.debug("Setting default socket timeout to %r",
  70. self.socket_timeout)
  71. socket.setdefaulttimeout(self.socket_timeout)
  72. try:
  73. self.install_sync_handler(beat)
  74. beat.start()
  75. except Exception, exc:
  76. logger.critical("celerybeat raised exception %s: %r",
  77. exc.__class__, exc,
  78. exc_info=True)
  79. def init_loader(self):
  80. # Run the worker init handler.
  81. # (Usually imports task modules and such.)
  82. self.app.loader.init_worker()
  83. self.app.finalize()
  84. def startup_info(self, beat):
  85. scheduler = beat.get_scheduler(lazy=True)
  86. return STARTUP_INFO_FMT % {
  87. "conninfo": self.app.broker_connection().as_uri(),
  88. "logfile": self.logfile or "[stderr]",
  89. "loglevel": LOG_LEVELS[self.loglevel],
  90. "loader": qualname(self.app.loader),
  91. "scheduler": qualname(scheduler),
  92. "scheduler_info": scheduler.info,
  93. "hmax_interval": humanize_seconds(beat.max_interval),
  94. "max_interval": beat.max_interval,
  95. }
  96. def set_process_title(self):
  97. arg_start = "manage" in sys.argv[0] and 2 or 1
  98. platforms.set_process_title("celerybeat",
  99. info=" ".join(sys.argv[arg_start:]))
  100. def install_sync_handler(self, beat):
  101. """Install a `SIGTERM` + `SIGINT` handler that saves
  102. the celerybeat schedule."""
  103. def _sync(signum, frame):
  104. beat.sync()
  105. raise SystemExit()
  106. platforms.signals.update(SIGTERM=_sync, SIGINT=_sync)