log.py

# -*- coding: utf-8 -*-
"""Logging configuration.

The Celery instance's logging section: ``Celery.log``.

Sets up logging for the worker and other programs,
redirects standard outs, colors log output, patches logging
related compatibility fixes, and so on.
"""
from __future__ import absolute_import, unicode_literals

import logging
import os
import sys

from logging.handlers import WatchedFileHandler

from kombu.utils.encoding import set_default_encoding_file

from celery import signals
from celery._state import get_current_task
from celery.five import string_t
from celery.local import class_property
from celery.platforms import isatty
from celery.utils.log import (
    get_logger, mlevel,
    ColorFormatter, LoggingProxy, get_multiprocessing_logger,
    reset_multiprocessing_logger,
)
from celery.utils.nodenames import node_format
from celery.utils.term import colored

__all__ = ['TaskFormatter', 'Logging']

MP_LOG = os.environ.get('MP_LOG', False)


class TaskFormatter(ColorFormatter):
    """Formatter for tasks, adding the task name and id."""

    def format(self, record):
        task = get_current_task()
        if task and task.request:
            record.__dict__.update(task_id=task.request.id,
                                   task_name=task.name)
        else:
            record.__dict__.setdefault('task_name', '???')
            record.__dict__.setdefault('task_id', '???')
        return ColorFormatter.format(self, record)
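
# A minimal usage sketch (not part of the original module): TaskFormatter
# can be attached to any handler so that records carry ``task_id`` and
# ``task_name`` (``'???'`` outside of a task).  The format string below is
# only an illustrative example, not a Celery default::
#
#     handler = logging.StreamHandler()
#     handler.setFormatter(TaskFormatter(
#         '[%(asctime)s: %(levelname)s/%(task_name)s(%(task_id)s)] %(message)s'))
#     get_logger('celery.task').addHandler(handler)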


class Logging(object):
    """Application logging setup (app.log)."""

    #: The logging subsystem is only configured once per process.
    #: setup_logging_subsystem sets this flag, and subsequent calls
    #: will do nothing.
    _setup = False

    def __init__(self, app):
        self.app = app
        self.loglevel = mlevel(logging.WARN)
        self.format = self.app.conf.worker_log_format
        self.task_format = self.app.conf.worker_task_log_format
        self.colorize = self.app.conf.worker_log_color

    def setup(self, loglevel=None, logfile=None, redirect_stdouts=False,
              redirect_level='WARNING', colorize=None, hostname=None):
        loglevel = mlevel(loglevel)
        handled = self.setup_logging_subsystem(
            loglevel, logfile, colorize=colorize, hostname=hostname,
        )
        if not handled:
            if redirect_stdouts:
                self.redirect_stdouts(redirect_level)
        os.environ.update(
            CELERY_LOG_LEVEL=str(loglevel) if loglevel else '',
            CELERY_LOG_FILE=str(logfile) if logfile else '',
        )
        return handled

    def redirect_stdouts(self, loglevel=None, name='celery.redirected'):
        self.redirect_stdouts_to_logger(
            get_logger(name), loglevel=loglevel
        )
        os.environ.update(
            CELERY_LOG_REDIRECT='1',
            CELERY_LOG_REDIRECT_LEVEL=str(loglevel or ''),
        )

    def setup_logging_subsystem(self, loglevel=None, logfile=None, format=None,
                                colorize=None, hostname=None, **kwargs):
        if self.already_setup:
            return
        if logfile and hostname:
            logfile = node_format(logfile, hostname)
        Logging._setup = True
        loglevel = mlevel(loglevel or self.loglevel)
        format = format or self.format
        colorize = self.supports_color(colorize, logfile)
        reset_multiprocessing_logger()
        receivers = signals.setup_logging.send(
            sender=None, loglevel=loglevel, logfile=logfile,
            format=format, colorize=colorize,
        )
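
        # If any receiver is connected to the ``setup_logging`` signal the
        # application is expected to configure logging itself, and the
        # default handler setup below is skipped.  A minimal receiver
        # sketch (``MY_LOGGING_CONFIG`` is a hypothetical name)::
        #
        #     from celery.signals import setup_logging
        #
        #     @setup_logging.connect
        #     def configure_logging(loglevel=None, logfile=None, **kwargs):
        #         logging.config.dictConfig(MY_LOGGING_CONFIG)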
        if not receivers:
            root = logging.getLogger()

            if self.app.conf.worker_hijack_root_logger:
                root.handlers = []
                get_logger('celery').handlers = []
                get_logger('celery.task').handlers = []
                get_logger('celery.redirected').handlers = []

            # Configure root logger
            self._configure_logger(
                root, logfile, loglevel, format, colorize, **kwargs
            )

            # Configure the multiprocessing logger
            self._configure_logger(
                get_multiprocessing_logger(),
                logfile, loglevel if MP_LOG else logging.ERROR,
                format, colorize, **kwargs
            )

            signals.after_setup_logger.send(
                sender=None, logger=root,
                loglevel=loglevel, logfile=logfile,
                format=format, colorize=colorize,
            )

            # then set up the root task logger.
            self.setup_task_loggers(loglevel, logfile, colorize=colorize)

        try:
            stream = logging.getLogger().handlers[0].stream
        except (AttributeError, IndexError):
            pass
        else:
            set_default_encoding_file(stream)

        # This is a hack for multiprocessing's fork+exec, so that
        # logging before Process.run works.
        logfile_name = logfile if isinstance(logfile, string_t) else ''
        os.environ.update(_MP_FORK_LOGLEVEL_=str(loglevel),
                          _MP_FORK_LOGFILE_=logfile_name,
                          _MP_FORK_LOGFORMAT_=format)
        return receivers

    def _configure_logger(self, logger, logfile, loglevel,
                          format, colorize, **kwargs):
        if logger is not None:
            self.setup_handlers(logger, logfile, format,
                                colorize, **kwargs)
            if loglevel:
                logger.setLevel(loglevel)

    def setup_task_loggers(self, loglevel=None, logfile=None, format=None,
                           colorize=None, propagate=False, **kwargs):
        """Set up the task logger.

        If `logfile` is not specified, then `sys.stderr` is used.

        Will return the base task logger object.
        """
        loglevel = mlevel(loglevel or self.loglevel)
        format = format or self.task_format
        colorize = self.supports_color(colorize, logfile)

        logger = self.setup_handlers(
            get_logger('celery.task'),
            logfile, format, colorize,
            formatter=TaskFormatter, **kwargs
        )
        logger.setLevel(loglevel)
        # this is an int for some reason, better to not question why.
        logger.propagate = int(propagate)
        signals.after_setup_task_logger.send(
            sender=None, logger=logger,
            loglevel=loglevel, logfile=logfile,
            format=format, colorize=colorize,
        )
        return logger

    def redirect_stdouts_to_logger(self, logger, loglevel=None,
                                   stdout=True, stderr=True):
        """Redirect :class:`sys.stdout` and :class:`sys.stderr` to logger.

        Arguments:
            logger (logging.Logger): Logger instance to redirect to.
            loglevel (int, str): The loglevel redirected messages
                will be logged as.
        """
        proxy = LoggingProxy(logger, loglevel)
        if stdout:
            sys.stdout = proxy
        if stderr:
            sys.stderr = proxy
        return proxy

    def supports_color(self, colorize=None, logfile=None):
        colorize = self.colorize if colorize is None else colorize
        if self.app.IS_WINDOWS:
            # Windows does not support ANSI color codes.
            return False
        if colorize or colorize is None:
            # Only use color if there's no active log file
            # and stderr is an actual terminal.
            return logfile is None and isatty(sys.stderr)
        return colorize

    def colored(self, logfile=None, enabled=None):
        return colored(enabled=self.supports_color(enabled, logfile))

    def setup_handlers(self, logger, logfile, format, colorize,
                       formatter=ColorFormatter, **kwargs):
        if self._is_configured(logger):
            return logger
        handler = self._detect_handler(logfile)
        handler.setFormatter(formatter(format, use_color=colorize))
        logger.addHandler(handler)
        return logger

    def _detect_handler(self, logfile=None):
        """Create handler from filename, an open stream or `None` (stderr)."""
        logfile = sys.__stderr__ if logfile is None else logfile
        if hasattr(logfile, 'write'):
            return logging.StreamHandler(logfile)
        return WatchedFileHandler(logfile)

    def _has_handler(self, logger):
        return any(
            not isinstance(h, logging.NullHandler)
            for h in logger.handlers or []
        )

    def _is_configured(self, logger):
        return self._has_handler(logger) and not getattr(
            logger, '_rudimentary_setup', False)

    def setup_logger(self, name='celery', *args, **kwargs):
        """Deprecated: No longer used."""
        self.setup_logging_subsystem(*args, **kwargs)
        return logging.root

    def get_default_logger(self, name='celery', **kwargs):
        return get_logger(name)

    @class_property
    def already_setup(self):
        return self._setup

    @already_setup.setter  # noqa
    def already_setup(self, was_setup):
        self._setup = was_setup
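

# --- Illustrative usage sketch (not part of the original module) ---
# ``Celery.log`` is an instance of the ``Logging`` class above, so the
# logging subsystem can also be configured manually from application code.
# ``'proj'`` is a hypothetical app name; everything else follows the
# methods defined in this module::
#
#     from celery import Celery
#
#     app = Celery('proj')
#     # Configure the root, celery and celery.task loggers; the subsystem
#     # is only set up once per process, so a second call does nothing.
#     app.log.setup_logging_subsystem(loglevel='INFO', logfile=None)
#     # Optionally send print() output through the logging system:
#     app.log.redirect_stdouts(loglevel='WARNING')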