log.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326
  1. """celery.log"""
  2. import logging
  3. import threading
  4. import sys
  5. import traceback
  6. import types
  7. from multiprocessing import current_process
  8. from multiprocessing import util as mputil
  9. from celery import signals
  10. from celery.app import app_or_default
  11. from celery.utils import LOG_LEVELS, isatty
  12. from celery.utils.compat import LoggerAdapter
  13. from celery.utils.patch import ensure_process_aware_logger
  14. from celery.utils.term import colored
  15. # The logging subsystem is only configured once per process.
  16. # setup_logging_subsystem sets this flag, and subsequent calls
  17. # will do nothing.
  18. _setup = False
  19. COLORS = {"DEBUG": "blue",
  20. "WARNING": "yellow",
  21. "ERROR": "red",
  22. "CRITICAL": "magenta"}
  23. class ColorFormatter(logging.Formatter):
  24. def __init__(self, msg, use_color=True):
  25. logging.Formatter.__init__(self, msg)
  26. self.use_color = use_color
  27. def formatException(self, ei):
  28. r = logging.Formatter.formatException(self, ei)
  29. if type(r) in [types.StringType]:
  30. r = r.decode("utf-8", "replace") # Convert to unicode
  31. return r
  32. def format(self, record):
  33. levelname = record.levelname
  34. if self.use_color and levelname in COLORS:
  35. record.msg = unicode(colored().names[COLORS[levelname]](
  36. record.msg))
  37. # Very ugly, but have to make sure processName is supported
  38. # by foreign logger instances.
  39. # (processName is always supported by Python 2.7)
  40. if "processName" not in record.__dict__:
  41. record.__dict__["processName"] = current_process()._name
  42. t = logging.Formatter.format(self, record)
  43. if type(t) in [types.UnicodeType]:
  44. t = t.encode('utf-8', 'replace')
  45. return t
  46. class Logging(object):
  47. _setup = False
  48. def __init__(self, app):
  49. self.app = app
  50. self.loglevel = self.app.conf.CELERYD_LOG_LEVEL
  51. self.format = self.app.conf.CELERYD_LOG_FORMAT
  52. self.colorize = self.app.conf.CELERYD_LOG_COLOR
  53. def supports_color(self, logfile=None):
  54. if self.app.IS_WINDOWS:
  55. # Windows does not support ANSI color codes.
  56. return False
  57. if self.colorize is None:
  58. # Only use color if there is no active log file
  59. # and stderr is an actual terminal.
  60. return logfile is None and isatty(sys.stderr)
  61. return self.colorize
  62. def colored(self, logfile=None):
  63. return colored(enabled=self.supports_color(logfile))
  64. def get_task_logger(self, loglevel=None, name=None):
  65. logger = logging.getLogger(name or "celery.task.default")
  66. if loglevel is not None:
  67. logger.setLevel(loglevel)
  68. return logger
  69. def setup_logging_subsystem(self, loglevel=None, logfile=None,
  70. format=None, colorize=None, **kwargs):
  71. loglevel = loglevel or self.loglevel
  72. format = format or self.format
  73. if colorize is None:
  74. colorize = self.supports_color(logfile)
  75. if self.__class__._setup:
  76. return
  77. try:
  78. mputil._logger = None
  79. except AttributeError:
  80. pass
  81. ensure_process_aware_logger()
  82. logging.Logger.manager.loggerDict.clear()
  83. receivers = signals.setup_logging.send(sender=None,
  84. loglevel=loglevel,
  85. logfile=logfile,
  86. format=format,
  87. colorize=colorize)
  88. if not receivers:
  89. root = logging.getLogger()
  90. if self.app.conf.CELERYD_HIJACK_ROOT_LOGGER:
  91. root.handlers = []
  92. mp = mputil.get_logger()
  93. for logger in (root, mp):
  94. self._setup_logger(logger, logfile,
  95. format, colorize, **kwargs)
  96. logger.setLevel(loglevel)
  97. self.__class__._setup = True
  98. return receivers
  99. def _detect_handler(self, logfile=None):
  100. """Create log handler with either a filename, an open stream
  101. or :const:`None` (stderr)."""
  102. if logfile is None:
  103. logfile = sys.__stderr__
  104. if hasattr(logfile, "write"):
  105. return logging.StreamHandler(logfile)
  106. return logging.FileHandler(logfile)
  107. def get_default_logger(self, loglevel=None, name="celery"):
  108. """Get default logger instance.
  109. :keyword loglevel: Initial log level.
  110. """
  111. logger = logging.getLogger(name)
  112. if loglevel is not None:
  113. logger.setLevel(loglevel)
  114. return logger
  115. def setup_logger(self, loglevel=None, logfile=None,
  116. format=None, colorize=None, name="celery", root=True,
  117. app=None, **kwargs):
  118. """Setup the :mod:`multiprocessing` logger.
  119. If `logfile` is not specified, then `sys.stderr` is used.
  120. Returns logger object.
  121. """
  122. loglevel = loglevel or self.loglevel
  123. format = format or self.format
  124. if colorize is None:
  125. colorize = self.supports_color(logfile)
  126. if not root or self.app.conf.CELERYD_HIJACK_ROOT_LOGGER:
  127. return self._setup_logger(self.get_default_logger(loglevel, name),
  128. logfile, format, colorize, **kwargs)
  129. self.setup_logging_subsystem(loglevel, logfile,
  130. format, colorize, **kwargs)
  131. return self.get_default_logger(name=name)
  132. def setup_task_logger(self, loglevel=None, logfile=None, format=None,
  133. colorize=None, task_kwargs=None, propagate=False, app=None,
  134. **kwargs):
  135. """Setup the task logger.
  136. If `logfile` is not specified, then `sys.stderr` is used.
  137. Returns logger object.
  138. """
  139. loglevel = loglevel or self.loglevel
  140. format = format or self.format
  141. if colorize is None:
  142. colorize = self.supports_color(logfile)
  143. if task_kwargs is None:
  144. task_kwargs = {}
  145. task_kwargs.setdefault("task_id", "-?-")
  146. task_name = task_kwargs.get("task_name")
  147. task_kwargs.setdefault("task_name", "-?-")
  148. logger = self._setup_logger(self.get_task_logger(loglevel, task_name),
  149. logfile, format, colorize, **kwargs)
  150. logger.propagate = int(propagate) # this is an int for some reason.
  151. # better to not question why.
  152. return LoggerAdapter(logger, task_kwargs)
  153. def redirect_stdouts_to_logger(self, logger, loglevel=None):
  154. """Redirect :class:`sys.stdout` and :class:`sys.stderr` to a
  155. logging instance.
  156. :param logger: The :class:`logging.Logger` instance to redirect to.
  157. :param loglevel: The loglevel redirected messages will be logged as.
  158. """
  159. proxy = LoggingProxy(logger, loglevel)
  160. sys.stdout = sys.stderr = proxy
  161. return proxy
  162. def _setup_logger(self, logger, logfile, format, colorize,
  163. formatter=ColorFormatter, **kwargs):
  164. if logger.handlers: # Logger already configured
  165. return logger
  166. handler = self._detect_handler(logfile)
  167. handler.setFormatter(formatter(format, use_color=colorize))
  168. logger.addHandler(handler)
  169. return logger
  170. _default_logging = Logging(app_or_default())
  171. setup_logging_subsystem = _default_logging.setup_logging_subsystem
  172. get_default_logger = _default_logging.get_default_logger
  173. setup_logger = _default_logging.setup_logger
  174. setup_task_logger = _default_logging.setup_task_logger
  175. get_task_logger = _default_logging.get_task_logger
  176. redirect_stdouts_to_logger = _default_logging.redirect_stdouts_to_logger
  177. class LoggingProxy(object):
  178. """Forward file object to :class:`logging.Logger` instance.
  179. :param logger: The :class:`logging.Logger` instance to forward to.
  180. :param loglevel: Loglevel to use when writing messages.
  181. """
  182. mode = "w"
  183. name = None
  184. closed = False
  185. loglevel = logging.ERROR
  186. _thread = threading.local()
  187. def __init__(self, logger, loglevel=None):
  188. self.logger = logger
  189. self.loglevel = loglevel or self.logger.level or self.loglevel
  190. if not isinstance(self.loglevel, int):
  191. self.loglevel = LOG_LEVELS[self.loglevel.upper()]
  192. self._safewrap_handlers()
  193. def _safewrap_handlers(self):
  194. """Make the logger handlers dump internal errors to
  195. `sys.__stderr__` instead of `sys.stderr` to circumvent
  196. infinite loops."""
  197. def wrap_handler(handler): # pragma: no cover
  198. class WithSafeHandleError(logging.Handler):
  199. def handleError(self, record):
  200. exc_info = sys.exc_info()
  201. try:
  202. try:
  203. traceback.print_exception(exc_info[0],
  204. exc_info[1],
  205. exc_info[2],
  206. None, sys.__stderr__)
  207. except IOError:
  208. pass # see python issue 5971
  209. finally:
  210. del(exc_info)
  211. handler.handleError = WithSafeHandleError().handleError
  212. return map(wrap_handler, self.logger.handlers)
  213. def write(self, data):
  214. if getattr(self._thread, "recurse_protection", False):
  215. # Logger is logging back to this file, so stop recursing.
  216. return
  217. """Write message to logging object."""
  218. data = data.strip()
  219. if data and not self.closed:
  220. self._thread.recurse_protection = True
  221. try:
  222. self.logger.log(self.loglevel, data)
  223. finally:
  224. self._thread.recurse_protection = False
  225. def writelines(self, sequence):
  226. """`writelines(sequence_of_strings) -> None`.
  227. Write the strings to the file.
  228. The sequence can be any iterable object producing strings.
  229. This is equivalent to calling :meth:`write` for each string.
  230. """
  231. for part in sequence:
  232. self.write(part)
  233. def flush(self):
  234. """This object is not buffered so any :meth:`flush` requests
  235. are ignored."""
  236. pass
  237. def close(self):
  238. """When the object is closed, no write requests are forwarded to
  239. the logging object anymore."""
  240. self.closed = True
  241. def isatty(self):
  242. """Always returns :const:`False`. Just here for file support."""
  243. return False
  244. def fileno(self):
  245. return None
  246. class SilenceRepeated(object):
  247. """Only log action every n iterations."""
  248. def __init__(self, action, max_iterations=10):
  249. self.action = action
  250. self.max_iterations = max_iterations
  251. self._iterations = 0
  252. def __call__(self, *msgs):
  253. if self._iterations >= self.max_iterations:
  254. for msg in msgs:
  255. self.action(msg)
  256. self._iterations = 0
  257. else:
  258. self._iterations += 1