  1. """celery.log"""
  2. import os
  3. import sys
  4. import time
  5. import logging
  6. import traceback
  7. from celery import conf
  8. from celery.utils import noop
  9. from celery.utils.patch import ensure_process_aware_logger
  10. _hijacked = False
  11. _monkeypatched = False


def _hijack_multiprocessing_logger():
    from multiprocessing import util as mputil
    global _hijacked

    if _hijacked:
        return mputil.get_logger()

    ensure_process_aware_logger()

    # Forget any loggers created before the hijack, so they are
    # re-created through the process-aware multiprocessing logger.
    logging.Logger.manager.loggerDict.clear()

    try:
        # Reset the cached multiprocessing logger so get_logger()
        # recreates it.
        if mputil._logger is not None:
            mputil._logger = None
    except AttributeError:
        pass

    _hijacked = True
    return mputil.get_logger()


def _detect_handler(logfile=None):
    """Create log handler with either a filename, an open stream
    or ``None`` (stderr)."""
    if not logfile or hasattr(logfile, "write"):
        return logging.StreamHandler(logfile)
    return logging.FileHandler(logfile)
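
# A quick dispatch sketch for _detect_handler (illustrative only, not
# part of the original module; "celeryd.log" is a hypothetical path):
#
#     _detect_handler()               -> StreamHandler(None), i.e. stderr
#     _detect_handler(sys.stdout)     -> StreamHandler(sys.stdout)
#     _detect_handler("celeryd.log")  -> FileHandler("celeryd.log")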


def get_default_logger(loglevel=None):
    """Get default logger instance.

    :keyword loglevel: Initial log level.

    """
    logger = _hijack_multiprocessing_logger()
    if loglevel is not None:
        logger.setLevel(loglevel)
    return logger


def setup_logger(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
        format=conf.CELERYD_LOG_FORMAT, **kwargs):
    """Set up the ``multiprocessing`` logger. If ``logfile`` is not
    specified, ``stderr`` is used.

    Returns logger object.

    """
    logger = get_default_logger(loglevel=loglevel)
    if logger.handlers:     # Logger already configured
        return logger
    handler = _detect_handler(logfile)
    formatter = logging.Formatter(format)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger
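
# Example usage (a sketch; "celeryd.log" is a hypothetical path, and the
# defaults come from your celery configuration):
#
#     logger = setup_logger(loglevel=logging.INFO, logfile="celeryd.log")
#     logger.info("Worker ready.")
#
# Calling setup_logger() again is effectively a no-op once a handler is
# attached, since it returns early when logger.handlers is non-empty.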


def emergency_error(logfile, message):
    """Emergency error logging, for when there are no standard file
    descriptors open, because the process has been daemonized or for
    some other reason."""
    closefh = noop
    logfile = logfile or sys.__stderr__
    if hasattr(logfile, "write"):
        logfh = logfile
    else:
        logfh = open(logfile, "a")
        closefh = logfh.close
    try:
        logfh.write("[%(asctime)s: CRITICAL/%(pid)d]: %(message)s\n" % {
                        "asctime": time.asctime(),
                        "pid": os.getpid(),
                        "message": message})
    finally:
        closefh()
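
# Example usage (illustrative; the file path is hypothetical):
#
#     emergency_error("/var/log/celeryd.err", "Broker connection lost!")
#
# appends a line like
# "[Mon Jun  1 12:00:00 2009: CRITICAL/1234]: Broker connection lost!"
# and closes the file again. Passing an open stream (or None, meaning
# sys.__stderr__) leaves closing to the caller.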


def redirect_stdouts_to_logger(logger, loglevel=None):
    """Redirect :class:`sys.stdout` and :class:`sys.stderr` to a
    logging instance.

    :param logger: The :class:`logging.Logger` instance to redirect to.
    :param loglevel: The loglevel redirected messages will be logged as.

    """
    proxy = LoggingProxy(logger, loglevel)
    sys.stdout = sys.stderr = proxy
    return proxy
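
# Example usage (a sketch): after redirecting, plain writes to stdout or
# stderr become log records instead:
#
#     logger = setup_logger(loglevel=logging.WARNING)
#     redirect_stdouts_to_logger(logger, loglevel=logging.WARNING)
#     sys.stdout.write("now emitted as a WARNING log record\n")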


class LoggingProxy(object):
    """Forward file object to :class:`logging.Logger` instance.

    :param logger: The :class:`logging.Logger` instance to forward to.
    :param loglevel: Loglevel to use when writing messages.

    """
    mode = "w"
    name = None
    closed = False
    loglevel = logging.ERROR

    def __init__(self, logger, loglevel=None):
        self.logger = logger
        self.loglevel = loglevel or self.logger.level or self.loglevel
        self._safewrap_handlers()

    def _safewrap_handlers(self):
        """Make the logger handlers dump internal errors to
        ``sys.__stderr__`` instead of ``sys.stderr`` to circumvent
        infinite loops."""

        def wrap_handler(handler):                  # pragma: no cover

            class WithSafeHandleError(logging.Handler):

                def handleError(self, record):
                    exc_info = sys.exc_info()
                    try:
                        try:
                            traceback.print_exception(exc_info[0],
                                                      exc_info[1],
                                                      exc_info[2],
                                                      None, sys.__stderr__)
                        except IOError:
                            pass    # see python issue 5971
                    finally:
                        del exc_info

            handler.handleError = WithSafeHandleError().handleError

        return map(wrap_handler, self.logger.handlers)

    def write(self, data):
        """Write message to logging object."""
        if not self.closed:
            self.logger.log(self.loglevel, data)

    def writelines(self, sequence):
        """``writelines(sequence_of_strings) -> None``.

        Write the strings to the file.

        The sequence can be any iterable object producing strings.
        This is equivalent to calling :meth:`write` for each string.

        """
        map(self.write, sequence)

    def flush(self):
        """This object is not buffered so any :meth:`flush` requests
        are ignored."""
        pass

    def close(self):
        """When the object is closed, no write requests are forwarded to
        the logging object anymore."""
        self.closed = True

    def isatty(self):
        """Always returns ``False``. Just here for file support."""
        return False

    def fileno(self):
        return None
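
# Example usage (a sketch; any configured logger will do):
#
#     proxy = LoggingProxy(logging.getLogger("celery.test"),
#                          loglevel=logging.INFO)
#     proxy.write("sent to the logger, not to a file")
#     proxy.close()
#     proxy.write("silently dropped: the proxy is closed")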


class SilenceRepeated(object):
    """Only log action every n iterations."""

    def __init__(self, action, max_iterations=10):
        self.action = action
        self.max_iterations = max_iterations
        self._iterations = 0

    def __call__(self, *msgs):
        if self._iterations >= self.max_iterations:
            map(self.action, msgs)
            self._iterations = 0
        else:
            self._iterations += 1
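
# Example usage (a sketch): wrap a logging callable so a message fired
# on every pass of a busy loop only gets through about once in ten
# calls (the counter resets after each emit, so it fires on every
# eleventh call):
#
#     log_beat = SilenceRepeated(logger.info, max_iterations=10)
#     for _ in range(100):
#         log_beat("Waking up...")    # emitted ~9 times, not 100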