beat.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. # -*- coding: utf-8 -*-
  2. """Beat command-line program.
  3. This module is the 'program-version' of :mod:`celery.beat`.
  4. It does everything necessary to run that module
  5. as an actual application, like installing signal handlers
  6. and so on.
  7. """
  8. from __future__ import absolute_import, print_function, unicode_literals
  9. import numbers
  10. import socket
  11. import sys
  12. from datetime import datetime
  13. from celery import VERSION_BANNER, platforms, beat
  14. from celery.five import text_t
  15. from celery.utils.imports import qualname
  16. from celery.utils.log import LOG_LEVELS, get_logger
  17. from celery.utils.time import humanize_seconds
# Public names exported by this module.
__all__ = ['Beat']

# Template for the configuration summary shown at startup.  It is filled in
# with ``str.format`` by ``Beat.startup_info``; every ``{name}`` placeholder
# below must be supplied as a keyword argument there.
# NOTE(review): the '%' in '@%{loglevel}' is not a placeholder and is emitted
# literally (e.g. '[stderr]@%WARNING') -- presumably a leftover from an older
# %-style template; confirm before changing, since it alters visible output.
STARTUP_INFO_FMT = """
LocalTime -> {timestamp}
Configuration ->
. broker -> {conninfo}
. loader -> {loader}
. scheduler -> {scheduler}
{scheduler_info}
. logfile -> {logfile}@%{loglevel}
. maxinterval -> {hmax_interval} ({max_interval}s)
""".strip()

# Module-level logger, registered under the 'celery.beat' name.
logger = get_logger('celery.beat')
  30. class Beat(object):
  31. """Beat as a service."""
  32. Service = beat.Service
  33. app = None
  34. def __init__(self, max_interval=None, app=None,
  35. socket_timeout=30, pidfile=None, no_color=None,
  36. loglevel='WARN', logfile=None, schedule=None,
  37. scheduler=None,
  38. scheduler_cls=None, # XXX use scheduler
  39. redirect_stdouts=None,
  40. redirect_stdouts_level=None, **kwargs):
  41. self.app = app = app or self.app
  42. either = self.app.either
  43. self.loglevel = loglevel
  44. self.logfile = logfile
  45. self.schedule = either('beat_schedule_filename', schedule)
  46. self.scheduler_cls = either(
  47. 'beat_scheduler', scheduler, scheduler_cls)
  48. self.redirect_stdouts = either(
  49. 'worker_redirect_stdouts', redirect_stdouts)
  50. self.redirect_stdouts_level = either(
  51. 'worker_redirect_stdouts_level', redirect_stdouts_level)
  52. self.max_interval = max_interval
  53. self.socket_timeout = socket_timeout
  54. self.no_color = no_color
  55. self.colored = app.log.colored(
  56. self.logfile,
  57. enabled=not no_color if no_color is not None else no_color,
  58. )
  59. self.pidfile = pidfile
  60. if not isinstance(self.loglevel, numbers.Integral):
  61. self.loglevel = LOG_LEVELS[self.loglevel.upper()]
  62. def run(self):
  63. print(str(self.colored.cyan(
  64. 'celery beat v{0} is starting.'.format(VERSION_BANNER))))
  65. self.init_loader()
  66. self.set_process_title()
  67. self.start_scheduler()
  68. def setup_logging(self, colorize=None):
  69. if colorize is None and self.no_color is not None:
  70. colorize = not self.no_color
  71. self.app.log.setup(self.loglevel, self.logfile,
  72. self.redirect_stdouts, self.redirect_stdouts_level,
  73. colorize=colorize)
  74. def start_scheduler(self):
  75. if self.pidfile:
  76. platforms.create_pidlock(self.pidfile)
  77. service = self.Service(
  78. app=self.app,
  79. max_interval=self.max_interval,
  80. scheduler_cls=self.scheduler_cls,
  81. schedule_filename=self.schedule,
  82. )
  83. print(self.banner(service))
  84. self.setup_logging()
  85. if self.socket_timeout:
  86. logger.debug('Setting default socket timeout to %r',
  87. self.socket_timeout)
  88. socket.setdefaulttimeout(self.socket_timeout)
  89. try:
  90. self.install_sync_handler(service)
  91. service.start()
  92. except Exception as exc:
  93. logger.critical('beat raised exception %s: %r',
  94. exc.__class__, exc,
  95. exc_info=True)
  96. raise
  97. def banner(self, service):
  98. c = self.colored
  99. return text_t( # flake8: noqa
  100. c.blue('__ ', c.magenta('-'),
  101. c.blue(' ... __ '), c.magenta('-'),
  102. c.blue(' _\n'),
  103. c.reset(self.startup_info(service))),
  104. )
  105. def init_loader(self):
  106. # Run the worker init handler.
  107. # (Usually imports task modules and such.)
  108. self.app.loader.init_worker()
  109. self.app.finalize()
  110. def startup_info(self, service):
  111. scheduler = service.get_scheduler(lazy=True)
  112. return STARTUP_INFO_FMT.format(
  113. conninfo=self.app.connection().as_uri(),
  114. timestamp=datetime.now().replace(microsecond=0),
  115. logfile=self.logfile or '[stderr]',
  116. loglevel=LOG_LEVELS[self.loglevel],
  117. loader=qualname(self.app.loader),
  118. scheduler=qualname(scheduler),
  119. scheduler_info=scheduler.info,
  120. hmax_interval=humanize_seconds(scheduler.max_interval),
  121. max_interval=scheduler.max_interval,
  122. )
  123. def set_process_title(self):
  124. arg_start = 'manage' in sys.argv[0] and 2 or 1
  125. platforms.set_process_title(
  126. 'celery beat', info=' '.join(sys.argv[arg_start:]),
  127. )
  128. def install_sync_handler(self, service):
  129. """Install a `SIGTERM` + `SIGINT` handler saving the schedule."""
  130. def _sync(signum, frame):
  131. service.sync()
  132. raise SystemExit()
  133. platforms.signals.update(SIGTERM=_sync, SIGINT=_sync)