log.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. """celery.log"""
  2. import logging
  3. import threading
  4. import sys
  5. import traceback
  6. from multiprocessing import current_process
  7. from multiprocessing import util as mputil
  8. from celery import signals
  9. from celery.app import app_or_default
  10. from celery.utils.compat import LoggerAdapter
  11. from celery.utils.patch import ensure_process_aware_logger
  12. from celery.utils.term import colored
  13. # The logging subsystem is only configured once per process.
  14. # setup_logging_subsystem sets this flag, and subsequent calls
  15. # will do nothing.
  16. _setup = False
  17. COLORS = {"DEBUG": "blue",
  18. "WARNING": "yellow",
  19. "ERROR": "red",
  20. "CRITICAL": "magenta"}
  21. class ColorFormatter(logging.Formatter):
  22. def __init__(self, msg, use_color=True):
  23. logging.Formatter.__init__(self, msg)
  24. self.use_color = use_color
  25. def format(self, record):
  26. levelname = record.levelname
  27. if self.use_color and levelname in COLORS:
  28. record.msg = str(colored().names[COLORS[levelname]](record.msg))
  29. # Very ugly, but have to make sure processName is supported
  30. # by foreign logger instances.
  31. # (processName is always supported by Python 2.7)
  32. if "processName" not in record.__dict__:
  33. record.__dict__["processName"] = current_process()._name
  34. return logging.Formatter.format(self, record)
class Logging(object):
    """Application logging setup, bound to a Celery app instance.

    Default level and format string are read from the app
    configuration (``CELERYD_LOG_LEVEL`` / ``CELERYD_LOG_FORMAT``).
    """
    # Per-process guard: setup_logging_subsystem() flips this on the
    # class so every instance in the process sees it.
    _setup = False

    def __init__(self, app):
        self.app = app
        self.loglevel = self.app.conf.CELERYD_LOG_LEVEL
        self.format = self.app.conf.CELERYD_LOG_FORMAT

    def get_task_logger(self, loglevel=None, name=None):
        """Return the logger tasks use (``celery.task.default`` unless
        ``name`` is given), optionally setting its level."""
        logger = logging.getLogger(name or "celery.task.default")
        if loglevel is not None:
            logger.setLevel(loglevel)
        return logger

    def setup_logging_subsystem(self, loglevel=None, logfile=None,
            format=None, colorize=None, **kwargs):
        """Configure the root and ``multiprocessing`` loggers.

        The :data:`~celery.signals.setup_logging` signal is sent first;
        if any receiver replied, configuration is assumed to be handled
        by them and nothing else is done.  Subsequent calls are no-ops
        (see :attr:`_setup`).

        Returns the signal receiver replies (or ``None`` if already
        set up).
        """
        loglevel = loglevel or self.loglevel
        format = format or self.format
        colorize = self.app.either("CELERYD_LOG_COLOR", colorize)
        if self.__class__._setup:
            return
        try:
            # Force multiprocessing to recreate its logger on next use.
            mputil._logger = None
        except AttributeError:
            pass
        ensure_process_aware_logger()
        # Drop loggers created before setup so they get reconfigured.
        logging.Logger.manager.loggerDict.clear()
        receivers = signals.setup_logging.send(sender=None,
                                               loglevel=loglevel,
                                               logfile=logfile,
                                               format=format,
                                               colorize=colorize)
        if not receivers:
            root = logging.getLogger()
            mp = mputil.get_logger()
            for logger in (root, mp):
                self._setup_logger(logger, logfile,
                                   format, colorize, **kwargs)
                logger.setLevel(loglevel)
        self.__class__._setup = True
        return receivers

    def _detect_handler(self, logfile=None):
        """Create log handler with either a filename, an open stream
        or ``None`` (stderr)."""
        if not logfile or hasattr(logfile, "write"):
            return logging.StreamHandler(logfile)
        return logging.FileHandler(logfile)

    def get_default_logger(self, loglevel=None, name="celery"):
        """Get default logger instance.

        :keyword loglevel: Initial log level.

        """
        logger = logging.getLogger(name)
        if loglevel is not None:
            logger.setLevel(loglevel)
        return logger

    def setup_logger(self, loglevel=None, logfile=None,
            format=None, colorize=None, name="celery", root=True,
            app=None, **kwargs):
        """Setup the ``multiprocessing`` logger.

        If ``logfile`` is not specified, then ``sys.stderr`` is used.

        Returns logger object.

        """
        loglevel = loglevel or self.loglevel
        format = format or self.format
        colorize = self.app.either("CELERYD_LOG_COLOR", colorize)
        if not root:
            # Configure only the named logger; leave the root
            # logging tree untouched.
            return self._setup_logger(self.get_default_logger(loglevel, name),
                                      logfile, format, colorize, **kwargs)
        self.setup_logging_subsystem(loglevel, logfile,
                                     format, colorize, **kwargs)
        return self.get_default_logger(name=name)

    def setup_task_logger(self, loglevel=None, logfile=None, format=None,
            colorize=None, task_kwargs=None, app=None, **kwargs):
        """Setup the task logger.

        If ``logfile`` is not specified, then ``sys.stderr`` is used.

        Returns logger object (a ``LoggerAdapter`` injecting
        ``task_id``/``task_name`` into every record).

        """
        loglevel = loglevel or self.loglevel
        format = format or self.format
        colorize = self.app.either("CELERYD_LOG_COLOR", colorize)
        if task_kwargs is None:
            task_kwargs = {}
        # Placeholders so format strings never fail on missing keys.
        task_kwargs.setdefault("task_id", "-?-")
        task_name = task_kwargs.get("task_name")
        task_kwargs.setdefault("task_name", "-?-")
        logger = self._setup_logger(self.get_task_logger(loglevel, task_name),
                                    logfile, format, colorize, **kwargs)
        return LoggerAdapter(logger, task_kwargs)

    def redirect_stdouts_to_logger(self, logger, loglevel=None):
        """Redirect :class:`sys.stdout` and :class:`sys.stderr` to a
        logging instance.

        :param logger: The :class:`logging.Logger` instance to redirect to.
        :param loglevel: The loglevel redirected messages will be logged as.

        """
        proxy = LoggingProxy(logger, loglevel)
        sys.stdout = sys.stderr = proxy
        return proxy

    def _setup_logger(self, logger, logfile, format, colorize,
            formatter=ColorFormatter, **kwargs):
        """Attach a handler/formatter for ``logfile`` to ``logger``,
        unless it already has handlers."""
        if logger.handlers:  # Logger already configured
            return logger
        handler = self._detect_handler(logfile)
        handler.setFormatter(formatter(format, use_color=colorize))
        logger.addHandler(handler)
        return logger
# Default Logging instance bound to the default app; its bound methods
# are re-exported below as the module-level API.
_default_logging = Logging(app_or_default())
setup_logging_subsystem = _default_logging.setup_logging_subsystem
get_default_logger = _default_logging.get_default_logger
setup_logger = _default_logging.setup_logger
setup_task_logger = _default_logging.setup_task_logger
get_task_logger = _default_logging.get_task_logger
redirect_stdouts_to_logger = _default_logging.redirect_stdouts_to_logger
  144. class LoggingProxy(object):
  145. """Forward file object to :class:`logging.Logger` instance.
  146. :param logger: The :class:`logging.Logger` instance to forward to.
  147. :param loglevel: Loglevel to use when writing messages.
  148. """
  149. mode = "w"
  150. name = None
  151. closed = False
  152. loglevel = logging.ERROR
  153. _thread = threading.local()
  154. def __init__(self, logger, loglevel=None):
  155. self.logger = logger
  156. self.loglevel = loglevel or self.logger.level or self.loglevel
  157. self._safewrap_handlers()
  158. def _safewrap_handlers(self):
  159. """Make the logger handlers dump internal errors to
  160. ``sys.__stderr__`` instead of ``sys.stderr`` to circumvent
  161. infinite loops."""
  162. def wrap_handler(handler): # pragma: no cover
  163. class WithSafeHandleError(logging.Handler):
  164. def handleError(self, record):
  165. exc_info = sys.exc_info()
  166. try:
  167. try:
  168. traceback.print_exception(exc_info[0],
  169. exc_info[1],
  170. exc_info[2],
  171. None, sys.__stderr__)
  172. except IOError:
  173. pass # see python issue 5971
  174. finally:
  175. del(exc_info)
  176. handler.handleError = WithSafeHandleError().handleError
  177. return map(wrap_handler, self.logger.handlers)
  178. def write(self, data):
  179. if getattr(self._thread, "recurse_protection", False):
  180. # Logger is logging back to this file, so stop recursing.
  181. return
  182. """Write message to logging object."""
  183. data = data.strip()
  184. if data and not self.closed:
  185. self._thread.recurse_protection = True
  186. try:
  187. self.logger.log(self.loglevel, data)
  188. finally:
  189. self._thread.recurse_protection = False
  190. def writelines(self, sequence):
  191. """``writelines(sequence_of_strings) -> None``.
  192. Write the strings to the file.
  193. The sequence can be any iterable object producing strings.
  194. This is equivalent to calling :meth:`write` for each string.
  195. """
  196. map(self.write, sequence)
  197. def flush(self):
  198. """This object is not buffered so any :meth:`flush` requests
  199. are ignored."""
  200. pass
  201. def close(self):
  202. """When the object is closed, no write requests are forwarded to
  203. the logging object anymore."""
  204. self.closed = True
  205. def isatty(self):
  206. """Always returns ``False``. Just here for file support."""
  207. return False
  208. def fileno(self):
  209. return None
  210. class SilenceRepeated(object):
  211. """Only log action every n iterations."""
  212. def __init__(self, action, max_iterations=10):
  213. self.action = action
  214. self.max_iterations = max_iterations
  215. self._iterations = 0
  216. def __call__(self, *msgs):
  217. if self._iterations >= self.max_iterations:
  218. map(self.action, msgs)
  219. self._iterations = 0
  220. else:
  221. self._iterations += 1