log.py

# -*- coding: utf-8 -*-
from __future__ import absolute_import

import logging
import threading
import sys
import traceback

try:
    from multiprocessing import current_process
    from multiprocessing import util as mputil
except ImportError:
    current_process = mputil = None  # noqa

from . import current_app
from . import signals
from .local import Proxy
from .utils import LOG_LEVELS, isatty
from .utils.compat import LoggerAdapter, WatchedFileHandler
from .utils.encoding import safe_str, str_t
from .utils.patch import ensure_process_aware_logger
from .utils.term import colored

is_py3k = sys.version_info >= (3, 0)

def mlevel(level):
    if level and not isinstance(level, int):
        return LOG_LEVELS[level.upper()]
    return level

class ColorFormatter(logging.Formatter):
    #: Loglevel -> Color mapping.
    COLORS = colored().names
    colors = {"DEBUG": COLORS["blue"], "WARNING": COLORS["yellow"],
              "ERROR": COLORS["red"], "CRITICAL": COLORS["magenta"]}

    def __init__(self, msg, use_color=True):
        logging.Formatter.__init__(self, msg)
        self.use_color = use_color

    def formatException(self, ei):
        r = logging.Formatter.formatException(self, ei)
        if isinstance(r, str) and not is_py3k:
            return safe_str(r)
        return r

    def format(self, record):
        levelname = record.levelname
        color = self.colors.get(levelname)

        if self.use_color and color:
            try:
                record.msg = safe_str(str_t(color(record.msg)))
            except Exception as exc:
                record.msg = "<Unrepresentable %r: %r>" % (
                        type(record.msg), exc)
                record.exc_info = sys.exc_info()

        if not is_py3k:
            # Very ugly, but have to make sure processName is supported
            # by foreign logger instances.
            # (processName is always supported by Python 2.7)
            if "processName" not in record.__dict__:
                process_name = (current_process and
                                current_process()._name or "")
                record.__dict__["processName"] = process_name
        return safe_str(logging.Formatter.format(self, record))
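
# Illustrative usage sketch (not part of the original module): attaching
# ColorFormatter to a plain stdlib handler.  The format string and logger
# name below are arbitrary examples, not Celery defaults.
#
#     handler = logging.StreamHandler(sys.stderr)
#     handler.setFormatter(ColorFormatter("[%(levelname)s] %(message)s",
#                                         use_color=isatty(sys.stderr)))
#     logging.getLogger("example").addHandler(handler)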

class Logging(object):
    #: The logging subsystem is only configured once per process.
    #: setup_logging_subsystem sets this flag, and subsequent calls
    #: will do nothing.
    _setup = False

    def __init__(self, app):
        self.app = app
        self.loglevel = mlevel(self.app.conf.CELERYD_LOG_LEVEL)
        self.format = self.app.conf.CELERYD_LOG_FORMAT
        self.task_format = self.app.conf.CELERYD_TASK_LOG_FORMAT
        self.colorize = self.app.conf.CELERYD_LOG_COLOR

    def supports_color(self, logfile=None):
        if self.app.IS_WINDOWS:
            # Windows does not support ANSI color codes.
            return False
        if self.colorize is None:
            # Only use color if there is no active log file
            # and stderr is an actual terminal.
            return logfile is None and isatty(sys.stderr)
        return self.colorize

    def colored(self, logfile=None):
        return colored(enabled=self.supports_color(logfile))

    def get_task_logger(self, loglevel=None, name=None):
        logger = logging.getLogger(name or "celery.task.default")
        if loglevel is not None:
            logger.setLevel(mlevel(loglevel))
        return logger

    def setup_logging_subsystem(self, loglevel=None, logfile=None,
            format=None, colorize=None, **kwargs):
        if Logging._setup:
            return
        loglevel = mlevel(loglevel or self.loglevel)
        format = format or self.format
        if colorize is None:
            colorize = self.supports_color(logfile)
        if mputil and hasattr(mputil, "_logger"):
            mputil._logger = None
        if not is_py3k:
            ensure_process_aware_logger()
        receivers = signals.setup_logging.send(sender=None,
                        loglevel=loglevel, logfile=logfile,
                        format=format, colorize=colorize)
        if not receivers:
            root = logging.getLogger()

            if self.app.conf.CELERYD_HIJACK_ROOT_LOGGER:
                root.handlers = []

            mp = mputil.get_logger() if mputil else None
            for logger in filter(None, (root, mp)):
                self._setup_logger(logger, logfile, format, colorize,
                                   **kwargs)
                logger.setLevel(mlevel(loglevel))
                signals.after_setup_logger.send(sender=None, logger=logger,
                                        loglevel=loglevel, logfile=logfile,
                                        format=format, colorize=colorize)
        Logging._setup = True
        return receivers

    def _detect_handler(self, logfile=None):
        """Create log handler with either a filename, an open stream
        or :const:`None` (stderr)."""
        logfile = sys.__stderr__ if logfile is None else logfile
        if hasattr(logfile, "write"):
            return logging.StreamHandler(logfile)
        return WatchedFileHandler(logfile)

    def get_default_logger(self, loglevel=None, name="celery"):
        """Get default logger instance.

        :keyword loglevel: Initial log level.

        """
        logger = logging.getLogger(name)
        if loglevel is not None:
            logger.setLevel(mlevel(loglevel))
        return logger

    def setup_logger(self, loglevel=None, logfile=None,
            format=None, colorize=None, name="celery", root=True,
            app=None, **kwargs):
        """Set up the :mod:`multiprocessing` logger.

        If `logfile` is not specified, then `sys.stderr` is used.

        Returns the logger object.

        """
        loglevel = mlevel(loglevel or self.loglevel)
        format = format or self.format
        if colorize is None:
            colorize = self.supports_color(logfile)
        if not root or self.app.conf.CELERYD_HIJACK_ROOT_LOGGER:
            return self._setup_logger(self.get_default_logger(loglevel, name),
                                      logfile, format, colorize, **kwargs)
        self.setup_logging_subsystem(loglevel, logfile,
                                     format, colorize, **kwargs)
        return self.get_default_logger(name=name)

    def setup_task_logger(self, loglevel=None, logfile=None, format=None,
            colorize=None, task_name=None, task_id=None, propagate=False,
            app=None, **kwargs):
        """Set up the task logger.

        If `logfile` is not specified, then `sys.stderr` is used.

        Returns the logger object.

        """
        loglevel = mlevel(loglevel or self.loglevel)
        format = format or self.task_format
        if colorize is None:
            colorize = self.supports_color(logfile)
        logger = self._setup_logger(self.get_task_logger(loglevel, task_name),
                                    logfile, format, colorize, **kwargs)
        logger.propagate = int(propagate)    # this is an int for some reason.
                                             # better not to question why.
        signals.after_setup_task_logger.send(sender=None, logger=logger,
                                loglevel=loglevel, logfile=logfile,
                                format=format, colorize=colorize)
        return LoggerAdapter(logger, {"task_id": task_id,
                                      "task_name": task_name})

    def redirect_stdouts_to_logger(self, logger, loglevel=None,
            stdout=True, stderr=True):
        """Redirect :class:`sys.stdout` and :class:`sys.stderr` to a
        logging instance.

        :param logger: The :class:`logging.Logger` instance to redirect to.
        :param loglevel: The loglevel redirected messages will be logged as.

        """
        proxy = LoggingProxy(logger, loglevel)
        if stdout:
            sys.stdout = proxy
        if stderr:
            sys.stderr = proxy
        return proxy

    def _setup_logger(self, logger, logfile, format, colorize,
            formatter=ColorFormatter, **kwargs):
        if logger.handlers:  # Logger already configured
            return logger

        handler = self._detect_handler(logfile)
        handler.setFormatter(formatter(format, use_color=colorize))
        logger.addHandler(handler)
        return logger
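
# Illustrative usage sketch (not part of the original module): typical use of
# the Logging helper bound to an application.  The ``app`` instance and the
# level, task name and task id values are assumptions for the example.
#
#     log = Logging(app)
#     log.setup_logging_subsystem(loglevel="INFO", logfile=None)
#     task_logger = log.setup_task_logger(loglevel="INFO",
#                                         task_name="tasks.add",
#                                         task_id="example-task-id")
#     log.redirect_stdouts_to_logger(log.get_default_logger(),
#                                    loglevel=logging.WARNING)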

get_default_logger = Proxy(lambda: current_app.log.get_default_logger)
setup_logger = Proxy(lambda: current_app.log.setup_logger)
setup_task_logger = Proxy(lambda: current_app.log.setup_task_logger)
get_task_logger = Proxy(lambda: current_app.log.get_task_logger)
setup_logging_subsystem = Proxy(
        lambda: current_app.log.setup_logging_subsystem)
redirect_stdouts_to_logger = Proxy(
        lambda: current_app.log.redirect_stdouts_to_logger)
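
# Illustrative usage sketch (assuming this module is importable as
# ``celery.log``): the proxies above resolve lazily against the current app,
# so they can be imported at module level and used directly.
#
#     from celery.log import get_default_logger
#     logger = get_default_logger(loglevel="INFO")
#     logger.info("configured via the current app")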

class LoggingProxy(object):
    """Forward file object to :class:`logging.Logger` instance.

    :param logger: The :class:`logging.Logger` instance to forward to.
    :param loglevel: Loglevel to use when writing messages.

    """
    mode = "w"
    name = None
    closed = False
    loglevel = logging.ERROR
    _thread = threading.local()

    def __init__(self, logger, loglevel=None):
        self.logger = logger
        self.loglevel = mlevel(loglevel or self.logger.level or self.loglevel)
        self._safewrap_handlers()

    def _safewrap_handlers(self):
        """Make the logger handlers dump internal errors to
        `sys.__stderr__` instead of `sys.stderr` to circumvent
        infinite loops."""

        def wrap_handler(handler):                  # pragma: no cover

            class WithSafeHandleError(logging.Handler):

                def handleError(self, record):
                    exc_info = sys.exc_info()
                    try:
                        try:
                            traceback.print_exception(exc_info[0],
                                                      exc_info[1],
                                                      exc_info[2],
                                                      None, sys.__stderr__)
                        except IOError:
                            pass    # see python issue 5971
                    finally:
                        del(exc_info)

            handler.handleError = WithSafeHandleError().handleError
        return map(wrap_handler, self.logger.handlers)

    def write(self, data):
        """Write message to logging object."""
        if getattr(self._thread, "recurse_protection", False):
            # Logger is logging back to this file, so stop recursing.
            return
        data = data.strip()
        if data and not self.closed:
            self._thread.recurse_protection = True
            try:
                self.logger.log(self.loglevel, safe_str(data))
            finally:
                self._thread.recurse_protection = False

    def writelines(self, sequence):
        """`writelines(sequence_of_strings) -> None`.

        Write the strings to the file.

        The sequence can be any iterable object producing strings.
        This is equivalent to calling :meth:`write` for each string.

        """
        for part in sequence:
            self.write(part)

    def flush(self):
        """This object is not buffered, so any :meth:`flush` requests
        are ignored."""
        pass

    def close(self):
        """When the object is closed, no write requests are forwarded to
        the logging object anymore."""
        self.closed = True

    def isatty(self):
        """Always returns :const:`False`.  Just here for file support."""
        return False

    def fileno(self):
        return None
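
# Illustrative usage sketch (not part of the original module): a LoggingProxy
# can stand in for a file object so that stray writes to stdout end up in the
# log.  The logger name and level below are assumptions for the example.
#
#     logger = logging.getLogger("celery.redirected")
#     sys.stdout = LoggingProxy(logger, loglevel=logging.WARNING)
#     print("this line is logged at WARNING level instead of printed")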

class SilenceRepeated(object):
    """Only log action every n iterations."""

    def __init__(self, action, max_iterations=10):
        self.action = action
        self.max_iterations = max_iterations
        self._iterations = 0

    def __call__(self, *msgs):
        if not self._iterations or self._iterations >= self.max_iterations:
            for msg in msgs:
                self.action(msg)
            # Reset to 1 (not 0) so the truthiness check above does not
            # emit again on the very next call.
            self._iterations = 1
        else:
            self._iterations += 1
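
# Illustrative usage sketch (not part of the original module): wrapping a
# logger method in SilenceRepeated so a message emitted in a tight loop is
# logged on the first call and then once every ``max_iterations`` calls.
# The logger name is an assumption for the example.
#
#     warn_missed = SilenceRepeated(
#         logging.getLogger("celery").warning, max_iterations=10)
#     for _ in range(100):
#         warn_missed("heartbeat missed")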