log.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. """celery.log"""
  2. import os
  3. import sys
  4. import time
  5. import logging
  6. import traceback
  7. from celery import conf
  8. from celery.utils import noop
  9. from celery.utils.patch import ensure_process_aware_logger
  10. from celery.utils.compat import LoggerAdapter
# Set once _hijack_multiprocessing_logger() has run, so the multiprocessing
# logger is only hijacked a single time per process.
_hijacked = False
# Module-level flag for logging monkey-patch state (not used in this chunk).
_monkeypatched = False
  13. BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
  14. RESET_SEQ = "\033[0m"
  15. COLOUR_SEQ = "\033[1;%dm"
  16. BOLD_SEQ = "\033[1m"
  17. COLOURS = {
  18. 'WARNING': YELLOW,
  19. 'DEBUG': BLUE,
  20. 'CRITICAL': MAGENTA,
  21. 'ERROR': RED
  22. }
  23. class ColourFormatter(logging.Formatter):
  24. def __init__(self, msg, use_colour=True):
  25. logging.Formatter.__init__(self, msg)
  26. self.use_colour = use_colour
  27. def format(self, record):
  28. levelname = record.levelname
  29. if self.use_colour and levelname in COLOURS:
  30. record.msg = COLOUR_SEQ % (
  31. 30 + COLOURS[levelname]) + record.msg + RESET_SEQ
  32. return logging.Formatter.format(self, record)
  33. def get_task_logger(loglevel=None):
  34. ensure_process_aware_logger()
  35. logger = logging.getLogger("celery.Task")
  36. if loglevel is not None:
  37. logger.setLevel(loglevel)
  38. return logger
  39. def _hijack_multiprocessing_logger():
  40. from multiprocessing import util as mputil
  41. global _hijacked
  42. if _hijacked:
  43. return mputil.get_logger()
  44. ensure_process_aware_logger()
  45. logging.Logger.manager.loggerDict.clear()
  46. try:
  47. if mputil._logger is not None:
  48. mputil.logger = None
  49. except AttributeError:
  50. pass
  51. _hijacked = True
  52. return mputil.get_logger()
  53. def _detect_handler(logfile=None):
  54. """Create log handler with either a filename, an open stream
  55. or ``None`` (stderr)."""
  56. if not logfile or hasattr(logfile, "write"):
  57. return logging.StreamHandler(logfile)
  58. return logging.FileHandler(logfile)
  59. def get_default_logger(loglevel=None):
  60. """Get default logger instance.
  61. :keyword loglevel: Initial log level.
  62. """
  63. logger = _hijack_multiprocessing_logger()
  64. if loglevel is not None:
  65. logger.setLevel(loglevel)
  66. return logger
  67. def setup_logger(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
  68. format=conf.CELERYD_LOG_FORMAT, colourize=conf.CELERYD_LOG_COLOR,
  69. **kwargs):
  70. """Setup the ``multiprocessing`` logger. If ``logfile`` is not specified,
  71. then ``stderr`` is used.
  72. Returns logger object.
  73. """
  74. return _setup_logger(get_default_logger(loglevel),
  75. logfile, format, colourize, **kwargs)
  76. def setup_task_logger(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
  77. format=conf.CELERYD_TASK_LOG_FORMAT, colourize=conf.CELERYD_LOG_COLOR,
  78. task_kwargs=None, **kwargs):
  79. """Setup the task logger. If ``logfile`` is not specified, then
  80. ``stderr`` is used.
  81. Returns logger object.
  82. """
  83. if task_kwargs is None:
  84. task_kwargs = {}
  85. task_kwargs.setdefault("task_id", "-?-")
  86. task_kwargs.setdefault("task_name", "-?-")
  87. logger = _setup_logger(get_task_logger(loglevel),
  88. logfile, format, colourize, **kwargs)
  89. return LoggerAdapter(logger, task_kwargs)
  90. def _setup_logger(logger, logfile, format, colourize,
  91. formatter=ColourFormatter, **kwargs):
  92. if logger.handlers: # Logger already configured
  93. return logger
  94. handler = _detect_handler(logfile)
  95. handler.setFormatter(formatter(format, use_colour=colourize))
  96. logger.addHandler(handler)
  97. return logger
  98. def emergency_error(logfile, message):
  99. """Emergency error logging, for when there's no standard file
  100. descriptors open because the process has been daemonized or for
  101. some other reason."""
  102. closefh = noop
  103. logfile = logfile or sys.__stderr__
  104. if hasattr(logfile, "write"):
  105. logfh = logfile
  106. else:
  107. logfh = open(logfile, "a")
  108. closefh = logfh.close
  109. try:
  110. logfh.write("[%(asctime)s: CRITICAL/%(pid)d]: %(message)s\n" % {
  111. "asctime": time.asctime(),
  112. "pid": os.getpid(),
  113. "message": message})
  114. finally:
  115. closefh()
  116. def redirect_stdouts_to_logger(logger, loglevel=None):
  117. """Redirect :class:`sys.stdout` and :class:`sys.stderr` to a
  118. logging instance.
  119. :param logger: The :class:`logging.Logger` instance to redirect to.
  120. :param loglevel: The loglevel redirected messages will be logged as.
  121. """
  122. proxy = LoggingProxy(logger, loglevel)
  123. sys.stdout = sys.stderr = proxy
  124. return proxy
  125. class LoggingProxy(object):
  126. """Forward file object to :class:`logging.Logger` instance.
  127. :param logger: The :class:`logging.Logger` instance to forward to.
  128. :param loglevel: Loglevel to use when writing messages.
  129. """
  130. mode = "w"
  131. name = None
  132. closed = False
  133. loglevel = logging.ERROR
  134. def __init__(self, logger, loglevel=None):
  135. self.logger = logger
  136. self.loglevel = loglevel or self.logger.level or self.loglevel
  137. self._safewrap_handlers()
  138. def _safewrap_handlers(self):
  139. """Make the logger handlers dump internal errors to
  140. ``sys.__stderr__`` instead of ``sys.stderr`` to circumvent
  141. infinite loops."""
  142. def wrap_handler(handler): # pragma: no cover
  143. class WithSafeHandleError(logging.Handler):
  144. def handleError(self, record):
  145. exc_info = sys.exc_info()
  146. try:
  147. try:
  148. traceback.print_exception(exc_info[0],
  149. exc_info[1],
  150. exc_info[2],
  151. None, sys.__stderr__)
  152. except IOError:
  153. pass # see python issue 5971
  154. finally:
  155. del(exc_info)
  156. handler.handleError = WithSafeHandleError().handleError
  157. return map(wrap_handler, self.logger.handlers)
  158. def write(self, data):
  159. """Write message to logging object."""
  160. if not self.closed:
  161. self.logger.log(self.loglevel, data)
  162. def writelines(self, sequence):
  163. """``writelines(sequence_of_strings) -> None``.
  164. Write the strings to the file.
  165. The sequence can be any iterable object producing strings.
  166. This is equivalent to calling :meth:`write` for each string.
  167. """
  168. map(self.write, sequence)
  169. def flush(self):
  170. """This object is not buffered so any :meth:`flush` requests
  171. are ignored."""
  172. pass
  173. def close(self):
  174. """When the object is closed, no write requests are forwarded to
  175. the logging object anymore."""
  176. self.closed = True
  177. def isatty(self):
  178. """Always returns ``False``. Just here for file support."""
  179. return False
  180. def fileno(self):
  181. return None
  182. class SilenceRepeated(object):
  183. """Only log action every n iterations."""
  184. def __init__(self, action, max_iterations=10):
  185. self.action = action
  186. self.max_iterations = max_iterations
  187. self._iterations = 0
  188. def __call__(self, *msgs):
  189. if self._iterations >= self.max_iterations:
  190. map(self.action, msgs)
  191. self._iterations = 0
  192. else:
  193. self._iterations += 1