# celery/utils/log.py
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.utils.log
  4. ~~~~~~~~~~~~~~~~
  5. Logging utilities.
  6. """
  7. from __future__ import absolute_import, print_function
  8. import logging
  9. import numbers
  10. import os
  11. import sys
  12. import threading
  13. import traceback
  14. from contextlib import contextmanager
  15. from billiard import current_process, util as mputil
  16. from kombu.five import values
  17. from kombu.log import get_logger as _get_logger, LOG_LEVELS
  18. from kombu.utils.encoding import safe_str
  19. from celery.five import string_t, text_t
  20. from .term import colored
__all__ = ['ColorFormatter', 'LoggingProxy', 'base_logger',
           'set_in_sighandler', 'in_sighandler', 'get_logger',
           'get_task_logger', 'mlevel', 'ensure_process_aware_logger',
           'get_multiprocessing_logger', 'reset_multiprocessing_logger']

# Becomes True once ensure_process_aware_logger() has installed the
# process-aware logger class (see bottom of this module).
_process_aware = False
PY3 = sys.version_info[0] == 3
# MP_LOG: environment toggle for multiprocessing/billiard log verbosity.
MP_LOG = os.environ.get('MP_LOG', False)

# Sets up our logging hierarchy.
#
# Every logger in the celery package inherits from the "celery"
# logger, and every task logger inherits from the "celery.task"
# logger.
base_logger = logger = _get_logger('celery')
mp_logger = _get_logger('multiprocessing')

# Flag toggled by set_in_sighandler(); consulted by LoggingProxy.write
# and ProcessAwareLogger.log to avoid unsafe work inside signal handlers.
_in_sighandler = False
  36. def set_in_sighandler(value):
  37. global _in_sighandler
  38. _in_sighandler = value
  39. def iter_open_logger_fds():
  40. seen = set()
  41. loggers = (list(values(logging.Logger.manager.loggerDict)) +
  42. [logging.getLogger(None)])
  43. for logger in loggers:
  44. try:
  45. for handler in logger.handlers:
  46. try:
  47. if handler not in seen:
  48. yield handler.stream
  49. seen.add(handler)
  50. except AttributeError:
  51. pass
  52. except AttributeError: # PlaceHolder does not have handlers
  53. pass
  54. @contextmanager
  55. def in_sighandler():
  56. set_in_sighandler(True)
  57. try:
  58. yield
  59. finally:
  60. set_in_sighandler(False)
  61. def logger_isa(l, p, max=1000):
  62. this, seen = l, set()
  63. for _ in range(max):
  64. if this == p:
  65. return True
  66. else:
  67. if this in seen:
  68. raise RuntimeError(
  69. 'Logger {0!r} parents recursive'.format(l),
  70. )
  71. seen.add(this)
  72. this = this.parent
  73. if not this:
  74. break
  75. else:
  76. raise RuntimeError('Logger hierarchy exceeds {0}'.format(max))
  77. return False
  78. def get_logger(name):
  79. l = _get_logger(name)
  80. if logging.root not in (l, l.parent) and l is not base_logger:
  81. if not logger_isa(l, base_logger):
  82. l.parent = base_logger
  83. return l
# Shared parents for task and worker loggers (both children of "celery").
task_logger = get_logger('celery.task')
worker_logger = get_logger('celery.worker')
  86. def get_task_logger(name):
  87. logger = get_logger(name)
  88. if not logger_isa(logger, task_logger):
  89. logger.parent = task_logger
  90. return logger
  91. def mlevel(level):
  92. if level and not isinstance(level, numbers.Integral):
  93. return LOG_LEVELS[level.upper()]
  94. return level
class ColorFormatter(logging.Formatter):
    """Log formatter that colors the message according to severity.

    :param fmt: format string passed to :class:`logging.Formatter`.
    :param use_color: when False, output is formatted but never colored.
    """

    #: Loglevel -> Color mapping.
    COLORS = colored().names
    #: Per-level color callables; levels absent here (e.g. INFO) are
    #: rendered uncolored.
    colors = {'DEBUG': COLORS['blue'], 'WARNING': COLORS['yellow'],
              'ERROR': COLORS['red'], 'CRITICAL': COLORS['magenta']}

    def __init__(self, fmt=None, use_color=True):
        logging.Formatter.__init__(self, fmt)
        self.use_color = use_color

    def formatException(self, ei):
        """Format exception info, coercing to a safe str on Python 2."""
        # A truthy non-tuple (e.g. exc_info=1) means "current exception":
        # fetch the real triple from sys.exc_info().
        if ei and not isinstance(ei, tuple):
            ei = sys.exc_info()
        r = logging.Formatter.formatException(self, ei)
        if isinstance(r, str) and not PY3:
            # On Python 2 `str` is bytes; guard against bad encodings.
            return safe_str(r)
        return r

    def format(self, record):
        """Format *record*, wrapping the message in level color codes.

        If coloring the message itself raises, the record is temporarily
        rewritten to an '<Unrepresentable ...>' message (with exc_info
        forced on) and formatted again, then restored for any other
        handlers that will see the same record.
        """
        msg = logging.Formatter.format(self, record)
        color = self.colors.get(record.levelname)
        # reset exception info later for other handlers...
        einfo = sys.exc_info() if record.exc_info == 1 else record.exc_info
        if color and self.use_color:
            try:
                # safe_str will repr the color object
                # and color will break on non-string objects
                # so need to reorder calls based on type.
                # Issue #427
                try:
                    if isinstance(msg, string_t):
                        return text_t(color(safe_str(msg)))
                    return safe_str(color(msg))
                except UnicodeDecodeError:
                    return safe_str(msg)  # skip colors
            except Exception as exc:
                # Save the original msg/exc_info, substitute a diagnostic
                # message, and restore both after re-formatting.
                prev_msg, record.exc_info, record.msg = (
                    record.msg, 1, '<Unrepresentable {0!r}: {1!r}>'.format(
                        type(msg), exc
                    ),
                )
                try:
                    return logging.Formatter.format(self, record)
                finally:
                    record.msg, record.exc_info = prev_msg, einfo
        else:
            return safe_str(msg)
  139. class LoggingProxy(object):
  140. """Forward file object to :class:`logging.Logger` instance.
  141. :param logger: The :class:`logging.Logger` instance to forward to.
  142. :param loglevel: Loglevel to use when writing messages.
  143. """
  144. mode = 'w'
  145. name = None
  146. closed = False
  147. loglevel = logging.ERROR
  148. _thread = threading.local()
  149. def __init__(self, logger, loglevel=None):
  150. self.logger = logger
  151. self.loglevel = mlevel(loglevel or self.logger.level or self.loglevel)
  152. self._safewrap_handlers()
  153. def _safewrap_handlers(self):
  154. """Make the logger handlers dump internal errors to
  155. `sys.__stderr__` instead of `sys.stderr` to circumvent
  156. infinite loops."""
  157. def wrap_handler(handler): # pragma: no cover
  158. class WithSafeHandleError(logging.Handler):
  159. def handleError(self, record):
  160. exc_info = sys.exc_info()
  161. try:
  162. try:
  163. traceback.print_exception(exc_info[0],
  164. exc_info[1],
  165. exc_info[2],
  166. None, sys.__stderr__)
  167. except IOError:
  168. pass # see python issue 5971
  169. finally:
  170. del(exc_info)
  171. handler.handleError = WithSafeHandleError().handleError
  172. return [wrap_handler(h) for h in self.logger.handlers]
  173. def write(self, data):
  174. """Write message to logging object."""
  175. if _in_sighandler:
  176. return print(safe_str(data), file=sys.__stderr__)
  177. if getattr(self._thread, 'recurse_protection', False):
  178. # Logger is logging back to this file, so stop recursing.
  179. return
  180. data = data.strip()
  181. if data and not self.closed:
  182. self._thread.recurse_protection = True
  183. try:
  184. self.logger.log(self.loglevel, safe_str(data))
  185. finally:
  186. self._thread.recurse_protection = False
  187. def writelines(self, sequence):
  188. """`writelines(sequence_of_strings) -> None`.
  189. Write the strings to the file.
  190. The sequence can be any iterable object producing strings.
  191. This is equivalent to calling :meth:`write` for each string.
  192. """
  193. for part in sequence:
  194. self.write(part)
  195. def flush(self):
  196. """This object is not buffered so any :meth:`flush` requests
  197. are ignored."""
  198. pass
  199. def close(self):
  200. """When the object is closed, no write requests are forwarded to
  201. the logging object anymore."""
  202. self.closed = True
  203. def isatty(self):
  204. """Always return :const:`False`. Just here for file support."""
  205. return False
  206. def ensure_process_aware_logger(force=False):
  207. """Make sure process name is recorded when loggers are used."""
  208. global _process_aware
  209. if force or not _process_aware:
  210. logging._acquireLock()
  211. try:
  212. _process_aware = True
  213. Logger = logging.getLoggerClass()
  214. if getattr(Logger, '_process_aware', False): # pragma: no cover
  215. return
  216. class ProcessAwareLogger(Logger):
  217. _signal_safe = True
  218. _process_aware = True
  219. def makeRecord(self, *args, **kwds):
  220. record = Logger.makeRecord(self, *args, **kwds)
  221. record.processName = current_process()._name
  222. return record
  223. def log(self, *args, **kwargs):
  224. if _in_sighandler:
  225. return
  226. return Logger.log(self, *args, **kwargs)
  227. logging.setLoggerClass(ProcessAwareLogger)
  228. finally:
  229. logging._releaseLock()
  230. def get_multiprocessing_logger():
  231. return mputil.get_logger() if mputil else None
  232. def reset_multiprocessing_logger():
  233. if mputil and hasattr(mputil, '_logger'):
  234. mputil._logger = None
  235. def current_process_index(base=1):
  236. if current_process:
  237. index = getattr(current_process(), 'index', None)
  238. return index + base if index is not None else index
# Install the process-aware logger class as soon as this module is imported.
ensure_process_aware_logger()