log.py

# -*- coding: utf-8 -*-
from __future__ import absolute_import

import logging
import threading
import os
import sys
import traceback

try:
    from multiprocessing import current_process
    from multiprocessing import util as mputil
except ImportError:
    current_process = mputil = None  # noqa

from . import current_app
from . import signals
from .local import Proxy
from .utils import LOG_LEVELS, isatty
from .utils.compat import LoggerAdapter, WatchedFileHandler
from .utils.encoding import safe_str, str_t
from .utils.patch import ensure_process_aware_logger
from .utils.term import colored

is_py3k = sys.version_info >= (3, 0)


def mlevel(level):
    """Convert a log level name or number to its numeric value."""
    if level and not isinstance(level, int):
        return LOG_LEVELS[level.upper()]
    return level
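
# Usage sketch (assumes LOG_LEVELS maps upper-case level names to the stdlib
# numeric constants):
#   mlevel("info")         # -> logging.INFO
#   mlevel(logging.DEBUG)  # -> logging.DEBUG (integers pass through unchanged)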


class ColorFormatter(logging.Formatter):
    #: Loglevel -> Color mapping.
    COLORS = colored().names
    colors = {"DEBUG": COLORS["blue"], "WARNING": COLORS["yellow"],
              "ERROR": COLORS["red"], "CRITICAL": COLORS["magenta"]}

    def __init__(self, msg, use_color=True):
        logging.Formatter.__init__(self, msg)
        self.use_color = use_color

    def formatException(self, ei):
        r = logging.Formatter.formatException(self, ei)
        if isinstance(r, str) and not is_py3k:
            return safe_str(r)
        return r

    def format(self, record):
        levelname = record.levelname
        color = self.colors.get(levelname)

        if self.use_color and color:
            try:
                record.msg = safe_str(str_t(color(record.msg)))
            except Exception as exc:
                record.msg = "<Unrepresentable %r: %r>" % (
                        type(record.msg), exc)
                record.exc_info = sys.exc_info()

        if not is_py3k:
            # Very ugly, but have to make sure processName is supported
            # by foreign logger instances.
            # (processName is always supported by Python 2.7)
            if "processName" not in record.__dict__:
                process_name = (current_process and
                                current_process()._name or "")
                record.__dict__["processName"] = process_name
        return safe_str(logging.Formatter.format(self, record))
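
# Illustrative wiring (not part of this module): how the formatter is attached
# to a handler; _setup_logger below does the equivalent with the configured
# log format.
#   handler = logging.StreamHandler(sys.stderr)
#   handler.setFormatter(ColorFormatter("[%(levelname)s] %(message)s",
#                                       use_color=isatty(sys.stderr)))
#   logging.getLogger("celery").addHandler(handler)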


class Logging(object):
    #: The logging subsystem is only configured once per process.
    #: setup_logging_subsystem sets this flag, and subsequent calls
    #: will do nothing.
    _setup = False

    def __init__(self, app):
        self.app = app
        self.loglevel = mlevel(self.app.conf.CELERYD_LOG_LEVEL)
        self.format = self.app.conf.CELERYD_LOG_FORMAT
        self.task_format = self.app.conf.CELERYD_TASK_LOG_FORMAT
        self.colorize = self.app.conf.CELERYD_LOG_COLOR

    def supports_color(self, logfile=None):
        if self.app.IS_WINDOWS:
            # Windows does not support ANSI color codes.
            return False
        if self.colorize is None:
            # Only use color if there is no active log file
            # and stderr is an actual terminal.
            return logfile is None and isatty(sys.stderr)
        return self.colorize

    def colored(self, logfile=None):
        return colored(enabled=self.supports_color(logfile))

    def get_task_logger(self, loglevel=None, name=None):
        logger = logging.getLogger(name or "celery.task.default")
        if loglevel is not None:
            logger.setLevel(mlevel(loglevel))
        return logger

    def setup_logging_subsystem(self, loglevel=None, logfile=None,
            format=None, colorize=None, **kwargs):
        if Logging._setup:
            return
        loglevel = mlevel(loglevel or self.loglevel)
        format = format or self.format
        if colorize is None:
            colorize = self.supports_color(logfile)
        if mputil and hasattr(mputil, "_logger"):
            mputil._logger = None
        if not is_py3k:
            ensure_process_aware_logger()
        receivers = signals.setup_logging.send(sender=None,
                        loglevel=loglevel, logfile=logfile,
                        format=format, colorize=colorize)
        if not receivers:
            root = logging.getLogger()

            if self.app.conf.CELERYD_HIJACK_ROOT_LOGGER:
                root.handlers = []

            mp = mputil.get_logger() if mputil else None
            for logger in filter(None, (root, mp)):
                self._setup_logger(logger, logfile, format, colorize, **kwargs)
                logger.setLevel(mlevel(loglevel))
                signals.after_setup_logger.send(sender=None, logger=logger,
                                        loglevel=loglevel, logfile=logfile,
                                        format=format, colorize=colorize)

            # This is a hack for multiprocessing's fork+exec, so that
            # logging before Process.run works.
            os.environ.update(_MP_FORK_LOGLEVEL_=str(loglevel),
                              _MP_FORK_LOGFILE_=logfile or "",
                              _MP_FORK_LOGFORMAT_=format)
        Logging._setup = True
        return receivers
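
    # Usage sketch: a ``setup_logging`` signal receiver takes over logging
    # configuration entirely, in which case the ``if not receivers`` block
    # above is skipped.  (Receiver name and config file are illustrative.)
    #   from celery import signals
    #
    #   def configure_logging(loglevel=None, logfile=None, **kwargs):
    #       logging.config.fileConfig("logging.cfg")
    #
    #   signals.setup_logging.connect(configure_logging)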

    def setup(self, loglevel=None, logfile=None, redirect_stdouts=False,
              redirect_level="WARNING"):
        handled = self.setup_logging_subsystem(loglevel=loglevel,
                                               logfile=logfile)
        if not handled:
            logger = self.get_default_logger()
            if redirect_stdouts:
                self.redirect_stdouts_to_logger(logger,
                                                loglevel=redirect_level)
        os.environ.update(
            CELERY_LOG_LEVEL=str(loglevel) if loglevel else "",
            CELERY_LOG_FILE=str(logfile) if logfile else "",
            CELERY_LOG_REDIRECT="1" if redirect_stdouts else "",
            CELERY_LOG_REDIRECT_LEVEL=str(redirect_level))

    def _detect_handler(self, logfile=None):
        """Create log handler with either a filename, an open stream
        or :const:`None` (stderr)."""
        logfile = sys.__stderr__ if logfile is None else logfile
        if hasattr(logfile, "write"):
            return logging.StreamHandler(logfile)
        return WatchedFileHandler(logfile)

    def get_default_logger(self, loglevel=None, name="celery"):
        """Get default logger instance.

        :keyword loglevel: Initial log level.

        """
        logger = logging.getLogger(name)
        if loglevel is not None:
            logger.setLevel(mlevel(loglevel))
        return logger

    def setup_logger(self, loglevel=None, logfile=None,
            format=None, colorize=None, name="celery", root=True,
            app=None, **kwargs):
        """Set up the ``celery`` logger.

        If `logfile` is not specified, then `sys.stderr` is used.

        Returns logger object.

        """
        loglevel = mlevel(loglevel or self.loglevel)
        format = format or self.format
        if colorize is None:
            colorize = self.supports_color(logfile)

        if not root or self.app.conf.CELERYD_HIJACK_ROOT_LOGGER:
            return self._setup_logger(self.get_default_logger(loglevel, name),
                                      logfile, format, colorize, **kwargs)
        self.setup_logging_subsystem(loglevel, logfile,
                                     format, colorize, **kwargs)
        return self.get_default_logger(name=name)

    def setup_task_logger(self, loglevel=None, logfile=None, format=None,
            colorize=None, task_name=None, task_id=None, propagate=False,
            app=None, **kwargs):
        """Set up the task logger.

        If `logfile` is not specified, then `sys.stderr` is used.

        Returns logger object.

        """
        loglevel = mlevel(loglevel or self.loglevel)
        format = format or self.task_format
        if colorize is None:
            colorize = self.supports_color(logfile)

        logger = self._setup_logger(self.get_task_logger(loglevel, task_name),
                                    logfile, format, colorize, **kwargs)
        logger.propagate = int(propagate)    # this is an int for some reason.
                                             # better to not question why.
        signals.after_setup_task_logger.send(sender=None, logger=logger,
                                loglevel=loglevel, logfile=logfile,
                                format=format, colorize=colorize)
        return LoggerAdapter(logger, {"task_id": task_id,
                                      "task_name": task_name})

    def redirect_stdouts_to_logger(self, logger, loglevel=None,
                                   stdout=True, stderr=True):
        """Redirect :class:`sys.stdout` and :class:`sys.stderr` to a
        logging instance.

        :param logger: The :class:`logging.Logger` instance to redirect to.
        :param loglevel: The loglevel redirected messages will be logged as.

        """
        proxy = LoggingProxy(logger, loglevel)
        if stdout:
            sys.stdout = proxy
        if stderr:
            sys.stderr = proxy
        return proxy
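
    # Usage sketch: once redirected, plain ``print`` output is forwarded to
    # the logger at the given level instead of reaching the real stdout/stderr.
    #   proxy = app.log.redirect_stdouts_to_logger(logger, loglevel="WARNING")
    #   print("now captured by the logger")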

    def _is_configured(self, logger):
        return logger.handlers and not getattr(
                logger, "_rudimentary_setup", False)

    def _setup_logger(self, logger, logfile, format, colorize,
            formatter=ColorFormatter, **kwargs):
        if self._is_configured(logger):
            return logger

        handler = self._detect_handler(logfile)
        handler.setFormatter(formatter(format, use_color=colorize))
        logger.addHandler(handler)
        return logger


get_default_logger = Proxy(lambda: current_app.log.get_default_logger)
setup_logger = Proxy(lambda: current_app.log.setup_logger)
setup_task_logger = Proxy(lambda: current_app.log.setup_task_logger)
get_task_logger = Proxy(lambda: current_app.log.get_task_logger)
setup_logging_subsystem = Proxy(
        lambda: current_app.log.setup_logging_subsystem)
redirect_stdouts_to_logger = Proxy(
        lambda: current_app.log.redirect_stdouts_to_logger)
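
# These module-level names are lazy aliases: each Proxy resolves to the bound
# method on ``current_app.log`` at call time, e.g. (illustrative import path):
#   from celery.log import get_task_logger
#   logger = get_task_logger(loglevel="INFO", name="celery.task.example")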


class LoggingProxy(object):
    """Forward file object to :class:`logging.Logger` instance.

    :param logger: The :class:`logging.Logger` instance to forward to.
    :param loglevel: Loglevel to use when writing messages.

    """
    mode = "w"
    name = None
    closed = False
    loglevel = logging.ERROR
    _thread = threading.local()

    def __init__(self, logger, loglevel=None):
        self.logger = logger
        self.loglevel = mlevel(loglevel or self.logger.level or self.loglevel)
        self._safewrap_handlers()

    def _safewrap_handlers(self):
        """Make the logger handlers dump internal errors to
        `sys.__stderr__` instead of `sys.stderr` to circumvent
        infinite loops."""

        def wrap_handler(handler):  # pragma: no cover

            class WithSafeHandleError(logging.Handler):

                def handleError(self, record):
                    exc_info = sys.exc_info()
                    try:
                        try:
                            traceback.print_exception(exc_info[0],
                                                      exc_info[1],
                                                      exc_info[2],
                                                      None, sys.__stderr__)
                        except IOError:
                            pass    # see python issue 5971
                    finally:
                        del exc_info

            handler.handleError = WithSafeHandleError().handleError

        return map(wrap_handler, self.logger.handlers)

    def write(self, data):
        """Write message to logging object."""
        if getattr(self._thread, "recurse_protection", False):
            # Logger is logging back to this file, so stop recursing.
            return
        data = data.strip()
        if data and not self.closed:
            self._thread.recurse_protection = True
            try:
                self.logger.log(self.loglevel, safe_str(data))
            finally:
                self._thread.recurse_protection = False

    def writelines(self, sequence):
        """`writelines(sequence_of_strings) -> None`.

        Write the strings to the file.

        The sequence can be any iterable object producing strings.
        This is equivalent to calling :meth:`write` for each string.

        """
        for part in sequence:
            self.write(part)

    def flush(self):
        """This object is not buffered so any :meth:`flush` requests
        are ignored."""
        pass

    def close(self):
        """When the object is closed, no write requests are forwarded to
        the logging object anymore."""
        self.closed = True

    def isatty(self):
        """Always returns :const:`False`.  Just here for file support."""
        return False

    def fileno(self):
        pass
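
# Behaviour sketch: the proxy is line-oriented and unbuffered; whitespace-only
# writes are dropped and everything else is logged at ``loglevel``.
#   proxy = LoggingProxy(logging.getLogger("celery"), loglevel="INFO")
#   proxy.write("hello world\n")   # logged at INFO
#   proxy.write("   \n")           # stripped to "", not logged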


class SilenceRepeated(object):
    """Only log action every n iterations."""

    def __init__(self, action, max_iterations=10):
        self.action = action
        self.max_iterations = max_iterations
        self._iterations = 0

    def __call__(self, *args, **kwargs):
        if not self._iterations or self._iterations >= self.max_iterations:
            self.action(*args, **kwargs)
            # Count this call so the next max_iterations - 1 calls are
            # silenced (resetting to 0 would trigger the action every time).
            self._iterations = 1
        else:
            self._iterations += 1
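
# Usage sketch (names are illustrative): wrap a noisy log call so repeated
# calls are mostly silenced, firing only once per ``max_iterations`` calls.
#   warn_rarely = SilenceRepeated(logger.warning, max_iterations=10)
#   for result in poll_results():
#       warn_rarely("still waiting for results...")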