log.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326
  1. """celery.log"""
  2. import logging
  3. import threading
  4. import sys
  5. import traceback
  6. try:
  7. from multiprocessing import current_process
  8. from multiprocessing import util as mputil
  9. except ImportError:
  10. current_process = mputil = None
  11. from celery import signals
  12. from celery import current_app
  13. from celery.utils import LOG_LEVELS, isatty
  14. from celery.utils.compat import LoggerAdapter
  15. from celery.utils.patch import ensure_process_aware_logger
  16. from celery.utils.term import colored
  17. class ColorFormatter(logging.Formatter):
  18. #: Loglevel -> Color mapping.
  19. COLORS = colored().names
  20. colors = {"DEBUG": COLORS["blue"], "WARNING": COLORS["yellow"],
  21. "ERROR": COLORS["red"], "CRITICAL": COLORS["magenta"]}
  22. def __init__(self, msg, use_color=True):
  23. logging.Formatter.__init__(self, msg)
  24. self.use_color = use_color
  25. def formatException(self, ei):
  26. r = logging.Formatter.formatException(self, ei)
  27. if isinstance(r, str):
  28. return r.decode("utf-8", "replace") # Convert to unicode
  29. return r
  30. def format(self, record):
  31. levelname = record.levelname
  32. color = self.colors.get(levelname)
  33. if self.use_color and color:
  34. record.msg = unicode(color(record.msg))
  35. # Very ugly, but have to make sure processName is supported
  36. # by foreign logger instances.
  37. # (processName is always supported by Python 2.7)
  38. if "processName" not in record.__dict__:
  39. process_name = current_process and current_process()._name or ""
  40. record.__dict__["processName"] = process_name
  41. t = logging.Formatter.format(self, record)
  42. if isinstance(t, unicode):
  43. return t.encode("utf-8", "replace")
  44. return t
  45. class Logging(object):
  46. #: The logging subsystem is only configured once per process.
  47. #: setup_logging_subsystem sets this flag, and subsequent calls
  48. #: will do nothing.
  49. _setup = False
  50. def __init__(self, app):
  51. self.app = app
  52. self.loglevel = self.app.conf.CELERYD_LOG_LEVEL
  53. self.format = self.app.conf.CELERYD_LOG_FORMAT
  54. self.colorize = self.app.conf.CELERYD_LOG_COLOR
  55. def supports_color(self, logfile=None):
  56. if self.app.IS_WINDOWS:
  57. # Windows does not support ANSI color codes.
  58. return False
  59. if self.colorize is None:
  60. # Only use color if there is no active log file
  61. # and stderr is an actual terminal.
  62. return logfile is None and isatty(sys.stderr)
  63. return self.colorize
  64. def colored(self, logfile=None):
  65. return colored(enabled=self.supports_color(logfile))
  66. def get_task_logger(self, loglevel=None, name=None):
  67. logger = logging.getLogger(name or "celery.task.default")
  68. if loglevel is not None:
  69. logger.setLevel(loglevel)
  70. return logger
  71. def setup_logging_subsystem(self, loglevel=None, logfile=None,
  72. format=None, colorize=None, **kwargs):
  73. if Logging._setup:
  74. return
  75. loglevel = loglevel or self.loglevel
  76. format = format or self.format
  77. if colorize is None:
  78. colorize = self.supports_color(logfile)
  79. if mputil:
  80. try:
  81. mputil._logger = None
  82. except AttributeError:
  83. pass
  84. ensure_process_aware_logger()
  85. receivers = signals.setup_logging.send(sender=None,
  86. loglevel=loglevel,
  87. logfile=logfile,
  88. format=format,
  89. colorize=colorize)
  90. if not receivers:
  91. root = logging.getLogger()
  92. if self.app.conf.CELERYD_HIJACK_ROOT_LOGGER:
  93. root.handlers = []
  94. mp = mputil and mputil.get_logger() or None
  95. for logger in (root, mp):
  96. if logger:
  97. self._setup_logger(logger, logfile, format,
  98. colorize, **kwargs)
  99. logger.setLevel(loglevel)
  100. Logging._setup = True
  101. return receivers
  102. def _detect_handler(self, logfile=None):
  103. """Create log handler with either a filename, an open stream
  104. or :const:`None` (stderr)."""
  105. if logfile is None:
  106. logfile = sys.__stderr__
  107. if hasattr(logfile, "write"):
  108. return logging.StreamHandler(logfile)
  109. return logging.FileHandler(logfile)
  110. def get_default_logger(self, loglevel=None, name="celery"):
  111. """Get default logger instance.
  112. :keyword loglevel: Initial log level.
  113. """
  114. logger = logging.getLogger(name)
  115. if loglevel is not None:
  116. logger.setLevel(loglevel)
  117. return logger
  118. def setup_logger(self, loglevel=None, logfile=None,
  119. format=None, colorize=None, name="celery", root=True,
  120. app=None, **kwargs):
  121. """Setup the :mod:`multiprocessing` logger.
  122. If `logfile` is not specified, then `sys.stderr` is used.
  123. Returns logger object.
  124. """
  125. loglevel = loglevel or self.loglevel
  126. format = format or self.format
  127. if colorize is None:
  128. colorize = self.supports_color(logfile)
  129. if not root or self.app.conf.CELERYD_HIJACK_ROOT_LOGGER:
  130. return self._setup_logger(self.get_default_logger(loglevel, name),
  131. logfile, format, colorize, **kwargs)
  132. self.setup_logging_subsystem(loglevel, logfile,
  133. format, colorize, **kwargs)
  134. return self.get_default_logger(name=name)
  135. def setup_task_logger(self, loglevel=None, logfile=None, format=None,
  136. colorize=None, task_name='-?-', task_id='-?-', propagate=False,
  137. app=None, **kwargs):
  138. """Setup the task logger.
  139. If `logfile` is not specified, then `sys.stderr` is used.
  140. Returns logger object.
  141. """
  142. loglevel = loglevel or self.loglevel
  143. format = format or self.format
  144. if colorize is None:
  145. colorize = self.supports_color(logfile)
  146. logger = self._setup_logger(self.get_task_logger(loglevel, task_name),
  147. logfile, format, colorize, **kwargs)
  148. logger.propagate = int(propagate) # this is an int for some reason.
  149. # better to not question why.
  150. extra = {
  151. "task_id": task_id,
  152. "task_name": task_name,
  153. }
  154. return LoggerAdapter(logger, extra)
  155. def redirect_stdouts_to_logger(self, logger, loglevel=None):
  156. """Redirect :class:`sys.stdout` and :class:`sys.stderr` to a
  157. logging instance.
  158. :param logger: The :class:`logging.Logger` instance to redirect to.
  159. :param loglevel: The loglevel redirected messages will be logged as.
  160. """
  161. proxy = LoggingProxy(logger, loglevel)
  162. sys.stdout = sys.stderr = proxy
  163. return proxy
  164. def _setup_logger(self, logger, logfile, format, colorize,
  165. formatter=ColorFormatter, **kwargs):
  166. if logger.handlers: # Logger already configured
  167. return logger
  168. handler = self._detect_handler(logfile)
  169. handler.setFormatter(formatter(format, use_color=colorize))
  170. logger.addHandler(handler)
  171. return logger
# Module-level shortcuts: each name is bound, when this module is executed,
# to the same-named attribute of ``current_app.log``.
# NOTE(review): presumably ``current_app.log`` is a ``Logging`` instance for
# the current app -- confirm against the app implementation.
setup_logging_subsystem = current_app.log.setup_logging_subsystem
get_default_logger = current_app.log.get_default_logger
setup_logger = current_app.log.setup_logger
setup_task_logger = current_app.log.setup_task_logger
get_task_logger = current_app.log.get_task_logger
redirect_stdouts_to_logger = current_app.log.redirect_stdouts_to_logger
  178. class LoggingProxy(object):
  179. """Forward file object to :class:`logging.Logger` instance.
  180. :param logger: The :class:`logging.Logger` instance to forward to.
  181. :param loglevel: Loglevel to use when writing messages.
  182. """
  183. mode = "w"
  184. name = None
  185. closed = False
  186. loglevel = logging.ERROR
  187. _thread = threading.local()
  188. def __init__(self, logger, loglevel=None):
  189. self.logger = logger
  190. self.loglevel = loglevel or self.logger.level or self.loglevel
  191. if not isinstance(self.loglevel, int):
  192. self.loglevel = LOG_LEVELS[self.loglevel.upper()]
  193. self._safewrap_handlers()
  194. def _safewrap_handlers(self):
  195. """Make the logger handlers dump internal errors to
  196. `sys.__stderr__` instead of `sys.stderr` to circumvent
  197. infinite loops."""
  198. def wrap_handler(handler): # pragma: no cover
  199. class WithSafeHandleError(logging.Handler):
  200. def handleError(self, record):
  201. exc_info = sys.exc_info()
  202. try:
  203. try:
  204. traceback.print_exception(exc_info[0],
  205. exc_info[1],
  206. exc_info[2],
  207. None, sys.__stderr__)
  208. except IOError:
  209. pass # see python issue 5971
  210. finally:
  211. del(exc_info)
  212. handler.handleError = WithSafeHandleError().handleError
  213. return map(wrap_handler, self.logger.handlers)
  214. def write(self, data):
  215. if getattr(self._thread, "recurse_protection", False):
  216. # Logger is logging back to this file, so stop recursing.
  217. return
  218. """Write message to logging object."""
  219. data = data.strip()
  220. if data and not self.closed:
  221. self._thread.recurse_protection = True
  222. try:
  223. self.logger.log(self.loglevel, data)
  224. finally:
  225. self._thread.recurse_protection = False
  226. def writelines(self, sequence):
  227. """`writelines(sequence_of_strings) -> None`.
  228. Write the strings to the file.
  229. The sequence can be any iterable object producing strings.
  230. This is equivalent to calling :meth:`write` for each string.
  231. """
  232. for part in sequence:
  233. self.write(part)
  234. def flush(self):
  235. """This object is not buffered so any :meth:`flush` requests
  236. are ignored."""
  237. pass
  238. def close(self):
  239. """When the object is closed, no write requests are forwarded to
  240. the logging object anymore."""
  241. self.closed = True
  242. def isatty(self):
  243. """Always returns :const:`False`. Just here for file support."""
  244. return False
  245. def fileno(self):
  246. return None
  247. class SilenceRepeated(object):
  248. """Only log action every n iterations."""
  249. def __init__(self, action, max_iterations=10):
  250. self.action = action
  251. self.max_iterations = max_iterations
  252. self._iterations = 0
  253. def __call__(self, *msgs):
  254. if self._iterations >= self.max_iterations:
  255. for msg in msgs:
  256. self.action(msg)
  257. self._iterations = 0
  258. else:
  259. self._iterations += 1