log.py 11 KB


  1. """celery.log"""
  2. import logging
  3. import threading
  4. import sys
  5. import traceback
  6. try:
  7. from multiprocessing import current_process
  8. from multiprocessing import util as mputil
  9. except ImportError:
  10. current_process = mputil = None
  11. from celery import signals
  12. from celery import current_app
  13. from celery.utils import LOG_LEVELS, isatty
  14. from celery.utils.compat import LoggerAdapter
  15. from celery.utils.compat import WatchedFileHandler
  16. from celery.utils.encoding import safe_str
  17. from celery.utils.patch import ensure_process_aware_logger
  18. from celery.utils.term import colored
  19. class ColorFormatter(logging.Formatter):
  20. #: Loglevel -> Color mapping.
  21. COLORS = colored().names
  22. colors = {"DEBUG": COLORS["blue"], "WARNING": COLORS["yellow"],
  23. "ERROR": COLORS["red"], "CRITICAL": COLORS["magenta"]}
  24. def __init__(self, msg, use_color=True):
  25. logging.Formatter.__init__(self, msg)
  26. self.use_color = use_color
  27. def formatException(self, ei):
  28. r = logging.Formatter.formatException(self, ei)
  29. if isinstance(r, str):
  30. return r.decode("utf-8", "replace") # Convert to unicode
  31. return r
  32. def format(self, record):
  33. levelname = record.levelname
  34. color = self.colors.get(levelname)
  35. if self.use_color and color:
  36. try:
  37. record.msg = color(safe_str(record.msg))
  38. except Exception:
  39. record.msg = "<Unrepresentable %r: %r>" % (
  40. type(record.msg), traceback.format_stack())
  41. # Very ugly, but have to make sure processName is supported
  42. # by foreign logger instances.
  43. # (processName is always supported by Python 2.7)
  44. if "processName" not in record.__dict__:
  45. process_name = current_process and current_process()._name or ""
  46. record.__dict__["processName"] = process_name
  47. t = logging.Formatter.format(self, record)
  48. if isinstance(t, unicode):
  49. return t.encode("utf-8", "replace")
  50. return t
  51. class Logging(object):
  52. #: The logging subsystem is only configured once per process.
  53. #: setup_logging_subsystem sets this flag, and subsequent calls
  54. #: will do nothing.
  55. _setup = False
  56. def __init__(self, app):
  57. self.app = app
  58. self.loglevel = self.app.conf.CELERYD_LOG_LEVEL
  59. self.format = self.app.conf.CELERYD_LOG_FORMAT
  60. self.task_format = self.app.conf.CELERYD_TASK_LOG_FORMAT
  61. self.colorize = self.app.conf.CELERYD_LOG_COLOR
  62. def supports_color(self, logfile=None):
  63. if self.app.IS_WINDOWS:
  64. # Windows does not support ANSI color codes.
  65. return False
  66. if self.colorize is None:
  67. # Only use color if there is no active log file
  68. # and stderr is an actual terminal.
  69. return logfile is None and isatty(sys.stderr)
  70. return self.colorize
  71. def colored(self, logfile=None):
  72. return colored(enabled=self.supports_color(logfile))
  73. def get_task_logger(self, loglevel=None, name=None):
  74. logger = logging.getLogger(name or "celery.task.default")
  75. if loglevel is not None:
  76. logger.setLevel(loglevel)
  77. return logger
  78. def setup_logging_subsystem(self, loglevel=None, logfile=None,
  79. format=None, colorize=None, **kwargs):
  80. if Logging._setup:
  81. return
  82. loglevel = loglevel or self.loglevel
  83. format = format or self.format
  84. if colorize is None:
  85. colorize = self.supports_color(logfile)
  86. if mputil:
  87. try:
  88. mputil._logger = None
  89. except AttributeError:
  90. pass
  91. ensure_process_aware_logger()
  92. receivers = signals.setup_logging.send(sender=None,
  93. loglevel=loglevel,
  94. logfile=logfile,
  95. format=format,
  96. colorize=colorize)
  97. if not receivers:
  98. root = logging.getLogger()
  99. if self.app.conf.CELERYD_HIJACK_ROOT_LOGGER:
  100. root.handlers = []
  101. mp = mputil and mputil.get_logger() or None
  102. for logger in (root, mp):
  103. if logger:
  104. self._setup_logger(logger, logfile, format,
  105. colorize, **kwargs)
  106. logger.setLevel(loglevel)
  107. Logging._setup = True
  108. return receivers
  109. def _detect_handler(self, logfile=None):
  110. """Create log handler with either a filename, an open stream
  111. or :const:`None` (stderr)."""
  112. if logfile is None:
  113. logfile = sys.__stderr__
  114. if hasattr(logfile, "write"):
  115. return logging.StreamHandler(logfile)
  116. return WatchedFileHandler(logfile)
  117. def get_default_logger(self, loglevel=None, name="celery"):
  118. """Get default logger instance.
  119. :keyword loglevel: Initial log level.
  120. """
  121. logger = logging.getLogger(name)
  122. if loglevel is not None:
  123. logger.setLevel(loglevel)
  124. return logger
  125. def setup_logger(self, loglevel=None, logfile=None,
  126. format=None, colorize=None, name="celery", root=True,
  127. app=None, **kwargs):
  128. """Setup the :mod:`multiprocessing` logger.
  129. If `logfile` is not specified, then `sys.stderr` is used.
  130. Returns logger object.
  131. """
  132. loglevel = loglevel or self.loglevel
  133. format = format or self.format
  134. if colorize is None:
  135. colorize = self.supports_color(logfile)
  136. if not root or self.app.conf.CELERYD_HIJACK_ROOT_LOGGER:
  137. return self._setup_logger(self.get_default_logger(loglevel, name),
  138. logfile, format, colorize, **kwargs)
  139. self.setup_logging_subsystem(loglevel, logfile,
  140. format, colorize, **kwargs)
  141. return self.get_default_logger(name=name)
  142. def setup_task_logger(self, loglevel=None, logfile=None, format=None,
  143. colorize=None, task_name=None, task_id=None, propagate=False,
  144. app=None, **kwargs):
  145. """Setup the task logger.
  146. If `logfile` is not specified, then `sys.stderr` is used.
  147. Returns logger object.
  148. """
  149. loglevel = loglevel or self.loglevel
  150. format = format or self.task_format
  151. if colorize is None:
  152. colorize = self.supports_color(logfile)
  153. logger = self._setup_logger(self.get_task_logger(loglevel, task_name),
  154. logfile, format, colorize, **kwargs)
  155. logger.propagate = int(propagate) # this is an int for some reason.
  156. # better to not question why.
  157. return LoggerAdapter(logger, {"task_id": task_id,
  158. "task_name": task_name})
  159. def redirect_stdouts_to_logger(self, logger, loglevel=None):
  160. """Redirect :class:`sys.stdout` and :class:`sys.stderr` to a
  161. logging instance.
  162. :param logger: The :class:`logging.Logger` instance to redirect to.
  163. :param loglevel: The loglevel redirected messages will be logged as.
  164. """
  165. proxy = LoggingProxy(logger, loglevel)
  166. sys.stdout = sys.stderr = proxy
  167. return proxy
  168. def _setup_logger(self, logger, logfile, format, colorize,
  169. formatter=ColorFormatter, **kwargs):
  170. if logger.handlers: # Logger already configured
  171. return logger
  172. handler = self._detect_handler(logfile)
  173. handler.setFormatter(formatter(format, use_color=colorize))
  174. logger.addHandler(handler)
  175. return logger
  176. setup_logging_subsystem = current_app.log.setup_logging_subsystem
  177. get_default_logger = current_app.log.get_default_logger
  178. setup_logger = current_app.log.setup_logger
  179. setup_task_logger = current_app.log.setup_task_logger
  180. get_task_logger = current_app.log.get_task_logger
  181. redirect_stdouts_to_logger = current_app.log.redirect_stdouts_to_logger
  182. class LoggingProxy(object):
  183. """Forward file object to :class:`logging.Logger` instance.
  184. :param logger: The :class:`logging.Logger` instance to forward to.
  185. :param loglevel: Loglevel to use when writing messages.
  186. """
  187. mode = "w"
  188. name = None
  189. closed = False
  190. loglevel = logging.ERROR
  191. _thread = threading.local()
  192. def __init__(self, logger, loglevel=None):
  193. self.logger = logger
  194. self.loglevel = loglevel or self.logger.level or self.loglevel
  195. if not isinstance(self.loglevel, int):
  196. self.loglevel = LOG_LEVELS[self.loglevel.upper()]
  197. self._safewrap_handlers()
  198. def _safewrap_handlers(self):
  199. """Make the logger handlers dump internal errors to
  200. `sys.__stderr__` instead of `sys.stderr` to circumvent
  201. infinite loops."""
  202. def wrap_handler(handler): # pragma: no cover
  203. class WithSafeHandleError(logging.Handler):
  204. def handleError(self, record):
  205. exc_info = sys.exc_info()
  206. try:
  207. try:
  208. traceback.print_exception(exc_info[0],
  209. exc_info[1],
  210. exc_info[2],
  211. None, sys.__stderr__)
  212. except IOError:
  213. pass # see python issue 5971
  214. finally:
  215. del(exc_info)
  216. handler.handleError = WithSafeHandleError().handleError
  217. return map(wrap_handler, self.logger.handlers)
  218. def write(self, data):
  219. if getattr(self._thread, "recurse_protection", False):
  220. # Logger is logging back to this file, so stop recursing.
  221. return
  222. """Write message to logging object."""
  223. data = data.strip()
  224. if data and not self.closed:
  225. self._thread.recurse_protection = True
  226. try:
  227. self.logger.log(self.loglevel, data)
  228. finally:
  229. self._thread.recurse_protection = False
  230. def writelines(self, sequence):
  231. """`writelines(sequence_of_strings) -> None`.
  232. Write the strings to the file.
  233. The sequence can be any iterable object producing strings.
  234. This is equivalent to calling :meth:`write` for each string.
  235. """
  236. for part in sequence:
  237. self.write(part)
  238. def flush(self):
  239. """This object is not buffered so any :meth:`flush` requests
  240. are ignored."""
  241. pass
  242. def close(self):
  243. """When the object is closed, no write requests are forwarded to
  244. the logging object anymore."""
  245. self.closed = True
  246. def isatty(self):
  247. """Always returns :const:`False`. Just here for file support."""
  248. return False
  249. def fileno(self):
  250. return None
  251. class SilenceRepeated(object):
  252. """Only log action every n iterations."""
  253. def __init__(self, action, max_iterations=10):
  254. self.action = action
  255. self.max_iterations = max_iterations
  256. self._iterations = 0
  257. def __call__(self, *msgs):
  258. if self._iterations >= self.max_iterations:
  259. for msg in msgs:
  260. self.action(msg)
  261. self._iterations = 0
  262. else:
  263. self._iterations += 1