123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320 |
- """celery.log"""
- import logging
- import threading
- import sys
- import traceback
- from multiprocessing import current_process
- from multiprocessing import util as mputil
- from celery import signals
- from celery import current_app
- from celery.utils import LOG_LEVELS, isatty
- from celery.utils.compat import LoggerAdapter
- from celery.utils.patch import ensure_process_aware_logger
- from celery.utils.term import colored
- class ColorFormatter(logging.Formatter):
- #: Loglevel -> Color mapping.
- COLORS = colored().names
- colors = {"DEBUG": COLORS["blue"], "WARNING": COLORS["yellow"],
- "ERROR": COLORS["red"], "CRITICAL": COLORS["magenta"]}
- def __init__(self, msg, use_color=True):
- logging.Formatter.__init__(self, msg)
- self.use_color = use_color
- def formatException(self, ei):
- r = logging.Formatter.formatException(self, ei)
- if isinstance(r, str):
- return r.decode("utf-8", "replace") # Convert to unicode
- return r
- def format(self, record):
- levelname = record.levelname
- color = self.colors.get(levelname)
- if self.use_color and color:
- record.msg = unicode(color(record.msg))
- # Very ugly, but have to make sure processName is supported
- # by foreign logger instances.
- # (processName is always supported by Python 2.7)
- if "processName" not in record.__dict__:
- record.__dict__["processName"] = current_process()._name
- t = logging.Formatter.format(self, record)
- if isinstance(t, unicode):
- return t.encode("utf-8", "replace")
- return t
- class Logging(object):
- #: The logging subsystem is only configured once per process.
- #: setup_logging_subsystem sets this flag, and subsequent calls
- #: will do nothing.
- _setup = False
- def __init__(self, app):
- self.app = app
- self.loglevel = self.app.conf.CELERYD_LOG_LEVEL
- self.format = self.app.conf.CELERYD_LOG_FORMAT
- self.colorize = self.app.conf.CELERYD_LOG_COLOR
- def supports_color(self, logfile=None):
- if self.app.IS_WINDOWS:
- # Windows does not support ANSI color codes.
- return False
- if self.colorize is None:
- # Only use color if there is no active log file
- # and stderr is an actual terminal.
- return logfile is None and isatty(sys.stderr)
- return self.colorize
- def colored(self, logfile=None):
- return colored(enabled=self.supports_color(logfile))
- def get_task_logger(self, loglevel=None, name=None):
- logger = logging.getLogger(name or "celery.task.default")
- if loglevel is not None:
- logger.setLevel(loglevel)
- return logger
- def setup_logging_subsystem(self, loglevel=None, logfile=None,
- format=None, colorize=None, **kwargs):
- if Logging._setup:
- return
- loglevel = loglevel or self.loglevel
- format = format or self.format
- if colorize is None:
- colorize = self.supports_color(logfile)
- try:
- mputil._logger = None
- except AttributeError:
- pass
- ensure_process_aware_logger()
- logging.Logger.manager.loggerDict.clear()
- receivers = signals.setup_logging.send(sender=None,
- loglevel=loglevel,
- logfile=logfile,
- format=format,
- colorize=colorize)
- if not receivers:
- root = logging.getLogger()
- if self.app.conf.CELERYD_HIJACK_ROOT_LOGGER:
- root.handlers = []
- mp = mputil.get_logger()
- for logger in (root, mp):
- self._setup_logger(logger, logfile, format, colorize, **kwargs)
- logger.setLevel(loglevel)
- Logging._setup = True
- return receivers
- def _detect_handler(self, logfile=None):
- """Create log handler with either a filename, an open stream
- or :const:`None` (stderr)."""
- if logfile is None:
- logfile = sys.__stderr__
- if hasattr(logfile, "write"):
- return logging.StreamHandler(logfile)
- return logging.FileHandler(logfile)
- def get_default_logger(self, loglevel=None, name="celery"):
- """Get default logger instance.
- :keyword loglevel: Initial log level.
- """
- logger = logging.getLogger(name)
- if loglevel is not None:
- logger.setLevel(loglevel)
- return logger
- def setup_logger(self, loglevel=None, logfile=None,
- format=None, colorize=None, name="celery", root=True,
- app=None, **kwargs):
- """Setup the :mod:`multiprocessing` logger.
- If `logfile` is not specified, then `sys.stderr` is used.
- Returns logger object.
- """
- loglevel = loglevel or self.loglevel
- format = format or self.format
- if colorize is None:
- colorize = self.supports_color(logfile)
- if not root or self.app.conf.CELERYD_HIJACK_ROOT_LOGGER:
- return self._setup_logger(self.get_default_logger(loglevel, name),
- logfile, format, colorize, **kwargs)
- self.setup_logging_subsystem(loglevel, logfile,
- format, colorize, **kwargs)
- return self.get_default_logger(name=name)
- def setup_task_logger(self, loglevel=None, logfile=None, format=None,
- colorize=None, task_kwargs=None, propagate=False, app=None,
- **kwargs):
- """Setup the task logger.
- If `logfile` is not specified, then `sys.stderr` is used.
- Returns logger object.
- """
- loglevel = loglevel or self.loglevel
- format = format or self.format
- if colorize is None:
- colorize = self.supports_color(logfile)
- if task_kwargs is None:
- task_kwargs = {}
- task_kwargs.setdefault("task_id", "-?-")
- task_name = task_kwargs.get("task_name")
- task_kwargs.setdefault("task_name", "-?-")
- logger = self._setup_logger(self.get_task_logger(loglevel, task_name),
- logfile, format, colorize, **kwargs)
- logger.propagate = int(propagate) # this is an int for some reason.
- # better to not question why.
- return LoggerAdapter(logger, task_kwargs)
- def redirect_stdouts_to_logger(self, logger, loglevel=None):
- """Redirect :class:`sys.stdout` and :class:`sys.stderr` to a
- logging instance.
- :param logger: The :class:`logging.Logger` instance to redirect to.
- :param loglevel: The loglevel redirected messages will be logged as.
- """
- proxy = LoggingProxy(logger, loglevel)
- sys.stdout = sys.stderr = proxy
- return proxy
- def _setup_logger(self, logger, logfile, format, colorize,
- formatter=ColorFormatter, **kwargs):
- if logger.handlers: # Logger already configured
- return logger
- handler = self._detect_handler(logfile)
- handler.setFormatter(formatter(format, use_color=colorize))
- logger.addHandler(handler)
- return logger
- setup_logging_subsystem = current_app.log.setup_logging_subsystem
- get_default_logger = current_app.log.get_default_logger
- setup_logger = current_app.log.setup_logger
- setup_task_logger = current_app.log.setup_task_logger
- get_task_logger = current_app.log.get_task_logger
- redirect_stdouts_to_logger = current_app.log.redirect_stdouts_to_logger
- class LoggingProxy(object):
- """Forward file object to :class:`logging.Logger` instance.
- :param logger: The :class:`logging.Logger` instance to forward to.
- :param loglevel: Loglevel to use when writing messages.
- """
- mode = "w"
- name = None
- closed = False
- loglevel = logging.ERROR
- _thread = threading.local()
- def __init__(self, logger, loglevel=None):
- self.logger = logger
- self.loglevel = loglevel or self.logger.level or self.loglevel
- if not isinstance(self.loglevel, int):
- self.loglevel = LOG_LEVELS[self.loglevel.upper()]
- self._safewrap_handlers()
- def _safewrap_handlers(self):
- """Make the logger handlers dump internal errors to
- `sys.__stderr__` instead of `sys.stderr` to circumvent
- infinite loops."""
- def wrap_handler(handler): # pragma: no cover
- class WithSafeHandleError(logging.Handler):
- def handleError(self, record):
- exc_info = sys.exc_info()
- try:
- try:
- traceback.print_exception(exc_info[0],
- exc_info[1],
- exc_info[2],
- None, sys.__stderr__)
- except IOError:
- pass # see python issue 5971
- finally:
- del(exc_info)
- handler.handleError = WithSafeHandleError().handleError
- return map(wrap_handler, self.logger.handlers)
- def write(self, data):
- if getattr(self._thread, "recurse_protection", False):
- # Logger is logging back to this file, so stop recursing.
- return
- """Write message to logging object."""
- data = data.strip()
- if data and not self.closed:
- self._thread.recurse_protection = True
- try:
- self.logger.log(self.loglevel, data)
- finally:
- self._thread.recurse_protection = False
- def writelines(self, sequence):
- """`writelines(sequence_of_strings) -> None`.
- Write the strings to the file.
- The sequence can be any iterable object producing strings.
- This is equivalent to calling :meth:`write` for each string.
- """
- for part in sequence:
- self.write(part)
- def flush(self):
- """This object is not buffered so any :meth:`flush` requests
- are ignored."""
- pass
- def close(self):
- """When the object is closed, no write requests are forwarded to
- the logging object anymore."""
- self.closed = True
- def isatty(self):
- """Always returns :const:`False`. Just here for file support."""
- return False
- def fileno(self):
- return None
- class SilenceRepeated(object):
- """Only log action every n iterations."""
- def __init__(self, action, max_iterations=10):
- self.action = action
- self.max_iterations = max_iterations
- self._iterations = 0
- def __call__(self, *msgs):
- if self._iterations >= self.max_iterations:
- for msg in msgs:
- self.action(msg)
- self._iterations = 0
- else:
- self._iterations += 1
|