log.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. """celery.log"""
  2. import os
  3. import sys
  4. import time
  5. import logging
  6. import traceback
  7. from celery import conf
  8. from celery.utils import noop
  9. from celery.utils.patch import ensure_process_aware_logger
  10. from celery.utils.compat import LoggerAdapter
# True once ``_hijack_multiprocessing_logger`` has replaced the root logging
# configuration, making the hijack a one-shot operation.
_hijacked = False
# NOTE(review): never written or read anywhere in this chunk — looks
# vestigial; confirm against the rest of the project before removing.
_monkeypatched = False
  13. def get_task_logger(loglevel=None):
  14. ensure_process_aware_logger()
  15. logger = logging.getLogger("celery.Task")
  16. if loglevel is not None:
  17. logger.setLevel(loglevel)
  18. return logger
  19. def _hijack_multiprocessing_logger():
  20. from multiprocessing import util as mputil
  21. global _hijacked
  22. if _hijacked:
  23. return mputil.get_logger()
  24. ensure_process_aware_logger()
  25. logging.Logger.manager.loggerDict.clear()
  26. try:
  27. if mputil._logger is not None:
  28. mputil.logger = None
  29. except AttributeError:
  30. pass
  31. _hijacked = True
  32. return mputil.get_logger()
  33. def _detect_handler(logfile=None):
  34. """Create log handler with either a filename, an open stream
  35. or ``None`` (stderr)."""
  36. if not logfile or hasattr(logfile, "write"):
  37. return logging.StreamHandler(logfile)
  38. return logging.FileHandler(logfile)
  39. def get_default_logger(loglevel=None):
  40. """Get default logger instance.
  41. :keyword loglevel: Initial log level.
  42. """
  43. logger = _hijack_multiprocessing_logger()
  44. if loglevel is not None:
  45. logger.setLevel(loglevel)
  46. return logger
  47. def setup_logger(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
  48. format=conf.CELERYD_LOG_FORMAT, **kwargs):
  49. """Setup the ``multiprocessing`` logger. If ``logfile`` is not specified,
  50. then ``stderr`` is used.
  51. Returns logger object.
  52. """
  53. return _setup_logger(get_default_logger(loglevel),
  54. logfile, format, **kwargs)
  55. def setup_task_logger(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
  56. format=conf.CELERYD_TASK_LOG_FORMAT, task_kwargs=None, **kwargs):
  57. """Setup the task logger. If ``logfile`` is not specified, then
  58. ``stderr`` is used.
  59. Returns logger object.
  60. """
  61. if task_kwargs is None:
  62. task_kwargs = {}
  63. task_kwargs.setdefault("task_id", "-?-")
  64. task_kwargs.setdefault("task_name", "-?-")
  65. logger = _setup_logger(get_task_logger(loglevel),
  66. logfile, format, **kwargs)
  67. return LoggerAdapter(logger, task_kwargs)
  68. def _setup_logger(logger, logfile, format,
  69. formatter=logging.Formatter, **kwargs):
  70. if logger.handlers: # Logger already configured
  71. return logger
  72. handler = _detect_handler(logfile)
  73. handler.setFormatter(formatter(format))
  74. logger.addHandler(handler)
  75. return logger
  76. def emergency_error(logfile, message):
  77. """Emergency error logging, for when there's no standard file
  78. descriptors open because the process has been daemonized or for
  79. some other reason."""
  80. closefh = noop
  81. logfile = logfile or sys.__stderr__
  82. if hasattr(logfile, "write"):
  83. logfh = logfile
  84. else:
  85. logfh = open(logfile, "a")
  86. closefh = logfh.close
  87. try:
  88. logfh.write("[%(asctime)s: CRITICAL/%(pid)d]: %(message)s\n" % {
  89. "asctime": time.asctime(),
  90. "pid": os.getpid(),
  91. "message": message})
  92. finally:
  93. closefh()
  94. def redirect_stdouts_to_logger(logger, loglevel=None):
  95. """Redirect :class:`sys.stdout` and :class:`sys.stderr` to a
  96. logging instance.
  97. :param logger: The :class:`logging.Logger` instance to redirect to.
  98. :param loglevel: The loglevel redirected messages will be logged as.
  99. """
  100. proxy = LoggingProxy(logger, loglevel)
  101. sys.stdout = sys.stderr = proxy
  102. return proxy
  103. class LoggingProxy(object):
  104. """Forward file object to :class:`logging.Logger` instance.
  105. :param logger: The :class:`logging.Logger` instance to forward to.
  106. :param loglevel: Loglevel to use when writing messages.
  107. """
  108. mode = "w"
  109. name = None
  110. closed = False
  111. loglevel = logging.ERROR
  112. def __init__(self, logger, loglevel=None):
  113. self.logger = logger
  114. self.loglevel = loglevel or self.logger.level or self.loglevel
  115. self._safewrap_handlers()
  116. def _safewrap_handlers(self):
  117. """Make the logger handlers dump internal errors to
  118. ``sys.__stderr__`` instead of ``sys.stderr`` to circumvent
  119. infinite loops."""
  120. def wrap_handler(handler): # pragma: no cover
  121. class WithSafeHandleError(logging.Handler):
  122. def handleError(self, record):
  123. exc_info = sys.exc_info()
  124. try:
  125. try:
  126. traceback.print_exception(exc_info[0],
  127. exc_info[1],
  128. exc_info[2],
  129. None, sys.__stderr__)
  130. except IOError:
  131. pass # see python issue 5971
  132. finally:
  133. del(exc_info)
  134. handler.handleError = WithSafeHandleError().handleError
  135. return map(wrap_handler, self.logger.handlers)
  136. def write(self, data):
  137. """Write message to logging object."""
  138. if not self.closed:
  139. self.logger.log(self.loglevel, data)
  140. def writelines(self, sequence):
  141. """``writelines(sequence_of_strings) -> None``.
  142. Write the strings to the file.
  143. The sequence can be any iterable object producing strings.
  144. This is equivalent to calling :meth:`write` for each string.
  145. """
  146. map(self.write, sequence)
  147. def flush(self):
  148. """This object is not buffered so any :meth:`flush` requests
  149. are ignored."""
  150. pass
  151. def close(self):
  152. """When the object is closed, no write requests are forwarded to
  153. the logging object anymore."""
  154. self.closed = True
  155. def isatty(self):
  156. """Always returns ``False``. Just here for file support."""
  157. return False
  158. def fileno(self):
  159. return None
  160. class SilenceRepeated(object):
  161. """Only log action every n iterations."""
  162. def __init__(self, action, max_iterations=10):
  163. self.action = action
  164. self.max_iterations = max_iterations
  165. self._iterations = 0
  166. def __call__(self, *msgs):
  167. if self._iterations >= self.max_iterations:
  168. map(self.action, msgs)
  169. self._iterations = 0
  170. else:
  171. self._iterations += 1