|
@@ -7,13 +7,18 @@ from celery import __version__
|
|
|
from celery import beat
|
|
|
from celery import platforms
|
|
|
from celery.app import app_or_default
|
|
|
-from celery.utils import LOG_LEVELS
|
|
|
+from celery.utils import get_full_cls_name, LOG_LEVELS
|
|
|
+from celery.utils import term
|
|
|
+from celery.utils.timeutils import humanize_seconds
|
|
|
|
|
|
STARTUP_INFO_FMT = """
|
|
|
Configuration ->
|
|
|
. broker -> %(conninfo)s
|
|
|
- . schedule -> %(schedule)s
|
|
|
+ . loader -> %(loader)s
|
|
|
+ . scheduler -> %(scheduler)s
|
|
|
+%(scheduler_info)s
|
|
|
. logfile -> %(logfile)s@%(loglevel)s
|
|
|
+ . maxinterval -> %(hmax_interval)s (%(max_interval)ss)
|
|
|
""".strip()
|
|
|
|
|
|
|
|
@@ -32,21 +37,17 @@ class Beat(object):
|
|
|
self.scheduler_cls = scheduler_cls or app.conf.CELERYBEAT_SCHEDULER
|
|
|
self.max_interval = max_interval
|
|
|
self.socket_timeout = socket_timeout
|
|
|
+ self.colored = term.colored(enabled=defaults.CELERYD_LOG_COLOR)
|
|
|
|
|
|
if not isinstance(self.loglevel, int):
|
|
|
self.loglevel = LOG_LEVELS[self.loglevel.upper()]
|
|
|
|
|
|
def run(self):
|
|
|
logger = self.setup_logging()
|
|
|
- print("celerybeat v%s is starting." % __version__)
|
|
|
+ print(str(self.colored.magenta(
|
|
|
+ "celerybeat v%s is starting." % __version__)))
|
|
|
self.init_loader()
|
|
|
- print(self.startup_info())
|
|
|
self.set_process_title()
|
|
|
- if self.socket_timeout:
|
|
|
- logger.debug("Setting default socket timeout to %r" % (
|
|
|
- self.socket_timeout))
|
|
|
- socket.setdefaulttimeout(self.socket_timeout)
|
|
|
- print("celerybeat has started.")
|
|
|
self.start_scheduler(logger)
|
|
|
|
|
|
def setup_logging(self):
|
|
@@ -59,12 +60,21 @@ class Beat(object):
|
|
|
return logger
|
|
|
|
|
|
def start_scheduler(self, logger=None):
|
|
|
+ c = self.colored
|
|
|
beat = self.Service(app=self.app,
|
|
|
logger=logger,
|
|
|
max_interval=self.max_interval,
|
|
|
scheduler_cls=self.scheduler_cls,
|
|
|
schedule_filename=self.schedule)
|
|
|
|
|
|
+ print(str(c.blue("__ ", c.red("-"),
|
|
|
+ c.blue(" ... __ "), c.red("-"),
|
|
|
+ c.blue(" _\n"),
|
|
|
+ c.reset(self.startup_info(beat)))))
|
|
|
+ if self.socket_timeout:
|
|
|
+ logger.debug("Setting default socket timeout to %r" % (
|
|
|
+ self.socket_timeout))
|
|
|
+ socket.setdefaulttimeout(self.socket_timeout)
|
|
|
try:
|
|
|
self.install_sync_handler(beat)
|
|
|
beat.start()
|
|
@@ -77,12 +87,16 @@ class Beat(object):
|
|
|
# (Usually imports task modules and such.)
|
|
|
self.app.loader.init_worker()
|
|
|
|
|
|
- def startup_info(self):
|
|
|
+ def startup_info(self, beat):
|
|
|
return STARTUP_INFO_FMT % {
|
|
|
"conninfo": self.app.amqp.format_broker_info(),
|
|
|
- "logfile": self.logfile or "@stderr",
|
|
|
+ "logfile": self.logfile or "[stderr]",
|
|
|
"loglevel": LOG_LEVELS[self.loglevel],
|
|
|
- "schedule": self.schedule,
|
|
|
+ "loader": get_full_cls_name(self.app.loader.__class__),
|
|
|
+ "scheduler": get_full_cls_name(beat.scheduler.__class__),
|
|
|
+ "scheduler_info": beat.scheduler.info,
|
|
|
+ "hmax_interval": humanize_seconds(beat.max_interval),
|
|
|
+ "max_interval": beat.max_interval,
|
|
|
}
|
|
|
|
|
|
def set_process_title(self):
|