log.py

# -*- coding: utf-8 -*-
"""
celery.utils.log
~~~~~~~~~~~~~~~~

Logging utilities.

"""
from __future__ import absolute_import, print_function, unicode_literals

import logging
import numbers
import os
import sys
import threading
import traceback

from contextlib import contextmanager

from kombu.five import values
from kombu.log import get_logger as _get_logger, LOG_LEVELS
from kombu.utils.encoding import safe_str

from celery.five import string_t, text_t

from .term import colored

__all__ = ['ColorFormatter', 'LoggingProxy', 'base_logger',
           'set_in_sighandler', 'in_sighandler', 'get_logger',
           'get_task_logger', 'mlevel',
           'get_multiprocessing_logger', 'reset_multiprocessing_logger']

_process_aware = False
PY3 = sys.version_info[0] == 3

MP_LOG = os.environ.get('MP_LOG', False)

# Sets up our logging hierarchy.
#
# Every logger in the celery package inherits from the "celery"
# logger, and every task logger inherits from the "celery.task"
# logger.
base_logger = logger = _get_logger('celery')

_in_sighandler = False


def set_in_sighandler(value):
    """Set flag signifying that we're inside a signal handler."""
    global _in_sighandler
    _in_sighandler = value


def iter_open_logger_fds():
    """Yield the stream of every handler attached to a known logger."""
    seen = set()
    loggers = (list(values(logging.Logger.manager.loggerDict)) +
               [logging.getLogger(None)])
    for logger in loggers:
        try:
            for handler in logger.handlers:
                try:
                    if handler not in seen:  # pragma: no cover
                        yield handler.stream
                        seen.add(handler)
                except AttributeError:
                    pass
        except AttributeError:  # PlaceHolder does not have handlers
            pass


@contextmanager
def in_sighandler():
    """Context manager that marks the enclosed block as running inside
    a signal handler."""
    set_in_sighandler(True)
    try:
        yield
    finally:
        set_in_sighandler(False)
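

# Usage sketch (illustrative, not part of the original module): signal
# handlers wrap their bodies in :func:`in_sighandler` so that
# :class:`LoggingProxy` (defined below) bypasses the logging machinery and
# writes straight to ``sys.__stderr__`` while the handler runs.
#
#     def handle_term(signum, frame):
#         with in_sighandler():
#             print('got SIGTERM', file=sys.stderr)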


def logger_isa(l, p, max=1000):
    """Return True if logger ``l`` is ``p``, or has ``p`` somewhere in
    its chain of parents."""
    this, seen = l, set()
    for _ in range(max):
        if this == p:
            return True
        else:
            if this in seen:
                raise RuntimeError(
                    'Logger {0!r} parents recursive'.format(l),
                )
            seen.add(this)
            this = this.parent
            if not this:
                break
    else:  # pragma: no cover
        raise RuntimeError('Logger hierarchy exceeds {0}'.format(max))
    return False


def get_logger(name):
    """Get logger by name, re-parenting it under the ``celery`` base
    logger unless it already hangs directly off the root logger."""
    l = _get_logger(name)
    if logging.root not in (l, l.parent) and l is not base_logger:
        if not logger_isa(l, base_logger):  # pragma: no cover
            l.parent = base_logger
    return l
task_logger = get_logger('celery.task')
worker_logger = get_logger('celery.worker')


def get_task_logger(name):
    """Get a logger for use inside tasks, parented under ``celery.task``."""
    logger = get_logger(name)
    if not logger_isa(logger, task_logger):
        logger.parent = task_logger
    return logger
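

# Usage sketch (illustrative, not part of the original module):
# ``get_task_logger`` re-parents the named logger under ``celery.task``, so
# records emitted from task code propagate through the Celery logger
# hierarchy.  The logger name below is only a placeholder.
#
#     log = get_task_logger('myproj.tasks')
#     assert logger_isa(log, task_logger)   # under "celery.task"
#     assert logger_isa(log, base_logger)   # ...and therefore "celery"
#     log.info('hello from a task')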


def mlevel(level):
    """Convert level name/int to log level."""
    if level and not isinstance(level, numbers.Integral):
        return LOG_LEVELS[level.upper()]
    return level


class ColorFormatter(logging.Formatter):
    """Logging formatter that adds colors based on severity."""

    #: Loglevel -> Color mapping.
    COLORS = colored().names
    colors = {'DEBUG': COLORS['blue'], 'WARNING': COLORS['yellow'],
              'ERROR': COLORS['red'], 'CRITICAL': COLORS['magenta']}

    def __init__(self, fmt=None, use_color=True):
        logging.Formatter.__init__(self, fmt)
        self.use_color = use_color

    def formatException(self, ei):
        if ei and not isinstance(ei, tuple):
            ei = sys.exc_info()
        r = logging.Formatter.formatException(self, ei)
        if isinstance(r, str) and not PY3:
            return safe_str(r)
        return r

    def format(self, record):
        msg = logging.Formatter.format(self, record)
        color = self.colors.get(record.levelname)

        # reset exception info later for other handlers...
        einfo = sys.exc_info() if record.exc_info == 1 else record.exc_info

        if color and self.use_color:
            try:
                # safe_str will repr the color object
                # and color will break on non-string objects
                # so need to reorder calls based on type.
                # Issue #427
                try:
                    if isinstance(msg, string_t):
                        return text_t(color(safe_str(msg)))
                    return safe_str(color(msg))
                except UnicodeDecodeError:  # pragma: no cover
                    return safe_str(msg)  # skip colors
            except Exception as exc:
                prev_msg, record.exc_info, record.msg = (
                    record.msg, 1, '<Unrepresentable {0!r}: {1!r}>'.format(
                        type(msg), exc
                    ),
                )
                try:
                    return logging.Formatter.format(self, record)
                finally:
                    record.msg, record.exc_info = prev_msg, einfo
        else:
            return safe_str(msg)
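

# Usage sketch (illustrative, not part of the original module): wiring the
# formatter into a plain :mod:`logging` handler.  The format string below is
# an arbitrary example, not a Celery default.
#
#     handler = logging.StreamHandler(sys.stderr)
#     handler.setFormatter(ColorFormatter(
#         '[%(asctime)s: %(levelname)s] %(message)s',
#         use_color=sys.stderr.isatty(),
#     ))
#     get_logger('celery').addHandler(handler)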


class LoggingProxy(object):
    """Forward file object to :class:`logging.Logger` instance.

    :param logger: The :class:`logging.Logger` instance to forward to.
    :param loglevel: Loglevel to use when writing messages.

    """

    mode = 'w'
    name = None
    closed = False
    loglevel = logging.ERROR
    _thread = threading.local()

    def __init__(self, logger, loglevel=None):
        self.logger = logger
        self.loglevel = mlevel(loglevel or self.logger.level or self.loglevel)
        self._safewrap_handlers()

    def _safewrap_handlers(self):
        """Make the logger handlers dump internal errors to
        `sys.__stderr__` instead of `sys.stderr` to circumvent
        infinite loops."""

        def wrap_handler(handler):  # pragma: no cover

            class WithSafeHandleError(logging.Handler):

                def handleError(self, record):
                    exc_info = sys.exc_info()
                    try:
                        try:
                            traceback.print_exception(exc_info[0],
                                                      exc_info[1],
                                                      exc_info[2],
                                                      None, sys.__stderr__)
                        except IOError:
                            pass    # see python issue 5971
                    finally:
                        del(exc_info)
            handler.handleError = WithSafeHandleError().handleError
        return [wrap_handler(h) for h in self.logger.handlers]

    def write(self, data):
        """Write message to logging object."""
        if _in_sighandler:
            return print(safe_str(data), file=sys.__stderr__)
        if getattr(self._thread, 'recurse_protection', False):
            # Logger is logging back to this file, so stop recursing.
            return
        data = data.strip()
        if data and not self.closed:
            self._thread.recurse_protection = True
            try:
                self.logger.log(self.loglevel, safe_str(data))
            finally:
                self._thread.recurse_protection = False

    def writelines(self, sequence):
        """`writelines(sequence_of_strings) -> None`.

        Write the strings to the file.

        The sequence can be any iterable object producing strings.
        This is equivalent to calling :meth:`write` for each string.

        """
        for part in sequence:
            self.write(part)

    def flush(self):
        """This object is not buffered so any :meth:`flush` requests
        are ignored."""
        pass

    def close(self):
        """When the object is closed, no write requests are forwarded to
        the logging object anymore."""
        self.closed = True

    def isatty(self):
        """Always return :const:`False`. Just here for file support."""
        return False
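

# Usage sketch (illustrative, not part of the original module): the worker
# redirects ``sys.stdout``/``sys.stderr`` through a proxy like this so that
# bare print() output ends up as log records (see ``celery.app.log``).  The
# logger name below is only an example.
#
#     proxy = LoggingProxy(get_logger('celery.redirected'), logging.WARNING)
#     sys.stdout = proxy
#     print('this line becomes a WARNING log record')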


def get_multiprocessing_logger():
    """Return the multiprocessing logger, if billiard is available."""
    try:
        from billiard import util
    except ImportError:  # pragma: no cover
        pass
    else:
        return util.get_logger()


def reset_multiprocessing_logger():
    """Reset the multiprocessing logging setup."""
    try:
        from billiard import util
    except ImportError:  # pragma: no cover
        pass
    else:
        if hasattr(util, '_logger'):  # pragma: no cover
            util._logger = None


def current_process():
    """Return the current billiard process, if available."""
    try:
        from billiard import process
    except ImportError:  # pragma: no cover
        pass
    else:
        return process.current_process()


def current_process_index(base=1):
    """Return the index of the current process (1-based by default),
    or None if it cannot be determined."""
    index = getattr(current_process(), 'index', None)
    return index + base if index is not None else index
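

# Usage sketch (illustrative, not part of the original module):
# ``current_process_index`` returns the prefork child's index (1-based with
# the default ``base``), or None in the main process or when billiard is
# unavailable, which makes it handy for per-child log file names.
#
#     index = current_process_index()
#     logfile = 'worker-{0}.log'.format(index) if index else 'worker.log'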