123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154 |
- # -*- coding: utf-8 -*-
- """Beat command-line program.
- This module is the 'program-version' of :mod:`celery.beat`.
- It does everything necessary to run that module
- as an actual application, like installing signal handlers
- and so on.
- """
- from __future__ import absolute_import, print_function, unicode_literals
- import numbers
- import socket
- import sys
- from datetime import datetime
- from celery import VERSION_BANNER, platforms, beat
- from celery.five import text_t
- from celery.utils.imports import qualname
- from celery.utils.log import LOG_LEVELS, get_logger
- from celery.utils.time import humanize_seconds
- __all__ = ['Beat']
- STARTUP_INFO_FMT = """
- LocalTime -> {timestamp}
- Configuration ->
- . broker -> {conninfo}
- . loader -> {loader}
- . scheduler -> {scheduler}
- {scheduler_info}
- . logfile -> {logfile}@%{loglevel}
- . maxinterval -> {hmax_interval} ({max_interval}s)
- """.strip()
- logger = get_logger('celery.beat')
- class Beat(object):
- """Beat as a service."""
- Service = beat.Service
- app = None
- def __init__(self, max_interval=None, app=None,
- socket_timeout=30, pidfile=None, no_color=None,
- loglevel='WARN', logfile=None, schedule=None,
- scheduler=None,
- scheduler_cls=None, # XXX use scheduler
- redirect_stdouts=None,
- redirect_stdouts_level=None, **kwargs):
- self.app = app = app or self.app
- either = self.app.either
- self.loglevel = loglevel
- self.logfile = logfile
- self.schedule = either('beat_schedule_filename', schedule)
- self.scheduler_cls = either(
- 'beat_scheduler', scheduler, scheduler_cls)
- self.redirect_stdouts = either(
- 'worker_redirect_stdouts', redirect_stdouts)
- self.redirect_stdouts_level = either(
- 'worker_redirect_stdouts_level', redirect_stdouts_level)
- self.max_interval = max_interval
- self.socket_timeout = socket_timeout
- self.no_color = no_color
- self.colored = app.log.colored(
- self.logfile,
- enabled=not no_color if no_color is not None else no_color,
- )
- self.pidfile = pidfile
- if not isinstance(self.loglevel, numbers.Integral):
- self.loglevel = LOG_LEVELS[self.loglevel.upper()]
- def run(self):
- print(str(self.colored.cyan(
- 'celery beat v{0} is starting.'.format(VERSION_BANNER))))
- self.init_loader()
- self.set_process_title()
- self.start_scheduler()
- def setup_logging(self, colorize=None):
- if colorize is None and self.no_color is not None:
- colorize = not self.no_color
- self.app.log.setup(self.loglevel, self.logfile,
- self.redirect_stdouts, self.redirect_stdouts_level,
- colorize=colorize)
- def start_scheduler(self):
- if self.pidfile:
- platforms.create_pidlock(self.pidfile)
- service = self.Service(
- app=self.app,
- max_interval=self.max_interval,
- scheduler_cls=self.scheduler_cls,
- schedule_filename=self.schedule,
- )
- print(self.banner(service))
- self.setup_logging()
- if self.socket_timeout:
- logger.debug('Setting default socket timeout to %r',
- self.socket_timeout)
- socket.setdefaulttimeout(self.socket_timeout)
- try:
- self.install_sync_handler(service)
- service.start()
- except Exception as exc:
- logger.critical('beat raised exception %s: %r',
- exc.__class__, exc,
- exc_info=True)
- raise
- def banner(self, service):
- c = self.colored
- return text_t( # flake8: noqa
- c.blue('__ ', c.magenta('-'),
- c.blue(' ... __ '), c.magenta('-'),
- c.blue(' _\n'),
- c.reset(self.startup_info(service))),
- )
- def init_loader(self):
- # Run the worker init handler.
- # (Usually imports task modules and such.)
- self.app.loader.init_worker()
- self.app.finalize()
- def startup_info(self, service):
- scheduler = service.get_scheduler(lazy=True)
- return STARTUP_INFO_FMT.format(
- conninfo=self.app.connection().as_uri(),
- timestamp=datetime.now().replace(microsecond=0),
- logfile=self.logfile or '[stderr]',
- loglevel=LOG_LEVELS[self.loglevel],
- loader=qualname(self.app.loader),
- scheduler=qualname(scheduler),
- scheduler_info=scheduler.info,
- hmax_interval=humanize_seconds(scheduler.max_interval),
- max_interval=scheduler.max_interval,
- )
- def set_process_title(self):
- arg_start = 'manage' in sys.argv[0] and 2 or 1
- platforms.set_process_title(
- 'celery beat', info=' '.join(sys.argv[arg_start:]),
- )
- def install_sync_handler(self, service):
- """Install a `SIGTERM` + `SIGINT` handler saving the schedule."""
- def _sync(signum, frame):
- service.sync()
- raise SystemExit()
- platforms.signals.update(SIGTERM=_sync, SIGINT=_sync)
|