log.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.utils.log
  4. ~~~~~~~~~~~~~~~~
  5. Logging utilities.
  6. """
  7. from __future__ import absolute_import, print_function, unicode_literals
  8. import logging
  9. import numbers
  10. import os
  11. import sys
  12. import threading
  13. import traceback
  14. from contextlib import contextmanager
  15. from kombu.five import values
  16. from kombu.log import get_logger as _get_logger, LOG_LEVELS
  17. from kombu.utils.encoding import safe_str
  18. from celery.five import string_t, text_t
  19. from .term import colored
  20. __all__ = ['ColorFormatter', 'LoggingProxy', 'base_logger',
  21. 'set_in_sighandler', 'in_sighandler', 'get_logger',
  22. 'get_task_logger', 'mlevel',
  23. 'get_multiprocessing_logger', 'reset_multiprocessing_logger']
#: Module flag for process-aware logging setup.
#: NOTE(review): nothing in this chunk mutates it — presumably set by
#: code elsewhere in the package; confirm before removing.
_process_aware = False

#: True when running under Python 3.
PY3 = sys.version_info[0] == 3

#: Truthy when the ``MP_LOG`` environment variable is set
#: (enables verbose multiprocessing logging).
MP_LOG = os.environ.get('MP_LOG', False)

# Sets up our logging hierarchy.
#
# Every logger in the celery package inherits from the "celery"
# logger, and every task logger inherits from the "celery.task"
# logger.
base_logger = logger = _get_logger('celery')

#: Logger used by the multiprocessing (billiard) machinery.
mp_logger = _get_logger('multiprocessing')

#: Flag flipped by :func:`set_in_sighandler`: when true, LoggingProxy
#: writes go straight to ``sys.__stderr__`` instead of through the
#: logging machinery (logging is not async-signal-safe).
_in_sighandler = False
def set_in_sighandler(value):
    """Set the module-wide "currently inside a signal handler" flag.

    :param value: truthy when entering a signal handler,
        falsy when leaving it.
    """
    global _in_sighandler
    _in_sighandler = value
  38. def iter_open_logger_fds():
  39. seen = set()
  40. loggers = (list(values(logging.Logger.manager.loggerDict)) +
  41. [logging.getLogger(None)])
  42. for logger in loggers:
  43. try:
  44. for handler in logger.handlers:
  45. try:
  46. if handler not in seen: # pragma: no cover
  47. yield handler.stream
  48. seen.add(handler)
  49. except AttributeError:
  50. pass
  51. except AttributeError: # PlaceHolder does not have handlers
  52. pass
@contextmanager
def in_sighandler():
    """Context manager marking the enclosed block as running inside a
    signal handler (see :func:`set_in_sighandler`).

    The flag is always cleared on exit, even if the block raises.
    """
    set_in_sighandler(True)
    try:
        yield
    finally:
        set_in_sighandler(False)
  60. def logger_isa(l, p, max=1000):
  61. this, seen = l, set()
  62. for _ in range(max):
  63. if this == p:
  64. return True
  65. else:
  66. if this in seen:
  67. raise RuntimeError(
  68. 'Logger {0!r} parents recursive'.format(l),
  69. )
  70. seen.add(this)
  71. this = this.parent
  72. if not this:
  73. break
  74. else: # pragma: no cover
  75. raise RuntimeError('Logger hierarchy exceeds {0}'.format(max))
  76. return False
def get_logger(name):
    """Return the logger for *name*, reparented under ``base_logger``.

    The root logger and ``base_logger`` itself are left untouched, as
    is any logger already inside the celery hierarchy.
    """
    l = _get_logger(name)
    if logging.root not in (l, l.parent) and l is not base_logger:
        if not logger_isa(l, base_logger):  # pragma: no cover
            l.parent = base_logger
    return l

#: Base logger that all task loggers inherit from.
task_logger = get_logger('celery.task')
#: Logger used by the worker machinery.
worker_logger = get_logger('celery.worker')
def get_task_logger(name):
    """Return the logger for *name*, reparented under ``task_logger``
    so its records propagate through the ``celery.task`` logger."""
    logger = get_logger(name)
    if not logger_isa(logger, task_logger):
        logger.parent = task_logger
    return logger
  90. def mlevel(level):
  91. if level and not isinstance(level, numbers.Integral):
  92. return LOG_LEVELS[level.upper()]
  93. return level
class ColorFormatter(logging.Formatter):
    """Log formatter that wraps the formatted message in terminal
    color escapes selected by the record's log level."""

    #: Loglevel -> Color mapping.
    COLORS = colored().names
    colors = {'DEBUG': COLORS['blue'], 'WARNING': COLORS['yellow'],
              'ERROR': COLORS['red'], 'CRITICAL': COLORS['magenta']}

    def __init__(self, fmt=None, use_color=True):
        logging.Formatter.__init__(self, fmt)
        self.use_color = use_color

    def formatException(self, ei):
        # `ei` may arrive as a bare exception instance rather than an
        # exc_info tuple; fall back to the currently handled exception.
        if ei and not isinstance(ei, tuple):
            ei = sys.exc_info()
        r = logging.Formatter.formatException(self, ei)
        if isinstance(r, str) and not PY3:
            # Python 2: avoid returning a raw byte-string traceback.
            return safe_str(r)
        return r

    def format(self, record):
        msg = logging.Formatter.format(self, record)
        color = self.colors.get(record.levelname)
        # reset exception info later for other handlers...
        einfo = sys.exc_info() if record.exc_info == 1 else record.exc_info
        if color and self.use_color:
            try:
                # safe_str will repr the color object
                # and color will break on non-string objects
                # so need to reorder calls based on type.
                # Issue #427
                try:
                    if isinstance(msg, string_t):
                        return text_t(color(safe_str(msg)))
                    return safe_str(color(msg))
                except UnicodeDecodeError:  # pragma: no cover
                    return safe_str(msg)  # skip colors
            except Exception as exc:
                # Message failed to format/color: emit a placeholder
                # with traceback instead, then restore the record so
                # other handlers see the original message/exc_info.
                prev_msg, record.exc_info, record.msg = (
                    record.msg, 1, '<Unrepresentable {0!r}: {1!r}>'.format(
                        type(msg), exc
                    ),
                )
                try:
                    return logging.Formatter.format(self, record)
                finally:
                    record.msg, record.exc_info = prev_msg, einfo
        else:
            return safe_str(msg)
class LoggingProxy(object):
    """Forward file object to :class:`logging.Logger` instance.

    :param logger: The :class:`logging.Logger` instance to forward to.
    :param loglevel: Loglevel to use when writing messages.
    """
    mode = 'w'
    name = None
    closed = False
    loglevel = logging.ERROR
    # Thread-local flag used by write() to detect when the logger is
    # writing back into this proxy (recursion guard).
    _thread = threading.local()

    def __init__(self, logger, loglevel=None):
        self.logger = logger
        # Level precedence: explicit argument > the logger's own level
        # > the class default (ERROR).
        self.loglevel = mlevel(loglevel or self.logger.level or self.loglevel)
        self._safewrap_handlers()

    def _safewrap_handlers(self):
        """Make the logger handlers dump internal errors to
        `sys.__stderr__` instead of `sys.stderr` to circumvent
        infinite loops."""

        def wrap_handler(handler):  # pragma: no cover

            class WithSafeHandleError(logging.Handler):

                def handleError(self, record):
                    exc_info = sys.exc_info()
                    try:
                        try:
                            traceback.print_exception(exc_info[0],
                                                      exc_info[1],
                                                      exc_info[2],
                                                      None, sys.__stderr__)
                        except IOError:
                            pass    # see python issue 5971
                    finally:
                        # Break the traceback reference cycle.
                        del(exc_info)
            handler.handleError = WithSafeHandleError().handleError
        return [wrap_handler(h) for h in self.logger.handlers]

    def write(self, data):
        """Write message to logging object."""
        if _in_sighandler:
            # logging is not async-signal-safe: bypass it entirely.
            return print(safe_str(data), file=sys.__stderr__)
        if getattr(self._thread, 'recurse_protection', False):
            # Logger is logging back to this file, so stop recursing.
            return
        data = data.strip()
        if data and not self.closed:
            self._thread.recurse_protection = True
            try:
                self.logger.log(self.loglevel, safe_str(data))
            finally:
                self._thread.recurse_protection = False

    def writelines(self, sequence):
        """`writelines(sequence_of_strings) -> None`.

        Write the strings to the file.

        The sequence can be any iterable object producing strings.
        This is equivalent to calling :meth:`write` for each string.
        """
        for part in sequence:
            self.write(part)

    def flush(self):
        """This object is not buffered so any :meth:`flush` requests
        are ignored."""
        pass

    def close(self):
        """When the object is closed, no write requests are forwarded to
        the logging object anymore."""
        self.closed = True

    def isatty(self):
        """Always return :const:`False`. Just here for file support."""
        return False
  205. def get_multiprocessing_logger():
  206. try:
  207. from billiard import util
  208. except ImportError: # pragma: no cover
  209. pass
  210. else:
  211. return util.get_logger()
  212. def reset_multiprocessing_logger():
  213. try:
  214. from billiard import util
  215. except ImportError: # pragma: no cover
  216. pass
  217. else:
  218. if hasattr(util, '_logger'): # pragma: no cover
  219. util._logger = None
  220. def current_process():
  221. try:
  222. from billiard import process
  223. except ImportError: # pragma: no cover
  224. pass
  225. else:
  226. return process.current_process()
def current_process_index(base=1):
    """Return the current worker process' index plus *base*
    (1-based by default), or :const:`None` when no index is available
    (e.g. billiard missing or not inside a pool process)."""
    index = getattr(current_process(), 'index', None)
    return index + base if index is not None else index