log.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. """celery.log"""
  2. import logging
  3. import threading
  4. import time
  5. import os
  6. import sys
  7. import traceback
  8. from celery import conf
  9. from celery.utils import noop
  10. from celery.utils.compat import LoggerAdapter
  11. from celery.utils.patch import ensure_process_aware_logger
  12. _hijacked = False
  13. _monkeypatched = False
  14. BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
  15. RESET_SEQ = "\033[0m"
  16. COLOR_SEQ = "\033[1;%dm"
  17. BOLD_SEQ = "\033[1m"
  18. COLORS = {"DEBUG": BLUE,
  19. "WARNING": YELLOW,
  20. "ERROR": RED,
  21. "CRITICAL": MAGENTA}
  22. class ColorFormatter(logging.Formatter):
  23. def __init__(self, msg, use_color=True):
  24. logging.Formatter.__init__(self, msg)
  25. self.use_color = use_color
  26. def format(self, record):
  27. levelname = record.levelname
  28. if self.use_color and levelname in COLORS:
  29. record.msg = COLOR_SEQ % (
  30. 30 + COLORS[levelname]) + record.msg + RESET_SEQ
  31. return logging.Formatter.format(self, record)
  32. def get_task_logger(loglevel=None, name=None):
  33. ensure_process_aware_logger()
  34. logger = logging.getLogger(name or "celery.task.default")
  35. if loglevel is not None:
  36. logger.setLevel(loglevel)
  37. return logger
  38. def _hijack_multiprocessing_logger():
  39. from multiprocessing import util as mputil
  40. global _hijacked
  41. if _hijacked:
  42. return mputil.get_logger()
  43. ensure_process_aware_logger()
  44. logging.Logger.manager.loggerDict.clear()
  45. try:
  46. if mputil._logger is not None:
  47. mputil.logger = None
  48. except AttributeError:
  49. pass
  50. _hijacked = True
  51. return mputil.get_logger()
  52. def _detect_handler(logfile=None):
  53. """Create log handler with either a filename, an open stream
  54. or ``None`` (stderr)."""
  55. if not logfile or hasattr(logfile, "write"):
  56. return logging.StreamHandler(logfile)
  57. return logging.FileHandler(logfile)
  58. def get_default_logger(loglevel=None):
  59. """Get default logger instance.
  60. :keyword loglevel: Initial log level.
  61. """
  62. logger = _hijack_multiprocessing_logger()
  63. if loglevel is not None:
  64. logger.setLevel(loglevel)
  65. return logger
  66. def setup_logger(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
  67. format=conf.CELERYD_LOG_FORMAT, colorize=conf.CELERYD_LOG_COLOR,
  68. **kwargs):
  69. """Setup the ``multiprocessing`` logger. If ``logfile`` is not specified,
  70. then ``stderr`` is used.
  71. Returns logger object.
  72. """
  73. return _setup_logger(get_default_logger(loglevel),
  74. logfile, format, colorize, **kwargs)
  75. def setup_task_logger(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
  76. format=conf.CELERYD_TASK_LOG_FORMAT, colorize=conf.CELERYD_LOG_COLOR,
  77. task_kwargs=None, **kwargs):
  78. """Setup the task logger. If ``logfile`` is not specified, then
  79. ``stderr`` is used.
  80. Returns logger object.
  81. """
  82. if task_kwargs is None:
  83. task_kwargs = {}
  84. task_kwargs.setdefault("task_id", "-?-")
  85. task_name = task_kwargs.get("task_name")
  86. task_kwargs.setdefault("task_name", "-?-")
  87. logger = _setup_logger(get_task_logger(loglevel, task_name),
  88. logfile, format, colorize, **kwargs)
  89. return LoggerAdapter(logger, task_kwargs)
  90. def _setup_logger(logger, logfile, format, colorize,
  91. formatter=ColorFormatter, **kwargs):
  92. if logger.handlers: # Logger already configured
  93. return logger
  94. handler = _detect_handler(logfile)
  95. handler.setFormatter(formatter(format, use_color=colorize))
  96. logger.addHandler(handler)
  97. return logger
  98. def emergency_error(logfile, message):
  99. """Emergency error logging, for when there's no standard file
  100. descriptors open because the process has been daemonized or for
  101. some other reason."""
  102. closefh = noop
  103. logfile = logfile or sys.__stderr__
  104. if hasattr(logfile, "write"):
  105. logfh = logfile
  106. else:
  107. logfh = open(logfile, "a")
  108. closefh = logfh.close
  109. try:
  110. logfh.write("[%(asctime)s: CRITICAL/%(pid)d]: %(message)s\n" % {
  111. "asctime": time.asctime(),
  112. "pid": os.getpid(),
  113. "message": message})
  114. finally:
  115. closefh()
  116. def redirect_stdouts_to_logger(logger, loglevel=None):
  117. """Redirect :class:`sys.stdout` and :class:`sys.stderr` to a
  118. logging instance.
  119. :param logger: The :class:`logging.Logger` instance to redirect to.
  120. :param loglevel: The loglevel redirected messages will be logged as.
  121. """
  122. proxy = LoggingProxy(logger, loglevel)
  123. sys.stdout = sys.stderr = proxy
  124. return proxy
  125. class LoggingProxy(object):
  126. """Forward file object to :class:`logging.Logger` instance.
  127. :param logger: The :class:`logging.Logger` instance to forward to.
  128. :param loglevel: Loglevel to use when writing messages.
  129. """
  130. mode = "w"
  131. name = None
  132. closed = False
  133. loglevel = logging.ERROR
  134. _thread = threading.local()
  135. def __init__(self, logger, loglevel=None):
  136. self.logger = logger
  137. self.loglevel = loglevel or self.logger.level or self.loglevel
  138. self._safewrap_handlers()
  139. def _safewrap_handlers(self):
  140. """Make the logger handlers dump internal errors to
  141. ``sys.__stderr__`` instead of ``sys.stderr`` to circumvent
  142. infinite loops."""
  143. def wrap_handler(handler): # pragma: no cover
  144. class WithSafeHandleError(logging.Handler):
  145. def handleError(self, record):
  146. exc_info = sys.exc_info()
  147. try:
  148. try:
  149. traceback.print_exception(exc_info[0],
  150. exc_info[1],
  151. exc_info[2],
  152. None, sys.__stderr__)
  153. except IOError:
  154. pass # see python issue 5971
  155. finally:
  156. del(exc_info)
  157. handler.handleError = WithSafeHandleError().handleError
  158. return map(wrap_handler, self.logger.handlers)
  159. def write(self, data):
  160. if getattr(self._thread, "recurse_protection", False):
  161. # Logger is logging back to this file, so stop recursing.
  162. return
  163. """Write message to logging object."""
  164. data = data.strip()
  165. if data and not self.closed:
  166. self._thread.recurse_protection = True
  167. try:
  168. self.logger.log(self.loglevel, data)
  169. finally:
  170. self._thread.recurse_protection = False
  171. def writelines(self, sequence):
  172. """``writelines(sequence_of_strings) -> None``.
  173. Write the strings to the file.
  174. The sequence can be any iterable object producing strings.
  175. This is equivalent to calling :meth:`write` for each string.
  176. """
  177. map(self.write, sequence)
  178. def flush(self):
  179. """This object is not buffered so any :meth:`flush` requests
  180. are ignored."""
  181. pass
  182. def close(self):
  183. """When the object is closed, no write requests are forwarded to
  184. the logging object anymore."""
  185. self.closed = True
  186. def isatty(self):
  187. """Always returns ``False``. Just here for file support."""
  188. return False
  189. def fileno(self):
  190. return None
  191. class SilenceRepeated(object):
  192. """Only log action every n iterations."""
  193. def __init__(self, action, max_iterations=10):
  194. self.action = action
  195. self.max_iterations = max_iterations
  196. self._iterations = 0
  197. def __call__(self, *msgs):
  198. if self._iterations >= self.max_iterations:
  199. map(self.action, msgs)
  200. self._iterations = 0
  201. else:
  202. self._iterations += 1