# -*- coding: utf-8 -*-
"""Logging utilities."""
from __future__ import absolute_import, print_function, unicode_literals

import logging
import numbers
import os
import sys
import threading
import traceback
from contextlib import contextmanager

from kombu.five import values
from kombu.log import get_logger as _get_logger, LOG_LEVELS
from kombu.utils.encoding import safe_str

from celery.five import string_t, text_t

from .term import colored

__all__ = [
    'ColorFormatter', 'LoggingProxy', 'base_logger',
    'set_in_sighandler', 'in_sighandler', 'get_logger',
    'get_task_logger', 'mlevel',
    'get_multiprocessing_logger', 'reset_multiprocessing_logger',
]

_process_aware = False
_in_sighandler = False

PY3 = sys.version_info[0] == 3

MP_LOG = os.environ.get('MP_LOG', False)

# Sets up our logging hierarchy.
#
# Every logger in the celery package inherits from the "celery"
# logger, and every task logger inherits from the "celery.task"
# logger.
base_logger = logger = _get_logger('celery')


def set_in_sighandler(value):
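    """Set the flag signifying that we're inside a signal handler."""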
    global _in_sighandler
    _in_sighandler = value


def iter_open_logger_fds():
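    """Yield the streams of all handlers attached to known loggers."""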
    seen = set()
    loggers = (list(values(logging.Logger.manager.loggerDict)) +
               [logging.getLogger(None)])
    for logger in loggers:
        try:
            for handler in logger.handlers:
                try:
                    if handler not in seen:  # pragma: no cover
                        yield handler.stream
                        seen.add(handler)
                except AttributeError:
                    pass
        except AttributeError:  # PlaceHolder does not have handlers
            pass


@contextmanager
def in_sighandler():
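    """Context manager marking the enclosed block as running inside a signal handler."""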
    set_in_sighandler(True)
    try:
        yield
    finally:
        set_in_sighandler(False)


def logger_isa(l, p, max=1000):
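    """Return True if logger ``l`` has ``p`` somewhere in its chain of parents."""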
    this, seen = l, set()
    for _ in range(max):
        if this == p:
            return True
        else:
            if this in seen:
                raise RuntimeError(
                    'Logger {0!r} parents recursive'.format(l),
                )
            seen.add(this)
            this = this.parent
            if not this:
                break
    else:  # pragma: no cover
        raise RuntimeError('Logger hierarchy exceeds {0}'.format(max))
    return False


def get_logger(name):
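    """Get logger by name, making sure it inherits from the base ``celery`` logger."""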
    l = _get_logger(name)
    if logging.root not in (l, l.parent) and l is not base_logger:
        if not logger_isa(l, base_logger):  # pragma: no cover
            l.parent = base_logger
    return l


task_logger = get_logger('celery.task')
worker_logger = get_logger('celery.worker')


def get_task_logger(name):
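    """Get logger for a task module, inheriting from the ``celery.task`` logger."""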
    logger = get_logger(name)
    if not logger_isa(logger, task_logger):
        logger.parent = task_logger
    return logger


def mlevel(level):
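    """Convert level name or number to a numeric log level."""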
    if level and not isinstance(level, numbers.Integral):
        return LOG_LEVELS[level.upper()]
    return level


class ColorFormatter(logging.Formatter):
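    """Logging formatter that colorizes records based on severity."""
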
    #: Loglevel -> Color mapping.
    COLORS = colored().names
    colors = {
        'DEBUG': COLORS['blue'],
        'WARNING': COLORS['yellow'],
        'ERROR': COLORS['red'],
        'CRITICAL': COLORS['magenta'],
    }

    def __init__(self, fmt=None, use_color=True):
        logging.Formatter.__init__(self, fmt)
        self.use_color = use_color

    def formatException(self, ei):
        if ei and not isinstance(ei, tuple):
            ei = sys.exc_info()
        r = logging.Formatter.formatException(self, ei)
        if isinstance(r, str) and not PY3:
            return safe_str(r)
        return r

    def format(self, record):
        msg = logging.Formatter.format(self, record)
        color = self.colors.get(record.levelname)

        # reset exception info later for other handlers...
        einfo = sys.exc_info() if record.exc_info == 1 else record.exc_info

        if color and self.use_color:
            try:
                # safe_str will repr the color object
                # and color will break on non-string objects
                # so need to reorder calls based on type.
                # Issue #427
                try:
                    if isinstance(msg, string_t):
                        return text_t(color(safe_str(msg)))
                    return safe_str(color(msg))
                except UnicodeDecodeError:  # pragma: no cover
                    return safe_str(msg)  # skip colors
            except Exception as exc:
                prev_msg, record.exc_info, record.msg = (
                    record.msg, 1, '<Unrepresentable {0!r}: {1!r}>'.format(
                        type(msg), exc
                    ),
                )
                try:
                    return logging.Formatter.format(self, record)
                finally:
                    record.msg, record.exc_info = prev_msg, einfo
        else:
            return safe_str(msg)


class LoggingProxy(object):
  140. """Forward file object to :class:`logging.Logger` instance.
  141. Arguments:
  142. logger (~logging.Logger): Logger instance to forward to.
  143. loglevel (int, str): Log level to use when logging messages.
  144. """
  145. mode = 'w'
  146. name = None
  147. closed = False
  148. loglevel = logging.ERROR
  149. _thread = threading.local()
  150. def __init__(self, logger, loglevel=None):
  151. self.logger = logger
  152. self.loglevel = mlevel(loglevel or self.logger.level or self.loglevel)
  153. self._safewrap_handlers()
  154. def _safewrap_handlers(self):
  155. """Make the logger handlers dump internal errors to
  156. :data:`sys.__stderr__` instead of :data:`sys.stderr` to circumvent
  157. infinite loops."""
  158. def wrap_handler(handler): # pragma: no cover
  159. class WithSafeHandleError(logging.Handler):
  160. def handleError(self, record):
  161. try:
  162. traceback.print_exc(None, sys.__stderr__)
  163. except IOError:
  164. pass # see python issue 5971
  165. handler.handleError = WithSafeHandleError().handleError
  166. return [wrap_handler(h) for h in self.logger.handlers]
  167. def write(self, data):
  168. """Write message to logging object."""
  169. if _in_sighandler:
  170. return print(safe_str(data), file=sys.__stderr__)
  171. if getattr(self._thread, 'recurse_protection', False):
  172. # Logger is logging back to this file, so stop recursing.
  173. return
  174. data = data.strip()
  175. if data and not self.closed:
  176. self._thread.recurse_protection = True
  177. try:
  178. self.logger.log(self.loglevel, safe_str(data))
  179. finally:
  180. self._thread.recurse_protection = False
  181. def writelines(self, sequence):
  182. """`writelines(sequence_of_strings) -> None`.
  183. Write the strings to the file.
  184. The sequence can be any iterable object producing strings.
  185. This is equivalent to calling :meth:`write` for each string.
  186. """
  187. for part in sequence:
  188. self.write(part)
  189. def flush(self):
  190. """This object is not buffered so any :meth:`flush` requests
  191. are ignored."""
  192. pass
  193. def close(self):
  194. """When the object is closed, no write requests are forwarded to
  195. the logging object anymore."""
  196. self.closed = True
  197. def isatty(self):
  198. """Always return :const:`False`. Just here for file support."""
  199. return False
  200. def get_multiprocessing_logger():
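    """Return the billiard (multiprocessing) logger, if billiard is available."""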
    try:
        from billiard import util
    except ImportError:  # pragma: no cover
        pass
    else:
        return util.get_logger()


def reset_multiprocessing_logger():
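    """Reset the multiprocessing logging setup."""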
    try:
        from billiard import util
    except ImportError:  # pragma: no cover
        pass
    else:
        if hasattr(util, '_logger'):  # pragma: no cover
            util._logger = None


def current_process():
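    """Return the current billiard process, or None if billiard is missing."""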
    try:
        from billiard import process
    except ImportError:  # pragma: no cover
        pass
    else:
        return process.current_process()


def current_process_index(base=1):
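    """Return the index of the current process (if set), offset by ``base``."""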
    index = getattr(current_process(), 'index', None)
    return index + base if index is not None else index