# celery/log.py
  1. """celery.log"""
  2. import os
  3. import sys
  4. import time
  5. import logging
  6. import traceback
  7. from celery import conf
  8. from celery.utils import noop
  9. from celery.utils.patch import ensure_process_aware_logger
  10. from celery.utils.compat import LoggerAdapter
  11. _hijacked = False
  12. _monkeypatched = False
  13. BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
  14. RESET_SEQ = "\033[0m"
  15. COLOR_SEQ = "\033[1;%dm"
  16. BOLD_SEQ = "\033[1m"
  17. COLORS = {
  18. "WARNING": YELLOW,
  19. "DEBUG": BLUE,
  20. "CRITICAL": MAGENTA,
  21. "ERROR": RED,
  22. }
  23. class ColorFormatter(logging.Formatter):
  24. def __init__(self, msg, use_color=True):
  25. logging.Formatter.__init__(self, msg)
  26. self.use_color = use_color
  27. def format(self, record):
  28. levelname = record.levelname
  29. if self.use_color and levelname in COLORS:
  30. record.msg = COLOR_SEQ % (
  31. 30 + COLORS[levelname]) + record.msg + RESET_SEQ
  32. return logging.Formatter.format(self, record)
  33. def get_task_logger(loglevel=None):
  34. ensure_process_aware_logger()
  35. logger = logging.getLogger("celery.Task")
  36. if loglevel is not None:
  37. logger.setLevel(loglevel)
  38. return logger
  39. def _hijack_multiprocessing_logger():
  40. from multiprocessing import util as mputil
  41. global _hijacked
  42. if _hijacked:
  43. return mputil.get_logger()
  44. ensure_process_aware_logger()
  45. logging.Logger.manager.loggerDict.clear()
  46. try:
  47. if mputil._logger is not None:
  48. mputil.logger = None
  49. except AttributeError:
  50. pass
  51. _hijacked = True
  52. return mputil.get_logger()
  53. def _detect_handler(logfile=None):
  54. """Create log handler with either a filename, an open stream
  55. or ``None`` (stderr)."""
  56. if not logfile or hasattr(logfile, "write"):
  57. return logging.StreamHandler(logfile)
  58. return logging.FileHandler(logfile)
  59. def get_default_logger(loglevel=None):
  60. """Get default logger instance.
  61. :keyword loglevel: Initial log level.
  62. """
  63. logger = _hijack_multiprocessing_logger()
  64. if loglevel is not None:
  65. logger.setLevel(loglevel)
  66. return logger
  67. def setup_logger(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
  68. format=conf.CELERYD_LOG_FORMAT, colorize=conf.CELERYD_LOG_COLOR,
  69. **kwargs):
  70. """Setup the ``multiprocessing`` logger. If ``logfile`` is not specified,
  71. then ``stderr`` is used.
  72. Returns logger object.
  73. """
  74. return _setup_logger(get_default_logger(loglevel),
  75. logfile, format, colorize, **kwargs)
  76. def setup_task_logger(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
  77. format=conf.CELERYD_TASK_LOG_FORMAT, colorize=conf.CELERYD_LOG_COLOR,
  78. task_kwargs=None, **kwargs):
  79. """Setup the task logger. If ``logfile`` is not specified, then
  80. ``stderr`` is used.
  81. Returns logger object.
  82. """
  83. if task_kwargs is None:
  84. task_kwargs = {}
  85. task_kwargs.setdefault("task_id", "-?-")
  86. task_kwargs.setdefault("task_name", "-?-")
  87. logger = _setup_logger(get_task_logger(loglevel),
  88. logfile, format, colorize, **kwargs)
  89. return LoggerAdapter(logger, task_kwargs)
  90. def _setup_logger(logger, logfile, format, colorize,
  91. formatter=ColorFormatter, **kwargs):
  92. if logger.handlers: # Logger already configured
  93. return logger
  94. handler = _detect_handler(logfile)
  95. handler.setFormatter(formatter(format, use_color=colorize))
  96. logger.addHandler(handler)
  97. return logger
  98. def emergency_error(logfile, message):
  99. """Emergency error logging, for when there's no standard file
  100. descriptors open because the process has been daemonized or for
  101. some other reason."""
  102. closefh = noop
  103. logfile = logfile or sys.__stderr__
  104. if hasattr(logfile, "write"):
  105. logfh = logfile
  106. else:
  107. logfh = open(logfile, "a")
  108. closefh = logfh.close
  109. try:
  110. logfh.write("[%(asctime)s: CRITICAL/%(pid)d]: %(message)s\n" % {
  111. "asctime": time.asctime(),
  112. "pid": os.getpid(),
  113. "message": message})
  114. finally:
  115. closefh()
  116. def redirect_stdouts_to_logger(logger, loglevel=None):
  117. """Redirect :class:`sys.stdout` and :class:`sys.stderr` to a
  118. logging instance.
  119. :param logger: The :class:`logging.Logger` instance to redirect to.
  120. :param loglevel: The loglevel redirected messages will be logged as.
  121. """
  122. proxy = LoggingProxy(logger, loglevel)
  123. sys.stdout = sys.stderr = proxy
  124. return proxy
  125. class LoggingProxy(object):
  126. """Forward file object to :class:`logging.Logger` instance.
  127. :param logger: The :class:`logging.Logger` instance to forward to.
  128. :param loglevel: Loglevel to use when writing messages.
  129. """
  130. mode = "w"
  131. name = None
  132. closed = False
  133. loglevel = logging.ERROR
  134. def __init__(self, logger, loglevel=None):
  135. self.logger = logger
  136. self.loglevel = loglevel or self.logger.level or self.loglevel
  137. self._safewrap_handlers()
  138. def _safewrap_handlers(self):
  139. """Make the logger handlers dump internal errors to
  140. ``sys.__stderr__`` instead of ``sys.stderr`` to circumvent
  141. infinite loops."""
  142. def wrap_handler(handler): # pragma: no cover
  143. class WithSafeHandleError(logging.Handler):
  144. def handleError(self, record):
  145. exc_info = sys.exc_info()
  146. try:
  147. try:
  148. traceback.print_exception(exc_info[0],
  149. exc_info[1],
  150. exc_info[2],
  151. None, sys.__stderr__)
  152. except IOError:
  153. pass # see python issue 5971
  154. finally:
  155. del(exc_info)
  156. handler.handleError = WithSafeHandleError().handleError
  157. return map(wrap_handler, self.logger.handlers)
  158. def write(self, data):
  159. """Write message to logging object."""
  160. data = data.strip()
  161. if data and not self.closed:
  162. self.logger.log(self.loglevel, data)
  163. def writelines(self, sequence):
  164. """``writelines(sequence_of_strings) -> None``.
  165. Write the strings to the file.
  166. The sequence can be any iterable object producing strings.
  167. This is equivalent to calling :meth:`write` for each string.
  168. """
  169. map(self.write, sequence)
  170. def flush(self):
  171. """This object is not buffered so any :meth:`flush` requests
  172. are ignored."""
  173. pass
  174. def close(self):
  175. """When the object is closed, no write requests are forwarded to
  176. the logging object anymore."""
  177. self.closed = True
  178. def isatty(self):
  179. """Always returns ``False``. Just here for file support."""
  180. return False
  181. def fileno(self):
  182. return None
  183. class SilenceRepeated(object):
  184. """Only log action every n iterations."""
  185. def __init__(self, action, max_iterations=10):
  186. self.action = action
  187. self.max_iterations = max_iterations
  188. self._iterations = 0
  189. def __call__(self, *msgs):
  190. if self._iterations >= self.max_iterations:
  191. map(self.action, msgs)
  192. self._iterations = 0
  193. else:
  194. self._iterations += 1