# celery/log.py  (extraction artifacts — file-size header and fused line
# numbers — removed; original module follows)
  1. """celery.log"""
  2. import logging
  3. import threading
  4. import time
  5. import os
  6. import sys
  7. import traceback
  8. from multiprocessing import current_process
  9. from celery import conf
  10. from celery import signals
  11. from celery.utils import noop
  12. from celery.utils.compat import LoggerAdapter
  13. from celery.utils.patch import ensure_process_aware_logger
# The logging subsystem is only configured once per process.
# setup_logging_subsystem sets this flag, and subsequent calls
# will do nothing.
_setup = False
  18. BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
  19. RESET_SEQ = "\033[0m"
  20. COLOR_SEQ = "\033[1;%dm"
  21. BOLD_SEQ = "\033[1m"
  22. COLORS = {"DEBUG": BLUE,
  23. "WARNING": YELLOW,
  24. "ERROR": RED,
  25. "CRITICAL": MAGENTA}
  26. class ColorFormatter(logging.Formatter):
  27. def __init__(self, msg, use_color=True):
  28. logging.Formatter.__init__(self, msg)
  29. self.use_color = use_color
  30. def format(self, record):
  31. levelname = record.levelname
  32. if self.use_color and levelname in COLORS:
  33. record.msg = COLOR_SEQ % (
  34. 30 + COLORS[levelname]) + record.msg + RESET_SEQ
  35. # Very ugly, but have to make sure processName is supported
  36. # by foreign logger instances.
  37. # (processName is always supported by Python 2.7)
  38. if "processName" not in record.__dict__:
  39. record.__dict__["processName"] = current_process()._name
  40. return logging.Formatter.format(self, record)
  41. def get_task_logger(loglevel=None, name=None):
  42. logger = logging.getLogger(name or "celery.task.default")
  43. if loglevel is not None:
  44. logger.setLevel(loglevel)
  45. return logger
  46. def setup_logging_subsystem(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
  47. format=conf.CELERYD_LOG_FORMAT, colorize=conf.CELERYD_LOG_COLOR,
  48. **kwargs):
  49. global _setup
  50. if not _setup:
  51. ensure_process_aware_logger()
  52. logging.Logger.manager.loggerDict.clear()
  53. from multiprocessing import util as mputil
  54. try:
  55. if mputil._logger is not None:
  56. mputil.logger = None
  57. except AttributeError:
  58. pass
  59. receivers = signals.setup_logging.send(sender=None,
  60. loglevel=loglevel,
  61. logfile=logfile,
  62. format=format,
  63. colorize=colorize)
  64. if not receivers:
  65. root = logging.getLogger()
  66. _setup_logger(root, logfile, format, colorize, **kwargs)
  67. root.setLevel(loglevel)
  68. _setup = True
  69. return receivers
  70. def _detect_handler(logfile=None):
  71. """Create log handler with either a filename, an open stream
  72. or ``None`` (stderr)."""
  73. if not logfile or hasattr(logfile, "write"):
  74. return logging.StreamHandler(logfile)
  75. return logging.FileHandler(logfile)
  76. def get_default_logger(loglevel=None, name="celery"):
  77. """Get default logger instance.
  78. :keyword loglevel: Initial log level.
  79. """
  80. logger = logging.getLogger(name)
  81. if loglevel is not None:
  82. logger.setLevel(loglevel)
  83. return logger
  84. def setup_logger(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
  85. format=conf.CELERYD_LOG_FORMAT, colorize=conf.CELERYD_LOG_COLOR,
  86. name="celery", root=True, **kwargs):
  87. """Setup the ``multiprocessing`` logger. If ``logfile`` is not specified,
  88. then ``stderr`` is used.
  89. Returns logger object.
  90. """
  91. if not root:
  92. return _setup_logger(get_default_logger(loglevel, name),
  93. logfile, format, colorize, **kwargs)
  94. setup_logging_subsystem(loglevel, logfile, format, colorize, **kwargs)
  95. return get_default_logger(name=name)
  96. def setup_task_logger(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
  97. format=conf.CELERYD_TASK_LOG_FORMAT, colorize=conf.CELERYD_LOG_COLOR,
  98. task_kwargs=None, root=True, **kwargs):
  99. """Setup the task logger. If ``logfile`` is not specified, then
  100. ``stderr`` is used.
  101. Returns logger object.
  102. """
  103. if task_kwargs is None:
  104. task_kwargs = {}
  105. task_kwargs.setdefault("task_id", "-?-")
  106. task_name = task_kwargs.get("task_name")
  107. task_kwargs.setdefault("task_name", "-?-")
  108. if not root:
  109. logger = _setup_logger(get_task_logger(loglevel, task_name),
  110. logfile, format, colorize, **kwargs)
  111. else:
  112. setup_logging_subsystem(loglevel, logfile, format, colorize, **kwargs)
  113. logger = get_task_logger(name=task_name)
  114. return LoggerAdapter(logger, task_kwargs)
  115. def _setup_logger(logger, logfile, format, colorize,
  116. formatter=ColorFormatter, **kwargs):
  117. if logger.handlers: # Logger already configured
  118. return logger
  119. handler = _detect_handler(logfile)
  120. handler.setFormatter(formatter(format, use_color=colorize))
  121. logger.addHandler(handler)
  122. return logger
  123. def emergency_error(logfile, message):
  124. """Emergency error logging, for when there's no standard file
  125. descriptors open because the process has been daemonized or for
  126. some other reason."""
  127. closefh = noop
  128. logfile = logfile or sys.__stderr__
  129. if hasattr(logfile, "write"):
  130. logfh = logfile
  131. else:
  132. logfh = open(logfile, "a")
  133. closefh = logfh.close
  134. try:
  135. logfh.write("[%(asctime)s: CRITICAL/%(pid)d]: %(message)s\n" % {
  136. "asctime": time.asctime(),
  137. "pid": os.getpid(),
  138. "message": message})
  139. finally:
  140. closefh()
  141. def redirect_stdouts_to_logger(logger, loglevel=None):
  142. """Redirect :class:`sys.stdout` and :class:`sys.stderr` to a
  143. logging instance.
  144. :param logger: The :class:`logging.Logger` instance to redirect to.
  145. :param loglevel: The loglevel redirected messages will be logged as.
  146. """
  147. proxy = LoggingProxy(logger, loglevel)
  148. sys.stdout = sys.stderr = proxy
  149. return proxy
  150. class LoggingProxy(object):
  151. """Forward file object to :class:`logging.Logger` instance.
  152. :param logger: The :class:`logging.Logger` instance to forward to.
  153. :param loglevel: Loglevel to use when writing messages.
  154. """
  155. mode = "w"
  156. name = None
  157. closed = False
  158. loglevel = logging.ERROR
  159. _thread = threading.local()
  160. def __init__(self, logger, loglevel=None):
  161. self.logger = logger
  162. self.loglevel = loglevel or self.logger.level or self.loglevel
  163. self._safewrap_handlers()
  164. def _safewrap_handlers(self):
  165. """Make the logger handlers dump internal errors to
  166. ``sys.__stderr__`` instead of ``sys.stderr`` to circumvent
  167. infinite loops."""
  168. def wrap_handler(handler): # pragma: no cover
  169. class WithSafeHandleError(logging.Handler):
  170. def handleError(self, record):
  171. exc_info = sys.exc_info()
  172. try:
  173. try:
  174. traceback.print_exception(exc_info[0],
  175. exc_info[1],
  176. exc_info[2],
  177. None, sys.__stderr__)
  178. except IOError:
  179. pass # see python issue 5971
  180. finally:
  181. del(exc_info)
  182. handler.handleError = WithSafeHandleError().handleError
  183. return map(wrap_handler, self.logger.handlers)
  184. def write(self, data):
  185. if getattr(self._thread, "recurse_protection", False):
  186. # Logger is logging back to this file, so stop recursing.
  187. return
  188. """Write message to logging object."""
  189. data = data.strip()
  190. if data and not self.closed:
  191. self._thread.recurse_protection = True
  192. try:
  193. self.logger.log(self.loglevel, data)
  194. finally:
  195. self._thread.recurse_protection = False
  196. def writelines(self, sequence):
  197. """``writelines(sequence_of_strings) -> None``.
  198. Write the strings to the file.
  199. The sequence can be any iterable object producing strings.
  200. This is equivalent to calling :meth:`write` for each string.
  201. """
  202. map(self.write, sequence)
  203. def flush(self):
  204. """This object is not buffered so any :meth:`flush` requests
  205. are ignored."""
  206. pass
  207. def close(self):
  208. """When the object is closed, no write requests are forwarded to
  209. the logging object anymore."""
  210. self.closed = True
  211. def isatty(self):
  212. """Always returns ``False``. Just here for file support."""
  213. return False
  214. def fileno(self):
  215. return None
  216. class SilenceRepeated(object):
  217. """Only log action every n iterations."""
  218. def __init__(self, action, max_iterations=10):
  219. self.action = action
  220. self.max_iterations = max_iterations
  221. self._iterations = 0
  222. def __call__(self, *msgs):
  223. if self._iterations >= self.max_iterations:
  224. map(self.action, msgs)
  225. self._iterations = 0
  226. else:
  227. self._iterations += 1