log.py

  1. """celery.log"""
  2. import os
  3. import sys
  4. import time
  5. import logging
  6. import traceback
  7. from celery import conf
  8. from celery.utils import noop
  9. _hijacked = False
  10. _monkeypatched = False

def _ensure_process_aware_logger():
    """Make sure the logging module is process-aware
    (the patch is applied only once)."""
    global _monkeypatched
    if not _monkeypatched:
        from celery.utils.patch import monkeypatch
        monkeypatch()
        _monkeypatched = True

def _hijack_multiprocessing_logger():
    from multiprocessing import util as mputil
    global _hijacked

    if _hijacked:
        return mputil.get_logger()

    _ensure_process_aware_logger()

    logging.Logger.manager.loggerDict.clear()

    try:
        # Reset the cached logger so ``get_logger`` creates it anew.
        if mputil._logger is not None:
            mputil._logger = None
    except AttributeError:
        pass

    _hijacked = True
    return mputil.get_logger()

def get_default_logger(loglevel=None):
    """Get default logger instance.

    :keyword loglevel: Initial log level.

    """
    logger = _hijack_multiprocessing_logger()
    if loglevel is not None:
        logger.setLevel(loglevel)
    return logger
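
# Usage sketch (illustrative, not part of the original module): the default
# logger is the shared ``multiprocessing`` util logger, so repeated calls
# hand back the very same instance:
#
#     >>> logger = get_default_logger(loglevel=logging.DEBUG)
#     >>> logger is get_default_logger()
#     True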

def setup_logger(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
        format=conf.CELERYD_LOG_FORMAT, **kwargs):
    """Set up the ``multiprocessing`` logger. If ``logfile`` is not
    specified, ``stderr`` is used.

    Returns logger object.

    """
    logger = get_default_logger(loglevel=loglevel)
    if logger.handlers:
        # Logger is already configured.
        return logger
    if logfile:
        # ``logfile`` can be a filename or an already-open stream.
        handler = logging.FileHandler
        if hasattr(logfile, "write"):
            handler = logging.StreamHandler
        loghandler = handler(logfile)
        formatter = logging.Formatter(format)
        loghandler.setFormatter(formatter)
        logger.addHandler(loghandler)
    else:
        from multiprocessing.util import log_to_stderr
        log_to_stderr()
    return logger
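
# Usage sketch (illustrative; assumes the ``celery.conf`` defaults are in
# place): ``logfile`` may be a filename or an already-open stream, and a
# second call is a no-op once handlers exist:
#
#     >>> logger = setup_logger(loglevel=logging.INFO, logfile="celeryd.log")
#     >>> logger.info("Worker ready.")
#     >>> setup_logger(logfile=sys.stderr) is logger    # already configured
#     True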

def emergency_error(logfile, message):
    """Emergency error logging, for use when there are no standard file
    descriptors open, because the process has been daemonized or for
    some other reason."""
    closefh = noop
    logfile = logfile or sys.__stderr__
    if hasattr(logfile, "write"):
        logfh = logfile
    else:
        logfh = open(logfile, "a")
        closefh = logfh.close
    try:
        logfh.write("[%(asctime)s: CRITICAL/%(pid)d]: %(message)s\n" % {
                        "asctime": time.asctime(),
                        "pid": os.getpid(),
                        "message": message})
    finally:
        closefh()
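
# Usage sketch (illustrative): ``emergency_error`` deliberately bypasses
# the logging machinery, so it works even before ``setup_logger`` has run:
#
#     >>> emergency_error("celeryd.err", "Broker connection lost.")
#     >>> emergency_error(None, "Shutting down.")   # falls back to stderr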

def redirect_stdouts_to_logger(logger, loglevel=None):
    """Redirect :class:`sys.stdout` and :class:`sys.stderr` to a
    logging instance.

    :param logger: The :class:`logging.Logger` instance to redirect to.
    :param loglevel: The loglevel redirected messages will be logged as.

    """
    proxy = LoggingProxy(logger, loglevel)
    sys.stdout = sys.stderr = proxy
    return proxy
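
# Usage sketch (illustrative): once redirected, anything written to stdout
# or stderr is forwarded to the logger at the given level:
#
#     >>> logger = setup_logger(logfile="celeryd.log")
#     >>> proxy = redirect_stdouts_to_logger(logger, loglevel=logging.WARNING)
#     >>> print("this ends up in celeryd.log, not on the terminal")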

class LoggingProxy(object):
    """Forward file object to :class:`logging.Logger` instance.

    :param logger: The :class:`logging.Logger` instance to forward to.
    :param loglevel: Loglevel to use when writing messages.

    """
    mode = "w"
    name = None
    closed = False
    loglevel = logging.ERROR

    def __init__(self, logger, loglevel=None):
        self.logger = logger
        self.loglevel = loglevel or self.logger.level or self.loglevel
        self._safewrap_handlers()

    def _safewrap_handlers(self):
        """Make the logger handlers dump internal errors to
        ``sys.__stderr__`` instead of ``sys.stderr`` to circumvent
        infinite loops."""

        def wrap_handler(handler):                  # pragma: no cover

            class WithSafeHandleError(logging.Handler):

                def handleError(self, record):
                    exc_info = sys.exc_info()
                    try:
                        traceback.print_exception(exc_info[0], exc_info[1],
                                                  exc_info[2], None,
                                                  sys.__stderr__)
                    except IOError:
                        pass    # see python issue 5971
                    finally:
                        del exc_info

            handler.handleError = WithSafeHandleError().handleError

        return map(wrap_handler, self.logger.handlers)

    def write(self, data):
        """Write message to logging object."""
        if not self.closed:
            self.logger.log(self.loglevel, data)

    def writelines(self, sequence):
        """``writelines(sequence_of_strings) -> None``.

        Write the strings to the file.

        The sequence can be any iterable object producing strings.
        This is equivalent to calling :meth:`write` for each string.

        """
        map(self.write, sequence)

    def flush(self):
        """This object is not buffered so any :meth:`flush` requests
        are ignored."""
        pass

    def close(self):
        """When the object is closed, no write requests are forwarded to
        the logging object anymore."""
        self.closed = True

    def isatty(self):
        """Always returns ``False``. Just here for file support."""
        return False

    def fileno(self):
        return None
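
# Usage sketch (illustrative, not part of the original class): the proxy
# can stand in anywhere a write-only file object is expected, and closing
# it silently drops further writes:
#
#     >>> proxy = LoggingProxy(get_default_logger(), logging.INFO)
#     >>> proxy.write("forwarded to the logger")
#     >>> proxy.close()
#     >>> proxy.write("ignored")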

class SilenceRepeated(object):
    """Only perform ``action`` every ``max_iterations`` calls."""

    def __init__(self, action, max_iterations=10):
        self.action = action
        self.max_iterations = max_iterations
        self._iterations = 0

    def __call__(self, *msgs):
        if self._iterations >= self.max_iterations:
            map(self.action, msgs)
            self._iterations = 0
        else:
            self._iterations += 1
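
# Usage sketch (illustrative): wrapping a logging callable so a message
# emitted from a tight loop only gets through periodically:
#
#     >>> logger = get_default_logger()
#     >>> log_rarely = SilenceRepeated(logger.info, max_iterations=10)
#     >>> for i in range(100):
#     ...     log_rarely("still waiting...")    # logged every 11th call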