beat.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import
  3. import atexit
  4. import socket
  5. import sys
  6. import traceback
  7. from .. import __version__, platforms
  8. from .. import beat
  9. from ..app import app_or_default
  10. from ..utils import get_full_cls_name, LOG_LEVELS
  11. from ..utils.timeutils import humanize_seconds
  12. STARTUP_INFO_FMT = """
  13. Configuration ->
  14. . broker -> %(conninfo)s
  15. . loader -> %(loader)s
  16. . scheduler -> %(scheduler)s
  17. %(scheduler_info)s
  18. . logfile -> %(logfile)s@%(loglevel)s
  19. . maxinterval -> %(hmax_interval)s (%(max_interval)ss)
  20. """.strip()
  21. class Beat(object):
  22. Service = beat.Service
  23. def __init__(self, loglevel=None, logfile=None, schedule=None,
  24. max_interval=None, scheduler_cls=None, app=None,
  25. socket_timeout=30, redirect_stdouts=None,
  26. redirect_stdouts_level=None, pidfile=None, **kwargs):
  27. """Starts the celerybeat task scheduler."""
  28. self.app = app = app_or_default(app)
  29. self.loglevel = loglevel or app.conf.CELERYBEAT_LOG_LEVEL
  30. self.logfile = logfile or app.conf.CELERYBEAT_LOG_FILE
  31. self.schedule = schedule or app.conf.CELERYBEAT_SCHEDULE_FILENAME
  32. self.scheduler_cls = scheduler_cls or app.conf.CELERYBEAT_SCHEDULER
  33. self.max_interval = max_interval
  34. self.socket_timeout = socket_timeout
  35. self.colored = app.log.colored(self.logfile)
  36. self.redirect_stdouts = (redirect_stdouts or
  37. app.conf.CELERY_REDIRECT_STDOUTS)
  38. self.redirect_stdouts_level = (redirect_stdouts_level or
  39. app.conf.CELERY_REDIRECT_STDOUTS_LEVEL)
  40. self.pidfile = pidfile
  41. if not isinstance(self.loglevel, int):
  42. self.loglevel = LOG_LEVELS[self.loglevel.upper()]
  43. def run(self):
  44. logger = self.setup_logging()
  45. print(str(self.colored.cyan(
  46. "celerybeat v%s is starting." % __version__)))
  47. self.init_loader()
  48. self.set_process_title()
  49. self.start_scheduler(logger)
  50. def setup_logging(self):
  51. handled = self.app.log.setup_logging_subsystem(loglevel=self.loglevel,
  52. logfile=self.logfile)
  53. logger = self.app.log.get_default_logger(name="celery.beat")
  54. if self.redirect_stdouts and not handled:
  55. self.app.log.redirect_stdouts_to_logger(logger,
  56. loglevel=self.redirect_stdouts_level)
  57. return logger
  58. def start_scheduler(self, logger=None):
  59. c = self.colored
  60. if self.pidfile:
  61. pidlock = platforms.create_pidlock(self.pidfile).acquire()
  62. atexit.register(pidlock.release)
  63. beat = self.Service(app=self.app,
  64. logger=logger,
  65. max_interval=self.max_interval,
  66. scheduler_cls=self.scheduler_cls,
  67. schedule_filename=self.schedule)
  68. print(str(c.blue("__ ", c.magenta("-"),
  69. c.blue(" ... __ "), c.magenta("-"),
  70. c.blue(" _\n"),
  71. c.reset(self.startup_info(beat)))))
  72. if self.socket_timeout:
  73. logger.debug("Setting default socket timeout to %r",
  74. self.socket_timeout)
  75. socket.setdefaulttimeout(self.socket_timeout)
  76. try:
  77. self.install_sync_handler(beat)
  78. beat.start()
  79. except Exception, exc:
  80. logger.critical("celerybeat raised exception %s: %r\n%s",
  81. exc.__class__, exc, traceback.format_exc(),
  82. exc_info=sys.exc_info())
  83. def init_loader(self):
  84. # Run the worker init handler.
  85. # (Usually imports task modules and such.)
  86. self.app.loader.init_worker()
  87. def startup_info(self, beat):
  88. scheduler = beat.get_scheduler(lazy=True)
  89. return STARTUP_INFO_FMT % {
  90. "conninfo": self.app.broker_connection().as_uri(),
  91. "logfile": self.logfile or "[stderr]",
  92. "loglevel": LOG_LEVELS[self.loglevel],
  93. "loader": get_full_cls_name(self.app.loader.__class__),
  94. "scheduler": get_full_cls_name(scheduler.__class__),
  95. "scheduler_info": scheduler.info,
  96. "hmax_interval": humanize_seconds(beat.max_interval),
  97. "max_interval": beat.max_interval,
  98. }
  99. def set_process_title(self):
  100. arg_start = "manage" in sys.argv[0] and 2 or 1
  101. platforms.set_process_title("celerybeat",
  102. info=" ".join(sys.argv[arg_start:]))
  103. def install_sync_handler(self, beat):
  104. """Install a `SIGTERM` + `SIGINT` handler that saves
  105. the celerybeat schedule."""
  106. def _sync(signum, frame):
  107. beat.sync()
  108. raise SystemExit()
  109. platforms.signals.update(SIGTERM=_sync, SIGINT=_sync)