123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257 |
# -*- coding: utf-8 -*-
"""Logging configuration.

The Celery instance's logging section: ``Celery.log``.

Sets up logging for the worker and other programs,
redirects the standard output streams, colors log output,
applies logging-related compatibility fixes, and so on.
"""
- from __future__ import absolute_import, unicode_literals
- import logging
- import os
- import sys
- from logging.handlers import WatchedFileHandler
- from kombu.utils.encoding import set_default_encoding_file
- from celery import signals
- from celery._state import get_current_task
- from celery.five import string_t
- from celery.local import class_property
- from celery.platforms import isatty
- from celery.utils.log import (
- get_logger, mlevel,
- ColorFormatter, LoggingProxy, get_multiprocessing_logger,
- reset_multiprocessing_logger,
- )
- from celery.utils.nodenames import node_format
- from celery.utils.term import colored
# Names exported by ``from <module> import *``.
__all__ = ['TaskFormatter', 'Logging']

# Raw value of the ``MP_LOG`` environment variable, or ``False`` when it
# is unset.  Any non-empty string is truthy; when truthy the
# multiprocessing logger is configured with the requested log level
# instead of ``logging.ERROR``.
MP_LOG = os.getenv('MP_LOG', False)
class TaskFormatter(ColorFormatter):
    """Formatter for tasks, adding the task name and id.

    Every record gets ``task_name`` and ``task_id`` attributes before it
    is rendered, so log formats may reference them.
    """

    def format(self, record):
        """Attach task details to ``record`` and format it.

        Arguments:
            record (logging.LogRecord): The record to render.

        Returns:
            The result of ``ColorFormatter.format`` for the updated record.
        """
        fields = record.__dict__
        current = get_current_task()
        if current and current.request:
            # A task with a request is current: always overwrite with
            # its id and name.
            fields.update(task_id=current.request.id,
                          task_name=current.name)
        else:
            # No usable task: fill in placeholders, but leave values
            # that are already present on the record untouched.
            fields.setdefault('task_name', '???')
            fields.setdefault('task_id', '???')
        return ColorFormatter.format(self, record)
class Logging(object):
    """Application logging setup (app.log).

    Reads its defaults (log formats and color preference) from the app
    configuration and installs handlers on the root, multiprocessing
    and ``celery.task`` loggers.
    """

    #: The logging subsystem is only configured once per process.
    #: setup_logging_subsystem sets this flag, and subsequent calls
    #: will do nothing.
    _setup = False

    def __init__(self, app):
        """Store ``app`` and read the logging defaults from ``app.conf``."""
        self.app = app
        self.loglevel = mlevel(logging.WARN)
        conf = self.app.conf
        self.format = conf.worker_log_format
        self.task_format = conf.worker_task_log_format
        self.colorize = conf.worker_log_color

    def setup(self, loglevel=None, logfile=None, redirect_stdouts=False,
              redirect_level='WARNING', colorize=None, hostname=None):
        """Configure logging and optionally redirect the standard streams.

        The streams are only redirected when ``redirect_stdouts`` is true
        and :meth:`setup_logging_subsystem` returned a falsy value.  The
        level and file are always exported through the
        ``CELERY_LOG_LEVEL`` and ``CELERY_LOG_FILE`` environment variables
        (empty string when falsy).

        Returns:
            Whatever :meth:`setup_logging_subsystem` returned.
        """
        loglevel = mlevel(loglevel)
        handled = self.setup_logging_subsystem(
            loglevel, logfile, colorize=colorize, hostname=hostname,
        )
        if not handled and redirect_stdouts:
            self.redirect_stdouts(redirect_level)
        os.environ.update(
            CELERY_LOG_LEVEL=str(loglevel) if loglevel else '',
            CELERY_LOG_FILE=str(logfile) if logfile else '',
        )
        return handled

    def redirect_stdouts(self, loglevel=None, name='celery.redirected'):
        """Redirect stdout/stderr to the logger called ``name``.

        Also records the redirection in the ``CELERY_LOG_REDIRECT`` and
        ``CELERY_LOG_REDIRECT_LEVEL`` environment variables.
        """
        target = get_logger(name)
        self.redirect_stdouts_to_logger(target, loglevel=loglevel)
        os.environ.update(
            CELERY_LOG_REDIRECT='1',
            CELERY_LOG_REDIRECT_LEVEL=str(loglevel or ''),
        )

    def setup_logging_subsystem(self, loglevel=None, logfile=None, format=None,
                                colorize=None, hostname=None, **kwargs):
        """Configure the root, multiprocessing and task loggers once.

        Does nothing (and returns ``None``) when the subsystem has already
        been set up.  Otherwise the ``setup_logging`` signal is sent; only
        if that yields a falsy result are the loggers configured here.

        Arguments:
            loglevel (int, str): Level to use, defaults to ``self.loglevel``.
            logfile: File name, stream, or ``None``.  When both ``logfile``
                and ``hostname`` are truthy the name is passed through
                ``node_format``.
            format (str): Log format, defaults to ``self.format``.
            colorize (bool): Color preference, resolved by
                :meth:`supports_color`.
            hostname (str): Node name used to expand ``logfile``.
            **kwargs: Forwarded to :meth:`_configure_logger`.

        Returns:
            The result of sending the ``setup_logging`` signal, or ``None``
            if logging was already set up.
        """
        if self.already_setup:
            return
        if logfile and hostname:
            logfile = node_format(logfile, hostname)
        Logging._setup = True
        loglevel = mlevel(loglevel or self.loglevel)
        format = format or self.format
        colorize = self.supports_color(colorize, logfile)
        reset_multiprocessing_logger()

        # The same settings are announced by both signals below.
        settings = dict(
            loglevel=loglevel, logfile=logfile,
            format=format, colorize=colorize,
        )
        receivers = signals.setup_logging.send(sender=None, **settings)

        if not receivers:
            root = logging.getLogger()

            if self.app.conf.worker_hijack_root_logger:
                # Drop every handler installed before us.
                root.handlers = []
                for owned in ('celery', 'celery.task', 'celery.redirected'):
                    get_logger(owned).handlers = []

            # Configure root logger
            self._configure_logger(
                root, logfile, loglevel, format, colorize, **kwargs
            )

            # Configure the multiprocessing logger; it stays at ERROR
            # unless MP_LOG is enabled.
            mp_logger = get_multiprocessing_logger()
            mp_level = loglevel if MP_LOG else logging.ERROR
            self._configure_logger(
                mp_logger, logfile, mp_level, format, colorize, **kwargs
            )

            signals.after_setup_logger.send(
                sender=None, logger=root, **settings
            )

            # then setup the root task logger.
            self.setup_task_loggers(loglevel, logfile, colorize=colorize)

        try:
            stream = logging.getLogger().handlers[0].stream
        except (AttributeError, IndexError):
            # No handlers, or the first handler has no stream.
            pass
        else:
            set_default_encoding_file(stream)

        # This is a hack for multiprocessing's fork+exec, so that
        # logging before Process.run works.
        logfile_name = logfile if isinstance(logfile, string_t) else ''
        os.environ.update(_MP_FORK_LOGLEVEL_=str(loglevel),
                          _MP_FORK_LOGFILE_=logfile_name,
                          _MP_FORK_LOGFORMAT_=format)
        return receivers

    def _configure_logger(self, logger, logfile, loglevel,
                          format, colorize, **kwargs):
        """Install handlers on ``logger`` and set its level.

        A ``None`` logger is ignored; the level is only set when
        ``loglevel`` is truthy.
        """
        if logger is None:
            return
        self.setup_handlers(logger, logfile, format, colorize, **kwargs)
        if loglevel:
            logger.setLevel(loglevel)

    def setup_task_loggers(self, loglevel=None, logfile=None, format=None,
                           colorize=None, propagate=False, **kwargs):
        """Setup the task logger.

        If `logfile` is not specified, then `sys.stderr` is used.

        The ``after_setup_task_logger`` signal is sent once the logger
        has been configured.

        Will return the base task logger object.
        """
        loglevel = mlevel(loglevel or self.loglevel)
        format = format or self.task_format
        colorize = self.supports_color(colorize, logfile)

        task_logger = get_logger('celery.task')
        task_logger = self.setup_handlers(
            task_logger, logfile, format, colorize,
            formatter=TaskFormatter, **kwargs
        )
        task_logger.setLevel(loglevel)
        # this is an int for some reason, better to not question why.
        task_logger.propagate = int(propagate)
        signals.after_setup_task_logger.send(
            sender=None, logger=task_logger,
            loglevel=loglevel, logfile=logfile,
            format=format, colorize=colorize,
        )
        return task_logger

    def redirect_stdouts_to_logger(self, logger, loglevel=None,
                                   stdout=True, stderr=True):
        """Redirect :class:`sys.stdout` and :class:`sys.stderr` to logger.

        Arguments:
            logger (logging.Logger): Logger instance to redirect to.
            loglevel (int, str): The loglevel redirected message
                will be logged as.
            stdout (bool): Replace ``sys.stdout`` when true.
            stderr (bool): Replace ``sys.stderr`` when true.

        Returns:
            The ``LoggingProxy`` installed in place of the stream(s).
        """
        proxy = LoggingProxy(logger, loglevel)
        for wanted, stream_name in ((stdout, 'stdout'), (stderr, 'stderr')):
            if wanted:
                setattr(sys, stream_name, proxy)
        return proxy

    def supports_color(self, colorize=None, logfile=None):
        """Decide whether colored output should be used.

        ``colorize`` falls back to ``self.colorize`` when ``None``.
        """
        if colorize is None:
            colorize = self.colorize
        if self.app.IS_WINDOWS:
            # Windows does not support ANSI color codes.
            return False
        if not colorize and colorize is not None:
            # Explicitly disabled: hand the falsy value back unchanged.
            return colorize
        # Only use color if there's no active log file
        # and stderr is an actual terminal.
        return logfile is None and isatty(sys.stderr)

    def colored(self, logfile=None, enabled=None):
        """Return ``colored(...)`` enabled according to supports_color."""
        use_color = self.supports_color(enabled, logfile)
        return colored(enabled=use_color)

    def setup_handlers(self, logger, logfile, format, colorize,
                       formatter=ColorFormatter, **kwargs):
        """Add a handler for ``logfile`` to ``logger`` unless configured.

        Returns:
            The same ``logger`` object that was passed in.
        """
        if not self._is_configured(logger):
            handler = self._detect_handler(logfile)
            handler.setFormatter(formatter(format, use_color=colorize))
            logger.addHandler(handler)
        return logger

    def _detect_handler(self, logfile=None):
        """Create handler from filename, an open stream or `None` (stderr)."""
        target = logfile
        if target is None:
            target = sys.__stderr__
        # Anything with a ``write`` attribute is treated as a stream,
        # everything else as a file name.
        if hasattr(target, 'write'):
            handler_cls = logging.StreamHandler
        else:
            handler_cls = WatchedFileHandler
        return handler_cls(target)

    def _has_handler(self, logger):
        """Return true if ``logger`` has any non-``NullHandler`` handler."""
        for handler in logger.handlers or []:
            if not isinstance(handler, logging.NullHandler):
                return True
        return False

    def _is_configured(self, logger):
        """Return true if ``logger`` already has real handlers.

        A logger flagged with a truthy ``_rudimentary_setup`` attribute
        is still considered unconfigured.
        """
        has_handler = self._has_handler(logger)
        return has_handler and not getattr(
            logger, '_rudimentary_setup', False)

    def setup_logger(self, name='celery', *args, **kwargs):
        """Deprecated: No longer used.

        ``name`` is ignored; the remaining arguments are forwarded to
        :meth:`setup_logging_subsystem` and the root logger is returned.
        """
        self.setup_logging_subsystem(*args, **kwargs)
        return logging.root

    def get_default_logger(self, name='celery', **kwargs):
        """Return ``get_logger(name)``; extra keyword arguments are unused."""
        return get_logger(name)

    @class_property
    def already_setup(self):
        # Exposes the ``_setup`` flag.
        return self._setup

    @already_setup.setter  # noqa
    def already_setup(self, was_setup):
        self._setup = was_setup
|