log.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. """celery.log"""
  2. import os
  3. import sys
  4. import time
  5. import logging
  6. import traceback
  7. from celery import conf
  8. from celery.utils import noop
  9. from celery.utils.patch import ensure_process_aware_logger
  10. from celery.utils.compat import LoggerAdapter
# Module-level guards: both flip to True once their respective one-time
# setup (logger hijacking / monkeypatching) has run.
_hijacked = False
_monkeypatched = False

# ANSI terminal colors: indices 0-7 map to escape codes 30-37 (foreground).
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
RESET_SEQ = "\033[0m"           # reset all attributes
COLOR_SEQ = "\033[1;%dm"        # bold + foreground color (fill in 30 + color)
BOLD_SEQ = "\033[1m"            # bold only

# Level name -> color used by ColorFormatter. Levels not listed here
# (e.g. INFO) are left uncolored.
COLORS = {
    "WARNING": YELLOW,
    "DEBUG": BLUE,
    "CRITICAL": MAGENTA,
    "ERROR": RED,
}
  23. class ColorFormatter(logging.Formatter):
  24. def __init__(self, msg, use_color=True):
  25. logging.Formatter.__init__(self, msg)
  26. self.use_color = use_color
  27. def format(self, record):
  28. levelname = record.levelname
  29. if self.use_color and levelname in COLORS:
  30. record.msg = COLOR_SEQ % (
  31. 30 + COLORS[levelname]) + record.msg + RESET_SEQ
  32. return logging.Formatter.format(self, record)
  33. def get_task_logger(loglevel=None, name=None):
  34. ensure_process_aware_logger()
  35. logger = logging.getLogger(name or "celery.task.default")
  36. if loglevel is not None:
  37. logger.setLevel(loglevel)
  38. return logger
  39. def _hijack_multiprocessing_logger():
  40. from multiprocessing import util as mputil
  41. global _hijacked
  42. if _hijacked:
  43. return mputil.get_logger()
  44. ensure_process_aware_logger()
  45. logging.Logger.manager.loggerDict.clear()
  46. try:
  47. if mputil._logger is not None:
  48. mputil.logger = None
  49. except AttributeError:
  50. pass
  51. _hijacked = True
  52. return mputil.get_logger()
  53. def _detect_handler(logfile=None):
  54. """Create log handler with either a filename, an open stream
  55. or ``None`` (stderr)."""
  56. if not logfile or hasattr(logfile, "write"):
  57. return logging.StreamHandler(logfile)
  58. return logging.FileHandler(logfile)
  59. def get_default_logger(loglevel=None):
  60. """Get default logger instance.
  61. :keyword loglevel: Initial log level.
  62. """
  63. logger = _hijack_multiprocessing_logger()
  64. if loglevel is not None:
  65. logger.setLevel(loglevel)
  66. return logger
  67. def setup_logger(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
  68. format=conf.CELERYD_LOG_FORMAT, colorize=conf.CELERYD_LOG_COLOR,
  69. **kwargs):
  70. """Setup the ``multiprocessing`` logger. If ``logfile`` is not specified,
  71. then ``stderr`` is used.
  72. Returns logger object.
  73. """
  74. return _setup_logger(get_default_logger(loglevel),
  75. logfile, format, colorize, **kwargs)
  76. def setup_task_logger(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
  77. format=conf.CELERYD_TASK_LOG_FORMAT, colorize=conf.CELERYD_LOG_COLOR,
  78. task_kwargs=None, **kwargs):
  79. """Setup the task logger. If ``logfile`` is not specified, then
  80. ``stderr`` is used.
  81. Returns logger object.
  82. """
  83. if task_kwargs is None:
  84. task_kwargs = {}
  85. task_kwargs.setdefault("task_id", "-?-")
  86. task_name = task_kwargs.get("task_name")
  87. task_kwargs.setdefault("task_name", "-?-")
  88. logger = _setup_logger(get_task_logger(loglevel, task_name),
  89. logfile, format, colorize, **kwargs)
  90. return LoggerAdapter(logger, task_kwargs)
  91. def _setup_logger(logger, logfile, format, colorize,
  92. formatter=ColorFormatter, **kwargs):
  93. if logger.handlers: # Logger already configured
  94. return logger
  95. handler = _detect_handler(logfile)
  96. handler.setFormatter(formatter(format, use_color=colorize))
  97. logger.addHandler(handler)
  98. return logger
  99. def emergency_error(logfile, message):
  100. """Emergency error logging, for when there's no standard file
  101. descriptors open because the process has been daemonized or for
  102. some other reason."""
  103. closefh = noop
  104. logfile = logfile or sys.__stderr__
  105. if hasattr(logfile, "write"):
  106. logfh = logfile
  107. else:
  108. logfh = open(logfile, "a")
  109. closefh = logfh.close
  110. try:
  111. logfh.write("[%(asctime)s: CRITICAL/%(pid)d]: %(message)s\n" % {
  112. "asctime": time.asctime(),
  113. "pid": os.getpid(),
  114. "message": message})
  115. finally:
  116. closefh()
  117. def redirect_stdouts_to_logger(logger, loglevel=None):
  118. """Redirect :class:`sys.stdout` and :class:`sys.stderr` to a
  119. logging instance.
  120. :param logger: The :class:`logging.Logger` instance to redirect to.
  121. :param loglevel: The loglevel redirected messages will be logged as.
  122. """
  123. proxy = LoggingProxy(logger, loglevel)
  124. sys.stdout = sys.stderr = proxy
  125. return proxy
  126. class LoggingProxy(object):
  127. """Forward file object to :class:`logging.Logger` instance.
  128. :param logger: The :class:`logging.Logger` instance to forward to.
  129. :param loglevel: Loglevel to use when writing messages.
  130. """
  131. mode = "w"
  132. name = None
  133. closed = False
  134. loglevel = logging.ERROR
  135. def __init__(self, logger, loglevel=None):
  136. self.logger = logger
  137. self.loglevel = loglevel or self.logger.level or self.loglevel
  138. self._safewrap_handlers()
  139. def _safewrap_handlers(self):
  140. """Make the logger handlers dump internal errors to
  141. ``sys.__stderr__`` instead of ``sys.stderr`` to circumvent
  142. infinite loops."""
  143. def wrap_handler(handler): # pragma: no cover
  144. class WithSafeHandleError(logging.Handler):
  145. def handleError(self, record):
  146. exc_info = sys.exc_info()
  147. try:
  148. try:
  149. traceback.print_exception(exc_info[0],
  150. exc_info[1],
  151. exc_info[2],
  152. None, sys.__stderr__)
  153. except IOError:
  154. pass # see python issue 5971
  155. finally:
  156. del(exc_info)
  157. handler.handleError = WithSafeHandleError().handleError
  158. return map(wrap_handler, self.logger.handlers)
  159. def write(self, data):
  160. """Write message to logging object."""
  161. data = data.strip()
  162. if data and not self.closed:
  163. self.logger.log(self.loglevel, data)
  164. def writelines(self, sequence):
  165. """``writelines(sequence_of_strings) -> None``.
  166. Write the strings to the file.
  167. The sequence can be any iterable object producing strings.
  168. This is equivalent to calling :meth:`write` for each string.
  169. """
  170. map(self.write, sequence)
  171. def flush(self):
  172. """This object is not buffered so any :meth:`flush` requests
  173. are ignored."""
  174. pass
  175. def close(self):
  176. """When the object is closed, no write requests are forwarded to
  177. the logging object anymore."""
  178. self.closed = True
  179. def isatty(self):
  180. """Always returns ``False``. Just here for file support."""
  181. return False
  182. def fileno(self):
  183. return None
  184. class SilenceRepeated(object):
  185. """Only log action every n iterations."""
  186. def __init__(self, action, max_iterations=10):
  187. self.action = action
  188. self.max_iterations = max_iterations
  189. self._iterations = 0
  190. def __call__(self, *msgs):
  191. if self._iterations >= self.max_iterations:
  192. map(self.action, msgs)
  193. self._iterations = 0
  194. else:
  195. self._iterations += 1