log.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. """celery.log"""
  2. import logging
  3. import threading
  4. import time
  5. import os
  6. import sys
  7. import traceback
  8. from multiprocessing import current_process
  9. from multiprocessing import util as mputil
  10. from celery import conf
  11. from celery import signals
  12. from celery.utils import noop
  13. from celery.utils.compat import LoggerAdapter
  14. from celery.utils.patch import ensure_process_aware_logger
  15. from celery.utils.term import colored
# The logging subsystem is only configured once per process.
# setup_logging_subsystem sets this flag, and subsequent calls
# will do nothing.
_setup = False

# Log level name -> color name used by ColorFormatter
# (keys must match logging level names; INFO is deliberately uncolored).
COLORS = {"DEBUG": "blue",
          "WARNING": "yellow",
          "ERROR": "red",
          "CRITICAL": "magenta"}
  24. class ColorFormatter(logging.Formatter):
  25. def __init__(self, msg, use_color=True):
  26. logging.Formatter.__init__(self, msg)
  27. self.use_color = use_color
  28. def format(self, record):
  29. levelname = record.levelname
  30. if self.use_color and levelname in COLORS:
  31. record.msg = str(colored().names[COLORS[levelname]](record.msg))
  32. # Very ugly, but have to make sure processName is supported
  33. # by foreign logger instances.
  34. # (processName is always supported by Python 2.7)
  35. if "processName" not in record.__dict__:
  36. record.__dict__["processName"] = current_process()._name
  37. return logging.Formatter.format(self, record)
  38. def get_task_logger(loglevel=None, name=None):
  39. logger = logging.getLogger(name or "celery.task.default")
  40. if loglevel is not None:
  41. logger.setLevel(loglevel)
  42. return logger
  43. def setup_logging_subsystem(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
  44. format=conf.CELERYD_LOG_FORMAT, colorize=conf.CELERYD_LOG_COLOR,
  45. **kwargs):
  46. global _setup
  47. if not _setup:
  48. try:
  49. mputil._logger = None
  50. except AttributeError:
  51. pass
  52. ensure_process_aware_logger()
  53. logging.Logger.manager.loggerDict.clear()
  54. receivers = signals.setup_logging.send(sender=None,
  55. loglevel=loglevel,
  56. logfile=logfile,
  57. format=format,
  58. colorize=colorize)
  59. if not receivers:
  60. root = logging.getLogger()
  61. mp = mputil.get_logger()
  62. for logger in (root, mp):
  63. _setup_logger(logger, logfile, format, colorize, **kwargs)
  64. logger.setLevel(loglevel)
  65. _setup = True
  66. return receivers
  67. def _detect_handler(logfile=None):
  68. """Create log handler with either a filename, an open stream
  69. or ``None`` (stderr)."""
  70. if not logfile or hasattr(logfile, "write"):
  71. return logging.StreamHandler(logfile)
  72. return logging.FileHandler(logfile)
  73. def get_default_logger(loglevel=None, name="celery"):
  74. """Get default logger instance.
  75. :keyword loglevel: Initial log level.
  76. """
  77. logger = logging.getLogger(name)
  78. if loglevel is not None:
  79. logger.setLevel(loglevel)
  80. return logger
  81. def setup_logger(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
  82. format=conf.CELERYD_LOG_FORMAT, colorize=conf.CELERYD_LOG_COLOR,
  83. name="celery", root=True, **kwargs):
  84. """Setup the ``multiprocessing`` logger. If ``logfile`` is not specified,
  85. then ``stderr`` is used.
  86. Returns logger object.
  87. """
  88. if not root:
  89. return _setup_logger(get_default_logger(loglevel, name),
  90. logfile, format, colorize, **kwargs)
  91. setup_logging_subsystem(loglevel, logfile, format, colorize, **kwargs)
  92. return get_default_logger(name=name)
  93. def setup_task_logger(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
  94. format=conf.CELERYD_TASK_LOG_FORMAT, colorize=conf.CELERYD_LOG_COLOR,
  95. task_kwargs=None, **kwargs):
  96. """Setup the task logger. If ``logfile`` is not specified, then
  97. ``stderr`` is used.
  98. Returns logger object.
  99. """
  100. if task_kwargs is None:
  101. task_kwargs = {}
  102. task_kwargs.setdefault("task_id", "-?-")
  103. task_name = task_kwargs.get("task_name")
  104. task_kwargs.setdefault("task_name", "-?-")
  105. logger = _setup_logger(get_task_logger(loglevel, task_name),
  106. logfile, format, colorize, **kwargs)
  107. return LoggerAdapter(logger, task_kwargs)
  108. def _setup_logger(logger, logfile, format, colorize,
  109. formatter=ColorFormatter, **kwargs):
  110. if logger.handlers: # already configured
  111. return logger
  112. handler = _detect_handler(logfile)
  113. handler.setFormatter(formatter(format, use_color=colorize))
  114. logger.addHandler(handler)
  115. return logger
  116. def emergency_error(logfile, message):
  117. """Emergency error logging, for when there's no standard file
  118. descriptors open because the process has been daemonized or for
  119. some other reason."""
  120. closefh = noop
  121. logfile = logfile or sys.__stderr__
  122. if hasattr(logfile, "write"):
  123. logfh = logfile
  124. else:
  125. logfh = open(logfile, "a")
  126. closefh = logfh.close
  127. try:
  128. logfh.write("[%(asctime)s: CRITICAL/%(pid)d]: %(message)s\n" % {
  129. "asctime": time.asctime(),
  130. "pid": os.getpid(),
  131. "message": message})
  132. finally:
  133. closefh()
  134. def redirect_stdouts_to_logger(logger, loglevel=None):
  135. """Redirect :class:`sys.stdout` and :class:`sys.stderr` to a
  136. logging instance.
  137. :param logger: The :class:`logging.Logger` instance to redirect to.
  138. :param loglevel: The loglevel redirected messages will be logged as.
  139. """
  140. proxy = LoggingProxy(logger, loglevel)
  141. sys.stdout = sys.stderr = proxy
  142. return proxy
  143. class LoggingProxy(object):
  144. """Forward file object to :class:`logging.Logger` instance.
  145. :param logger: The :class:`logging.Logger` instance to forward to.
  146. :param loglevel: Loglevel to use when writing messages.
  147. """
  148. mode = "w"
  149. name = None
  150. closed = False
  151. loglevel = logging.ERROR
  152. _thread = threading.local()
  153. def __init__(self, logger, loglevel=None):
  154. self.logger = logger
  155. self.loglevel = loglevel or self.logger.level or self.loglevel
  156. self._safewrap_handlers()
  157. def _safewrap_handlers(self):
  158. """Make the logger handlers dump internal errors to
  159. ``sys.__stderr__`` instead of ``sys.stderr`` to circumvent
  160. infinite loops."""
  161. def wrap_handler(handler): # pragma: no cover
  162. class WithSafeHandleError(logging.Handler):
  163. def handleError(self, record):
  164. exc_info = sys.exc_info()
  165. try:
  166. try:
  167. traceback.print_exception(exc_info[0],
  168. exc_info[1],
  169. exc_info[2],
  170. None, sys.__stderr__)
  171. except IOError:
  172. pass # see python issue 5971
  173. finally:
  174. del(exc_info)
  175. handler.handleError = WithSafeHandleError().handleError
  176. return map(wrap_handler, self.logger.handlers)
  177. def write(self, data):
  178. if getattr(self._thread, "recurse_protection", False):
  179. # logger is logging back to this file, so stop recursing.
  180. return
  181. """Write message to logging object."""
  182. data = data.strip()
  183. if data and not self.closed:
  184. self._thread.recurse_protection = True
  185. try:
  186. self.logger.log(self.loglevel, data)
  187. finally:
  188. self._thread.recurse_protection = False
  189. def writelines(self, sequence):
  190. """``writelines(sequence_of_strings) -> None``.
  191. Write the strings to the file.
  192. The sequence can be any iterable object producing strings.
  193. This is equivalent to calling :meth:`write` for each string.
  194. """
  195. map(self.write, sequence)
  196. def flush(self):
  197. """This object is not buffered so any :meth:`flush` requests
  198. are ignored."""
  199. pass
  200. def close(self):
  201. """When the object is closed, no write requests are forwarded to
  202. the logging object anymore."""
  203. self.closed = True
  204. def isatty(self):
  205. """Always returns ``False``. Just here for file support."""
  206. return False
  207. def fileno(self):
  208. return None
  209. class SilenceRepeated(object):
  210. """Only log action every n iterations."""
  211. def __init__(self, action, max_iterations=10):
  212. self.action = action
  213. self.max_iterations = max_iterations
  214. self._iterations = 0
  215. def __call__(self, *msgs):
  216. if self._iterations >= self.max_iterations:
  217. map(self.action, msgs)
  218. self._iterations = 0
  219. else:
  220. self._iterations += 1