# log.py
  1. """celery.log"""
  2. import logging
  3. import threading
  4. import time
  5. import os
  6. import sys
  7. import traceback
  8. import types
  9. from multiprocessing import current_process
  10. from multiprocessing import util as mputil
  11. from celery import conf
  12. from celery import signals
  13. from celery.utils import noop, LOG_LEVELS
  14. from celery.utils.compat import LoggerAdapter
  15. from celery.utils.patch import ensure_process_aware_logger
  16. from celery.utils.term import colored
  17. # The logging subsystem is only configured once per process.
  18. # setup_logging_subsystem sets this flag, and subsequent calls
  19. # will do nothing.
  20. _setup = False
  21. COLORS = {"DEBUG": "blue",
  22. "WARNING": "yellow",
  23. "ERROR": "red",
  24. "CRITICAL": "magenta"}
  25. class ColorFormatter(logging.Formatter):
  26. def __init__(self, msg, use_color=True):
  27. logging.Formatter.__init__(self, msg)
  28. self.use_color = use_color
  29. def formatException(self, ei):
  30. r = logging.Formatter.formatException(self, ei)
  31. if type(r) in [types.StringType]:
  32. r = r.decode('utf-8', 'replace') # convert to unicode
  33. return r
  34. def format(self, record):
  35. levelname = record.levelname
  36. if self.use_color and levelname in COLORS:
  37. record.msg = unicode(colored().names[COLORS[levelname]](
  38. record.msg))
  39. # Very ugly, but have to make sure processName is supported
  40. # by foreign logger instances.
  41. # (processName is always supported by Python 2.7)
  42. if "processName" not in record.__dict__:
  43. record.__dict__["processName"] = current_process()._name
  44. t = logging.Formatter.format(self, record)
  45. if type(t) in [types.UnicodeType]:
  46. t = t.encode('utf-8', 'replace')
  47. return t
  48. def get_task_logger(loglevel=None, name=None):
  49. logger = logging.getLogger(name or "celery.task.default")
  50. if loglevel is not None:
  51. logger.setLevel(loglevel)
  52. return logger
def setup_logging_subsystem(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
        format=conf.CELERYD_LOG_FORMAT, colorize=conf.CELERYD_LOG_COLOR,
        **kwargs):
    """Configure the logging subsystem for this process (runs only once).

    First dispatches the :data:`celery.signals.setup_logging` signal; if any
    receiver handled it, logging configuration is left to them.  Otherwise
    the root logger and the ``multiprocessing`` util logger are configured
    via :func:`_setup_logger`.

    :keyword loglevel: Level applied to the configured loggers.
    :keyword logfile: Filename or open stream (``None`` means stderr).
    :keyword format: Log format string.
    :keyword colorize: Enable colored output (see :class:`ColorFormatter`).

    Returns the signal receivers (or ``None`` on every call after the
    first, since the ``_setup`` guard skips the whole body).
    """
    global _setup
    if not _setup:
        try:
            # Drop multiprocessing's cached util logger so it is
            # re-created under our configuration.
            mputil._logger = None
        except AttributeError:
            pass
        ensure_process_aware_logger()
        # Forget loggers created before setup so they don't keep stale
        # handlers or levels.
        logging.Logger.manager.loggerDict.clear()
        receivers = signals.setup_logging.send(sender=None,
                        loglevel=loglevel,
                        logfile=logfile,
                        format=format,
                        colorize=colorize)
        if not receivers:
            root = logging.getLogger()
            if conf.CELERYD_HIJACK_ROOT_LOGGER:
                # Take over the root logger: discard existing handlers.
                root.handlers = []
            mp = mputil.get_logger()
            for logger in (root, mp):
                _setup_logger(logger, logfile, format, colorize, **kwargs)
                logger.setLevel(loglevel)
        _setup = True
        return receivers
  79. def _detect_handler(logfile=None):
  80. """Create log handler with either a filename, an open stream
  81. or ``None`` (stderr)."""
  82. if logfile is None:
  83. logfile = sys.__stderr__
  84. if not logfile or hasattr(logfile, "write"):
  85. return logging.StreamHandler(logfile)
  86. return logging.FileHandler(logfile)
  87. def get_default_logger(loglevel=None, name="celery"):
  88. """Get default logger instance.
  89. :keyword loglevel: Initial log level.
  90. """
  91. logger = logging.getLogger(name)
  92. if loglevel is not None:
  93. logger.setLevel(loglevel)
  94. return logger
  95. def setup_logger(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
  96. format=conf.CELERYD_LOG_FORMAT, colorize=conf.CELERYD_LOG_COLOR,
  97. name="celery", root=True, **kwargs):
  98. """Setup the ``multiprocessing`` logger. If ``logfile`` is not specified,
  99. then ``stderr`` is used.
  100. Returns logger object.
  101. """
  102. if not root or conf.CELERYD_HIJACK_ROOT_LOGGER:
  103. return _setup_logger(get_default_logger(loglevel, name),
  104. logfile, format, colorize, **kwargs)
  105. setup_logging_subsystem(loglevel, logfile, format, colorize, **kwargs)
  106. return get_default_logger(name=name)
  107. def setup_task_logger(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
  108. format=conf.CELERYD_TASK_LOG_FORMAT, colorize=conf.CELERYD_LOG_COLOR,
  109. task_kwargs=None, propagate=False, **kwargs):
  110. """Setup the task logger. If ``logfile`` is not specified, then
  111. ``stderr`` is used.
  112. Returns logger object.
  113. """
  114. if task_kwargs is None:
  115. task_kwargs = {}
  116. task_kwargs.setdefault("task_id", "-?-")
  117. task_name = task_kwargs.get("task_name")
  118. task_kwargs.setdefault("task_name", "-?-")
  119. logger = _setup_logger(get_task_logger(loglevel, task_name),
  120. logfile, format, colorize, **kwargs)
  121. logger.propagate = int(propagate) # this is an int for some reason.
  122. # better to just not question why.
  123. return LoggerAdapter(logger, task_kwargs)
  124. def _setup_logger(logger, logfile, format, colorize,
  125. formatter=ColorFormatter, **kwargs):
  126. if logger.handlers: # already configured
  127. return logger
  128. handler = _detect_handler(logfile)
  129. handler.setFormatter(formatter(format, use_color=colorize))
  130. logger.addHandler(handler)
  131. return logger
  132. def emergency_error(logfile, message):
  133. """Emergency error logging, for when there's no standard file
  134. descriptors open because the process has been daemonized or for
  135. some other reason."""
  136. closefh = noop
  137. logfile = logfile or sys.__stderr__
  138. if hasattr(logfile, "write"):
  139. logfh = logfile
  140. else:
  141. logfh = open(logfile, "a")
  142. closefh = logfh.close
  143. try:
  144. logfh.write("[%(asctime)s: CRITICAL/%(pid)d]: %(message)s\n" % {
  145. "asctime": time.asctime(),
  146. "pid": os.getpid(),
  147. "message": message})
  148. finally:
  149. closefh()
  150. def redirect_stdouts_to_logger(logger, loglevel=None):
  151. """Redirect :class:`sys.stdout` and :class:`sys.stderr` to a
  152. logging instance.
  153. :param logger: The :class:`logging.Logger` instance to redirect to.
  154. :param loglevel: The loglevel redirected messages will be logged as.
  155. """
  156. proxy = LoggingProxy(logger, loglevel)
  157. sys.stdout = sys.stderr = proxy
  158. return proxy
  159. class LoggingProxy(object):
  160. """Forward file object to :class:`logging.Logger` instance.
  161. :param logger: The :class:`logging.Logger` instance to forward to.
  162. :param loglevel: Loglevel to use when writing messages.
  163. """
  164. mode = "w"
  165. name = None
  166. closed = False
  167. loglevel = logging.ERROR
  168. _thread = threading.local()
  169. def __init__(self, logger, loglevel=None):
  170. self.logger = logger
  171. self.loglevel = loglevel or self.logger.level or self.loglevel
  172. if not isinstance(self.loglevel, int):
  173. self.loglevel = LOG_LEVELS[self.loglevel.upper()]
  174. self._safewrap_handlers()
  175. def _safewrap_handlers(self):
  176. """Make the logger handlers dump internal errors to
  177. ``sys.__stderr__`` instead of ``sys.stderr`` to circumvent
  178. infinite loops."""
  179. def wrap_handler(handler): # pragma: no cover
  180. class WithSafeHandleError(logging.Handler):
  181. def handleError(self, record):
  182. exc_info = sys.exc_info()
  183. try:
  184. try:
  185. traceback.print_exception(exc_info[0],
  186. exc_info[1],
  187. exc_info[2],
  188. None, sys.__stderr__)
  189. except IOError:
  190. pass # see python issue 5971
  191. finally:
  192. del(exc_info)
  193. handler.handleError = WithSafeHandleError().handleError
  194. return map(wrap_handler, self.logger.handlers)
  195. def write(self, data):
  196. if getattr(self._thread, "recurse_protection", False):
  197. # logger is logging back to this file, so stop recursing.
  198. return
  199. """Write message to logging object."""
  200. data = data.strip()
  201. if data and not self.closed:
  202. self._thread.recurse_protection = True
  203. try:
  204. self.logger.log(self.loglevel, data)
  205. finally:
  206. self._thread.recurse_protection = False
  207. def writelines(self, sequence):
  208. """``writelines(sequence_of_strings) -> None``.
  209. Write the strings to the file.
  210. The sequence can be any iterable object producing strings.
  211. This is equivalent to calling :meth:`write` for each string.
  212. """
  213. map(self.write, sequence)
  214. def flush(self):
  215. """This object is not buffered so any :meth:`flush` requests
  216. are ignored."""
  217. pass
  218. def close(self):
  219. """When the object is closed, no write requests are forwarded to
  220. the logging object anymore."""
  221. self.closed = True
  222. def isatty(self):
  223. """Always returns ``False``. Just here for file support."""
  224. return False
  225. def fileno(self):
  226. return None
  227. class SilenceRepeated(object):
  228. """Only log action every n iterations."""
  229. def __init__(self, action, max_iterations=10):
  230. self.action = action
  231. self.max_iterations = max_iterations
  232. self._iterations = 0
  233. def __call__(self, *msgs):
  234. if self._iterations >= self.max_iterations:
  235. map(self.action, msgs)
  236. self._iterations = 0
  237. else:
  238. self._iterations += 1