log.py

  1. """celery.log"""
  2. import logging
  3. import threading
  4. import time
  5. import os
  6. import sys
  7. import traceback
  8. from celery import conf
  9. from celery import signals
  10. from celery.utils import noop
  11. from celery.utils.compat import LoggerAdapter
  12. from celery.utils.patch import ensure_process_aware_logger
  13. # The logging subsystem is only configured once per process.
  14. # setup_logging_subsystem sets this flag, and subsequent calls
  15. # will do nothing.
  16. _setup = False
  17. BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
  18. RESET_SEQ = "\033[0m"
  19. COLOR_SEQ = "\033[1;%dm"
  20. BOLD_SEQ = "\033[1m"
  21. COLORS = {"DEBUG": BLUE,
  22. "WARNING": YELLOW,
  23. "ERROR": RED,
  24. "CRITICAL": MAGENTA}
  25. class ColorFormatter(logging.Formatter):
  26. def __init__(self, msg, use_color=True):
  27. logging.Formatter.__init__(self, msg)
  28. self.use_color = use_color
  29. def format(self, record):
  30. levelname = record.levelname
  31. if self.use_color and levelname in COLORS:
  32. record.msg = COLOR_SEQ % (
  33. 30 + COLORS[levelname]) + record.msg + RESET_SEQ
  34. return logging.Formatter.format(self, record)
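
# Example (a sketch; the handler and logger names below are made up,
# and normally _setup_logger attaches this formatter for you):
#
#     handler = logging.StreamHandler()
#     handler.setFormatter(ColorFormatter("[%(levelname)s] %(message)s",
#                                         use_color=True))
#     example_logger = logging.getLogger("color.example")
#     example_logger.addHandler(handler)
#     example_logger.error("rendered in red on ANSI-capable terminals")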


def get_task_logger(loglevel=None, name=None):
    logger = logging.getLogger(name or "celery.task.default")
    if loglevel is not None:
        logger.setLevel(loglevel)
    return logger


def setup_logging_subsystem(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
        format=conf.CELERYD_LOG_FORMAT, colorize=conf.CELERYD_LOG_COLOR,
        **kwargs):
    global _setup
    if not _setup:
        ensure_process_aware_logger()
        logging.Logger.manager.loggerDict.clear()
        from multiprocessing import util as mputil
        try:
            if mputil._logger is not None:
                mputil._logger = None
        except AttributeError:
            pass
        receivers = signals.setup_logging.send(sender=None,
                                               loglevel=loglevel,
                                               logfile=logfile,
                                               format=format,
                                               colorize=colorize)
        if not receivers:
            root = logging.getLogger()
            _setup_logger(root, logfile, format, colorize, **kwargs)
            root.setLevel(loglevel)
        _setup = True
        return receivers
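
# Any receiver connected to the ``setup_logging`` signal disables the
# default configuration above.  A hedged sketch (``connect`` is assumed
# to be available on celery signals, matching how ``send`` is used here):
#
#     from celery import signals
#
#     def configure_logging(sender=None, loglevel=None, logfile=None,
#             format=None, colorize=None, **kwargs):
#         pass        # apply a custom logging configuration here.
#
#     signals.setup_logging.connect(configure_logging)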


def _detect_handler(logfile=None):
    """Create log handler with either a filename, an open stream
    or ``None`` (stderr)."""
    if not logfile or hasattr(logfile, "write"):
        return logging.StreamHandler(logfile)
    return logging.FileHandler(logfile)


def get_default_logger(loglevel=None, name="celery"):
    """Get default logger instance.

    :keyword loglevel: Initial log level.

    """
    logger = logging.getLogger(name)
    if loglevel is not None:
        logger.setLevel(loglevel)
    return logger


def setup_logger(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
        format=conf.CELERYD_LOG_FORMAT, colorize=conf.CELERYD_LOG_COLOR,
        name="celery", root=True, **kwargs):
    """Set up the ``celery`` logger. If ``logfile`` is not specified,
    then ``stderr`` is used.

    Returns logger object.

    """
    if not root:
        return _setup_logger(get_default_logger(loglevel, name),
                             logfile, format, colorize, **kwargs)
    setup_logging_subsystem(loglevel, logfile, format, colorize, **kwargs)
    return get_default_logger(name=name)
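
# Example usage (a sketch; "celeryd.log" is a hypothetical path):
#
#     logger = setup_logger(loglevel=logging.INFO, logfile="celeryd.log")
#     logger.info("worker ready")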


def setup_task_logger(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
        format=conf.CELERYD_TASK_LOG_FORMAT, colorize=conf.CELERYD_LOG_COLOR,
        task_kwargs=None, root=True, **kwargs):
    """Set up the task logger. If ``logfile`` is not specified, then
    ``stderr`` is used.

    Returns logger object.

    """
    if task_kwargs is None:
        task_kwargs = {}
    task_kwargs.setdefault("task_id", "-?-")
    task_name = task_kwargs.get("task_name")
    task_kwargs.setdefault("task_name", "-?-")
    if not root:
        logger = _setup_logger(get_task_logger(loglevel, task_name),
                               logfile, format, colorize, **kwargs)
    else:
        setup_logging_subsystem(loglevel, logfile, format, colorize, **kwargs)
        logger = get_task_logger(name=task_name)
    return LoggerAdapter(logger, task_kwargs)
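
# Example usage (a sketch with made-up task metadata).  The returned
# LoggerAdapter merges task_id and task_name into every record, so
# CELERYD_TASK_LOG_FORMAT can reference them:
#
#     logger = setup_task_logger(loglevel=logging.INFO,
#             task_kwargs={"task_id": "a1b2c3", "task_name": "tasks.add"})
#     logger.info("processing task")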


def _setup_logger(logger, logfile, format, colorize,
        formatter=ColorFormatter, **kwargs):
    if logger.handlers:                     # Logger already configured
        return logger
    handler = _detect_handler(logfile)
    handler.setFormatter(formatter(format, use_color=colorize))
    logger.addHandler(handler)
    return logger


def emergency_error(logfile, message):
    """Emergency error logging, for when there are no standard file
    descriptors open because the process has been daemonized or for
    some other reason."""
    closefh = noop
    logfile = logfile or sys.__stderr__
    if hasattr(logfile, "write"):
        logfh = logfile
    else:
        logfh = open(logfile, "a")
        closefh = logfh.close
    try:
        logfh.write("[%(asctime)s: CRITICAL/%(pid)d]: %(message)s\n" % {
            "asctime": time.asctime(),
            "pid": os.getpid(),
            "message": message})
    finally:
        closefh()
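
# Example (a sketch; the path is hypothetical): write a last-resort
# message when the regular logging setup cannot be trusted:
#
#     emergency_error("/var/log/celeryd.log", "Could not create worker.")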


def redirect_stdouts_to_logger(logger, loglevel=None):
    """Redirect ``sys.stdout`` and ``sys.stderr`` to a
    :class:`logging.Logger` instance.

    :param logger: The :class:`logging.Logger` instance to redirect to.
    :param loglevel: The loglevel redirected messages will be logged as.

    """
    proxy = LoggingProxy(logger, loglevel)
    sys.stdout = sys.stderr = proxy
    return proxy
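
# Example (a sketch): after redirection, ordinary print output is fed
# through the logger instead of the original stdout:
#
#     logger = get_default_logger(loglevel=logging.INFO)
#     redirect_stdouts_to_logger(logger, loglevel=logging.WARNING)
#     print("this line is logged at WARNING level")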


class LoggingProxy(object):
    """Forward file object to :class:`logging.Logger` instance.

    :param logger: The :class:`logging.Logger` instance to forward to.
    :param loglevel: Loglevel to use when writing messages.

    """
    mode = "w"
    name = None
    closed = False
    loglevel = logging.ERROR
    _thread = threading.local()

    def __init__(self, logger, loglevel=None):
        self.logger = logger
        self.loglevel = loglevel or self.logger.level or self.loglevel
        self._safewrap_handlers()

    def _safewrap_handlers(self):
        """Make the logger handlers dump internal errors to
        ``sys.__stderr__`` instead of ``sys.stderr`` to circumvent
        infinite loops."""

        def wrap_handler(handler):              # pragma: no cover

            class WithSafeHandleError(logging.Handler):

                def handleError(self, record):
                    exc_info = sys.exc_info()
                    try:
                        try:
                            traceback.print_exception(exc_info[0],
                                                      exc_info[1],
                                                      exc_info[2],
                                                      None, sys.__stderr__)
                        except IOError:
                            pass        # see python issue 5971
                    finally:
                        del exc_info

            handler.handleError = WithSafeHandleError().handleError

        for handler in self.logger.handlers:
            wrap_handler(handler)

    def write(self, data):
        """Write message to logging object."""
        if getattr(self._thread, "recurse_protection", False):
            # Logger is logging back to this file, so stop recursing.
            return
        data = data.strip()
        if data and not self.closed:
            self._thread.recurse_protection = True
            try:
                self.logger.log(self.loglevel, data)
            finally:
                self._thread.recurse_protection = False

    def writelines(self, sequence):
        """``writelines(sequence_of_strings) -> None``.

        Write the strings to the file.

        The sequence can be any iterable object producing strings.
        This is equivalent to calling :meth:`write` for each string.

        """
        for part in sequence:
            self.write(part)

    def flush(self):
        """This object is not buffered so any :meth:`flush` requests
        are ignored."""
        pass

    def close(self):
        """When the object is closed, no write requests are forwarded to
        the logging object anymore."""
        self.closed = True

    def isatty(self):
        """Always returns ``False``. Just here for file support."""
        return False

    def fileno(self):
        return None


class SilenceRepeated(object):
    """Only perform the action once every ``max_iterations`` calls."""

    def __init__(self, action, max_iterations=10):
        self.action = action
        self.max_iterations = max_iterations
        self._iterations = 0

    def __call__(self, *msgs):
        if self._iterations >= self.max_iterations:
            for msg in msgs:
                self.action(msg)
            self._iterations = 0
        else:
            self._iterations += 1
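
# Example (a sketch): throttle a noisy status message so it is emitted
# roughly once every 10 calls instead of on every call:
#
#     logger = get_default_logger()
#     log_status = SilenceRepeated(logger.info, max_iterations=10)
#     for _ in range(100):
#         log_status("still alive")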