log.py 10.0 KB


  1. """celery.log"""
  2. import logging
  3. import threading
  4. import sys
  5. import traceback
  6. import types
  7. from multiprocessing import current_process
  8. from multiprocessing import util as mputil
  9. from celery import signals
  10. from celery.app import app_or_default
  11. from celery.utils import LOG_LEVELS
  12. from celery.utils.compat import LoggerAdapter
  13. from celery.utils.patch import ensure_process_aware_logger
  14. from celery.utils.term import colored
  15. # The logging subsystem is only configured once per process.
  16. # setup_logging_subsystem sets this flag, and subsequent calls
  17. # will do nothing.
  18. _setup = False
  19. COLORS = {"DEBUG": "blue",
  20. "WARNING": "yellow",
  21. "ERROR": "red",
  22. "CRITICAL": "magenta"}
  23. class ColorFormatter(logging.Formatter):
  24. def __init__(self, msg, use_color=True):
  25. logging.Formatter.__init__(self, msg)
  26. self.use_color = use_color
  27. def formatException(self, ei):
  28. r = logging.Formatter.formatException(self, ei)
  29. if type(r) in [types.StringType]:
  30. r = r.decode('utf-8', 'replace') # Convert to unicode
  31. return r
  32. def format(self, record):
  33. levelname = record.levelname
  34. if self.use_color and levelname in COLORS:
  35. record.msg = unicode(colored().names[COLORS[levelname]](record.msg))
  36. # Very ugly, but have to make sure processName is supported
  37. # by foreign logger instances.
  38. # (processName is always supported by Python 2.7)
  39. if "processName" not in record.__dict__:
  40. record.__dict__["processName"] = current_process()._name
  41. t = logging.Formatter.format(self, record)
  42. if type(t) in [types.UnicodeType]:
  43. t = t.encode('utf-8', 'replace')
  44. return t
  45. class Logging(object):
  46. _setup = False
  47. def __init__(self, app):
  48. self.app = app
  49. self.loglevel = self.app.conf.CELERYD_LOG_LEVEL
  50. self.format = self.app.conf.CELERYD_LOG_FORMAT
  51. def get_task_logger(self, loglevel=None, name=None):
  52. logger = logging.getLogger(name or "celery.task.default")
  53. if loglevel is not None:
  54. logger.setLevel(loglevel)
  55. return logger
  56. def setup_logging_subsystem(self, loglevel=None, logfile=None,
  57. format=None, colorize=None, **kwargs):
  58. loglevel = loglevel or self.loglevel
  59. format = format or self.format
  60. colorize = self.app.either("CELERYD_LOG_COLOR", colorize)
  61. if self.__class__._setup:
  62. return
  63. try:
  64. mputil._logger = None
  65. except AttributeError:
  66. pass
  67. ensure_process_aware_logger()
  68. logging.Logger.manager.loggerDict.clear()
  69. receivers = signals.setup_logging.send(sender=None,
  70. loglevel=loglevel,
  71. logfile=logfile,
  72. format=format,
  73. colorize=colorize)
  74. if not receivers:
  75. root = logging.getLogger()
  76. mp = mputil.get_logger()
  77. for logger in (root, mp):
  78. self._setup_logger(logger, logfile,
  79. format, colorize, **kwargs)
  80. logger.setLevel(loglevel)
  81. self.__class__._setup = True
  82. return receivers
  83. def _detect_handler(self, logfile=None):
  84. """Create log handler with either a filename, an open stream
  85. or :const:`None` (stderr)."""
  86. if not logfile or hasattr(logfile, "write"):
  87. return logging.StreamHandler(logfile)
  88. return logging.FileHandler(logfile)
  89. def get_default_logger(self, loglevel=None, name="celery"):
  90. """Get default logger instance.
  91. :keyword loglevel: Initial log level.
  92. """
  93. logger = logging.getLogger(name)
  94. if loglevel is not None:
  95. logger.setLevel(loglevel)
  96. return logger
  97. def setup_logger(self, loglevel=None, logfile=None,
  98. format=None, colorize=None, name="celery", root=True,
  99. app=None, **kwargs):
  100. """Setup the :mod:`multiprocessing` logger.
  101. If `logfile` is not specified, then `sys.stderr` is used.
  102. Returns logger object.
  103. """
  104. loglevel = loglevel or self.loglevel
  105. format = format or self.format
  106. colorize = self.app.either("CELERYD_LOG_COLOR", colorize)
  107. if not root:
  108. return self._setup_logger(self.get_default_logger(loglevel, name),
  109. logfile, format, colorize, **kwargs)
  110. self.setup_logging_subsystem(loglevel, logfile,
  111. format, colorize, **kwargs)
  112. return self.get_default_logger(name=name)
  113. def setup_task_logger(self, loglevel=None, logfile=None, format=None,
  114. colorize=None, task_kwargs=None, app=None, **kwargs):
  115. """Setup the task logger.
  116. If `logfile` is not specified, then `sys.stderr` is used.
  117. Returns logger object.
  118. """
  119. loglevel = loglevel or self.loglevel
  120. format = format or self.format
  121. colorize = self.app.either("CELERYD_LOG_COLOR", colorize)
  122. if task_kwargs is None:
  123. task_kwargs = {}
  124. task_kwargs.setdefault("task_id", "-?-")
  125. task_name = task_kwargs.get("task_name")
  126. task_kwargs.setdefault("task_name", "-?-")
  127. logger = self._setup_logger(self.get_task_logger(loglevel, task_name),
  128. logfile, format, colorize, **kwargs)
  129. return LoggerAdapter(logger, task_kwargs)
  130. def redirect_stdouts_to_logger(self, logger, loglevel=None):
  131. """Redirect :class:`sys.stdout` and :class:`sys.stderr` to a
  132. logging instance.
  133. :param logger: The :class:`logging.Logger` instance to redirect to.
  134. :param loglevel: The loglevel redirected messages will be logged as.
  135. """
  136. proxy = LoggingProxy(logger, loglevel)
  137. sys.stdout = sys.stderr = proxy
  138. return proxy
  139. def _setup_logger(self, logger, logfile, format, colorize,
  140. formatter=ColorFormatter, **kwargs):
  141. if logger.handlers: # Logger already configured
  142. return logger
  143. handler = self._detect_handler(logfile)
  144. handler.setFormatter(formatter(format, use_color=colorize))
  145. logger.addHandler(handler)
  146. return logger
  147. _default_logging = Logging(app_or_default())
  148. setup_logging_subsystem = _default_logging.setup_logging_subsystem
  149. get_default_logger = _default_logging.get_default_logger
  150. setup_logger = _default_logging.setup_logger
  151. setup_task_logger = _default_logging.setup_task_logger
  152. get_task_logger = _default_logging.get_task_logger
  153. redirect_stdouts_to_logger = _default_logging.redirect_stdouts_to_logger
  154. class LoggingProxy(object):
  155. """Forward file object to :class:`logging.Logger` instance.
  156. :param logger: The :class:`logging.Logger` instance to forward to.
  157. :param loglevel: Loglevel to use when writing messages.
  158. """
  159. mode = "w"
  160. name = None
  161. closed = False
  162. loglevel = logging.ERROR
  163. _thread = threading.local()
  164. def __init__(self, logger, loglevel=None):
  165. self.logger = logger
  166. self.loglevel = loglevel or self.logger.level or self.loglevel
  167. if not isinstance(self.loglevel, int):
  168. self.loglevel = LOG_LEVELS[self.loglevel.upper()]
  169. self._safewrap_handlers()
  170. def _safewrap_handlers(self):
  171. """Make the logger handlers dump internal errors to
  172. `sys.__stderr__` instead of `sys.stderr` to circumvent
  173. infinite loops."""
  174. def wrap_handler(handler): # pragma: no cover
  175. class WithSafeHandleError(logging.Handler):
  176. def handleError(self, record):
  177. exc_info = sys.exc_info()
  178. try:
  179. try:
  180. traceback.print_exception(exc_info[0],
  181. exc_info[1],
  182. exc_info[2],
  183. None, sys.__stderr__)
  184. except IOError:
  185. pass # see python issue 5971
  186. finally:
  187. del(exc_info)
  188. handler.handleError = WithSafeHandleError().handleError
  189. return map(wrap_handler, self.logger.handlers)
  190. def write(self, data):
  191. if getattr(self._thread, "recurse_protection", False):
  192. # Logger is logging back to this file, so stop recursing.
  193. return
  194. """Write message to logging object."""
  195. data = data.strip()
  196. if data and not self.closed:
  197. self._thread.recurse_protection = True
  198. try:
  199. self.logger.log(self.loglevel, data)
  200. finally:
  201. self._thread.recurse_protection = False
  202. def writelines(self, sequence):
  203. """`writelines(sequence_of_strings) -> None`.
  204. Write the strings to the file.
  205. The sequence can be any iterable object producing strings.
  206. This is equivalent to calling :meth:`write` for each string.
  207. """
  208. for part in sequence:
  209. self.write(part)
  210. def flush(self):
  211. """This object is not buffered so any :meth:`flush` requests
  212. are ignored."""
  213. pass
  214. def close(self):
  215. """When the object is closed, no write requests are forwarded to
  216. the logging object anymore."""
  217. self.closed = True
  218. def isatty(self):
  219. """Always returns :const:`False`. Just here for file support."""
  220. return False
  221. def fileno(self):
  222. return None
  223. class SilenceRepeated(object):
  224. """Only log action every n iterations."""
  225. def __init__(self, action, max_iterations=10):
  226. self.action = action
  227. self.max_iterations = max_iterations
  228. self._iterations = 0
  229. def __call__(self, *msgs):
  230. if self._iterations >= self.max_iterations:
  231. for msg in msgs:
  232. self.action(msg)
  233. self._iterations = 0
  234. else:
  235. self._iterations += 1