beat.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.apps.beat
  4. ~~~~~~~~~~~~~~~~
  5. This module is the 'program-version' of :mod:`celery.beat`.
  6. It does everything necessary to run that module
  7. as an actual application, like installing signal handlers
  8. and so on.
  9. """
  10. from __future__ import absolute_import
  11. import socket
  12. import sys
  13. from celery import __version__, platforms, beat
  14. from celery.app import app_or_default
  15. from celery.app.abstract import configurated, from_config
  16. from celery.utils.imports import qualname
  17. from celery.utils.log import LOG_LEVELS, get_logger
  18. from celery.utils.timeutils import humanize_seconds
  19. STARTUP_INFO_FMT = """
  20. Configuration ->
  21. . broker -> %(conninfo)s
  22. . loader -> %(loader)s
  23. . scheduler -> %(scheduler)s
  24. %(scheduler_info)s
  25. . logfile -> %(logfile)s@%(loglevel)s
  26. . maxinterval -> %(hmax_interval)s (%(max_interval)ss)
  27. """.strip()
  28. logger = get_logger('celery.beat')
  29. class Beat(configurated):
  30. Service = beat.Service
  31. app = None
  32. loglevel = from_config('log_level')
  33. logfile = from_config('log_file')
  34. schedule = from_config('schedule_filename')
  35. scheduler_cls = from_config('scheduler')
  36. redirect_stdouts = from_config()
  37. redirect_stdouts_level = from_config()
  38. def __init__(self, max_interval=None, app=None,
  39. socket_timeout=30, pidfile=None, **kwargs):
  40. """Starts the celerybeat task scheduler."""
  41. self.app = app = app_or_default(app or self.app)
  42. self.setup_defaults(kwargs, namespace='celerybeat')
  43. self.max_interval = max_interval
  44. self.socket_timeout = socket_timeout
  45. self.colored = app.log.colored(self.logfile)
  46. self.pidfile = pidfile
  47. if not isinstance(self.loglevel, int):
  48. self.loglevel = LOG_LEVELS[self.loglevel.upper()]
  49. def run(self):
  50. self.setup_logging()
  51. print(str(self.colored.cyan(
  52. 'celerybeat v%s is starting.' % __version__)))
  53. self.init_loader()
  54. self.set_process_title()
  55. self.start_scheduler()
  56. def setup_logging(self):
  57. handled = self.app.log.setup_logging_subsystem(loglevel=self.loglevel,
  58. logfile=self.logfile)
  59. if self.redirect_stdouts and not handled:
  60. self.app.log.redirect_stdouts_to_logger(logger,
  61. loglevel=self.redirect_stdouts_level)
  62. def start_scheduler(self):
  63. c = self.colored
  64. if self.pidfile:
  65. platforms.create_pidlock(self.pidfile)
  66. beat = self.Service(app=self.app,
  67. max_interval=self.max_interval,
  68. scheduler_cls=self.scheduler_cls,
  69. schedule_filename=self.schedule)
  70. print(str(c.blue('__ ', c.magenta('-'),
  71. c.blue(' ... __ '), c.magenta('-'),
  72. c.blue(' _\n'),
  73. c.reset(self.startup_info(beat)))))
  74. if self.socket_timeout:
  75. logger.debug('Setting default socket timeout to %r',
  76. self.socket_timeout)
  77. socket.setdefaulttimeout(self.socket_timeout)
  78. try:
  79. self.install_sync_handler(beat)
  80. beat.start()
  81. except Exception, exc:
  82. logger.critical('celerybeat raised exception %s: %r',
  83. exc.__class__, exc,
  84. exc_info=True)
  85. def init_loader(self):
  86. # Run the worker init handler.
  87. # (Usually imports task modules and such.)
  88. self.app.loader.init_worker()
  89. self.app.finalize()
  90. def startup_info(self, beat):
  91. scheduler = beat.get_scheduler(lazy=True)
  92. return STARTUP_INFO_FMT % {
  93. 'conninfo': self.app.broker_connection().as_uri(),
  94. 'logfile': self.logfile or '[stderr]',
  95. 'loglevel': LOG_LEVELS[self.loglevel],
  96. 'loader': qualname(self.app.loader),
  97. 'scheduler': qualname(scheduler),
  98. 'scheduler_info': scheduler.info,
  99. 'hmax_interval': humanize_seconds(beat.max_interval),
  100. 'max_interval': beat.max_interval,
  101. }
  102. def set_process_title(self):
  103. arg_start = 'manage' in sys.argv[0] and 2 or 1
  104. platforms.set_process_title('celerybeat',
  105. info=' '.join(sys.argv[arg_start:]))
  106. def install_sync_handler(self, beat):
  107. """Install a `SIGTERM` + `SIGINT` handler that saves
  108. the celerybeat schedule."""
  109. def _sync(signum, frame):
  110. beat.sync()
  111. raise SystemExit()
  112. platforms.signals.update(SIGTERM=_sync, SIGINT=_sync)