"""Tests for the logging helpers in ``celery.utils.log`` and ``celery.app.log``."""
from __future__ import absolute_import, unicode_literals

import logging
import sys
from collections import defaultdict
from io import StringIO
from tempfile import mktemp

import pytest
from case import Mock, mock, patch, skip
from case.utils import get_logger_handlers

from celery import signals
from celery import uuid
from celery.app.log import TaskFormatter
from celery.five import python_2_unicode_compatible
from celery.utils.log import LoggingProxy
from celery.utils.log import (
    get_logger,
    ColorFormatter,
    logger as base_logger,
    get_task_logger,
    task_logger,
    in_sighandler,
    logger_isa,
)


class test_TaskFormatter:
    """Tests for ``TaskFormatter``."""

    def test_no_task(self):
        """Formatting a plain record sets task_name/task_id to ``'???'``."""

        class Record(object):
            # Minimal stand-in exposing only the attributes read here.
            msg = 'hello world'
            levelname = 'info'
            exc_text = exc_info = None
            stack_info = None

            def getMessage(self):
                return self.msg

        record = Record()
        x = TaskFormatter()
        x.format(record)
        assert record.task_name == '???'
        assert record.task_id == '???'


class test_logger_isa:
    """Tests for ``logger_isa`` (ancestry checks along ``Logger.parent``)."""

    def test_isa(self):
        """Task loggers are recognised through chains of parents."""
        x = get_task_logger('Z1george')
        assert logger_isa(x, task_logger)

        # With the parent link removed, x no longer descends from task_logger.
        prev_x, x.parent = x.parent, None
        try:
            assert not logger_isa(x, task_logger)
        finally:
            x.parent = prev_x

        y = get_task_logger('Z1elaine')
        y.parent = x
        assert logger_isa(y, task_logger)
        assert logger_isa(y, x)
        assert logger_isa(y, y)

        z = get_task_logger('Z1jerry')
        z.parent = y
        assert logger_isa(z, task_logger)
        assert logger_isa(z, y)
        assert logger_isa(z, x)
        assert logger_isa(z, z)

    def test_recursive(self):
        """Cyclic parent chains raise ``RuntimeError``."""
        # Case 1: a logger that is its own parent.
        x = get_task_logger('X1foo')
        prev, x.parent = x.parent, x
        try:
            with pytest.raises(RuntimeError):
                logger_isa(x, task_logger)
        finally:
            x.parent = prev

        # Case 2: two loggers that are each other's parent.
        # Distinct names are required here: requesting the same name twice
        # yields one logger object (logging caches loggers by name), which
        # would only repeat the self-parent case above.
        y = get_task_logger('X2foo')
        z = get_task_logger('X3foo')
        prev_y, y.parent = y.parent, z
        try:
            prev_z, z.parent = z.parent, y
            try:
                with pytest.raises(RuntimeError):
                    logger_isa(y, task_logger)
            finally:
                z.parent = prev_z
        finally:
            y.parent = prev_y


class test_ColorFormatter:
    """Tests for ``ColorFormatter``."""

    # NOTE: stacked ``@patch`` decorators inject mocks bottom-up, so the
    # decorator closest to the function supplies the first mock argument.

    @patch('celery.utils.log.safe_str')
    @patch('logging.Formatter.formatException')
    def test_formatException_not_string(self, fe, safe_str):
        """A non-string result from the base formatter is returned as-is."""
        x = ColorFormatter()
        value = KeyError()
        fe.return_value = value
        assert x.formatException(value) is value
        fe.assert_called()
        safe_str.assert_not_called()

    @patch('logging.Formatter.formatException')
    @patch('celery.utils.log.safe_str')
    def test_formatException_bytes(self, safe_str, fe):
        """A bytes result from the base formatter still formats truthy."""
        x = ColorFormatter()
        fe.return_value = b'HELLO'
        try:
            raise Exception()
        except Exception:
            assert x.formatException(sys.exc_info())
        # safe_str is only expected to be involved on Python 2.
        if sys.version_info[0] == 2:
            safe_str.assert_called()

    @patch('logging.Formatter.format')
    def test_format_object(self, _format):
        """Records whose ``msg`` is an arbitrary object can be formatted."""
        x = ColorFormatter()
        x.use_color = True
        record = Mock()
        record.levelname = 'ERROR'
        record.msg = object()
        assert x.format(record)

    @patch('celery.utils.log.safe_str')
    def test_format_raises(self, safe_str):
        """A failing ``safe_str`` yields an ``<Unrepresentable`` message."""
        x = ColorFormatter()

        def on_safe_str(s):
            # Fail on this call, then clear the side effect so that later
            # safe_str calls fall back to ``return_value``.
            try:
                raise ValueError('foo')
            finally:
                safe_str.side_effect = None

        safe_str.side_effect = on_safe_str

        @python_2_unicode_compatible
        class Record(object):
            levelname = 'ERROR'
            msg = 'HELLO'
            exc_info = 1
            exc_text = 'error text'
            stack_info = None

            def __str__(self):
                return on_safe_str('')

            def getMessage(self):
                return self.msg

        record = Record()
        safe_str.return_value = record

        msg = x.format(record)
        assert '<Unrepresentable' in msg
        assert safe_str.call_count == 1

    @skip.if_python3()
    @patch('celery.utils.log.safe_str')
    def test_format_raises_no_color(self, safe_str):
        """Without colour, ``safe_str`` is called exactly once (Python 2)."""
        x = ColorFormatter(use_color=False)
        record = Mock()
        record.levelname = 'ERROR'
        record.msg = 'HELLO'
        record.exc_text = 'error text'
        x.format(record)
        assert safe_str.call_count == 1


class test_default_logger:
    """Tests for the app logging setup using the default logger.

    NOTE(review): ``self.app`` is not defined in this file; presumably it is
    injected by the test suite's fixtures -- confirm against conftest.
    """

    def setup(self):
        """Bind logger helpers and reset logging setup state."""
        self.setup_logger = self.app.log.setup_logger
        # Named lookups go through get_logger; no name means the root logger.
        self.get_logger = lambda n=None: get_logger(n) if n else logging.root
        signals.setup_logging.receivers[:] = []
        self.app.log.already_setup = False

    def test_get_logger_sets_parent(self):
        """A ``celery.*`` logger is parented to the base logger."""
        logger = get_logger('celery.test_get_logger')
        assert logger.parent.name == base_logger.name

    def test_get_logger_root(self):
        """The base logger itself is parented to the root logger."""
        logger = get_logger(base_logger.name)
        assert logger.parent is logging.root

    @mock.restore_logging()
    def test_setup_logging_subsystem_misc(self):
        """``setup_logging_subsystem`` accepts ``loglevel=None``."""
        self.app.log.setup_logging_subsystem(loglevel=None)

    @mock.restore_logging()
    def test_setup_logging_subsystem_misc2(self):
        """``setup_logging_subsystem`` runs with root-logger hijacking on."""
        self.app.conf.worker_hijack_root_logger = True
        self.app.log.setup_logging_subsystem()

    def test_get_default_logger(self):
        """``get_default_logger`` returns a truthy logger."""
        assert self.app.log.get_default_logger()

    def test_configure_logger(self):
        """``_configure_logger`` accepts both a logger and ``None``."""
        logger = self.app.log.get_default_logger()
        self.app.log._configure_logger(logger, sys.stderr, None, '', False)
        self.app.log._configure_logger(None, sys.stderr, None, '', False)
        # Drop handlers added above so they do not leak into other tests.
        logger.handlers[:] = []

    @mock.restore_logging()
    def test_setup_logging_subsystem_colorize(self):
        """``colorize`` may be ``None`` or ``True``."""
        self.app.log.setup_logging_subsystem(colorize=None)
        self.app.log.setup_logging_subsystem(colorize=True)

    @mock.restore_logging()
    def test_setup_logging_subsystem_no_mputil(self):
        """Setup still works when ``billiard.util`` is masked."""
        with mock.mask_modules('billiard.util'):
            self.app.log.setup_logging_subsystem()

    @mock.restore_logging()
    def test_setup_logger(self):
        """Without a logfile the first handler streams to ``sys.__stderr__``."""
        logger = self.setup_logger(loglevel=logging.ERROR, logfile=None,
                                   root=False, colorize=True)
        logger.handlers = []
        self.app.log.already_setup = False
        logger = self.setup_logger(loglevel=logging.ERROR, logfile=None,
                                   root=False, colorize=None)
        # setup_logger logs to stderr without logfile argument.
        assert (get_logger_handlers(logger)[0].stream is
                sys.__stderr__)

    @mock.restore_logging()
    def test_setup_logger_no_handlers_stream(self):
        """A stream passed as ``logfile`` receives the logged message."""
        logger = self.get_logger()
        logger.handlers = []
        with mock.stdouts() as outs:
            _stdout, stderr = outs
            logger = self.setup_logger(logfile=sys.stderr,
                                       loglevel=logging.INFO, root=False)
            logger.info('The quick brown fox...')
            assert 'The quick brown fox...' in stderr.getvalue()

    @patch('os.fstat')
    def test_setup_logger_no_handlers_file(self, *args):
        """A filename passed as ``logfile`` results in a ``FileHandler``."""
        # Only a path is needed: the builtin ``open`` is patched below, so
        # the handler writes to an in-memory StringIO instead.
        tempfile = mktemp(suffix='unittest', prefix='celery')
        _open = ('builtins.open' if sys.version_info[0] == 3
                 else '__builtin__.open')
        with patch(_open) as osopen:
            with mock.restore_logging():
                files = defaultdict(StringIO)

                def open_file(filename, *args, **kwargs):
                    # Hand out one StringIO per filename, with a fake fileno.
                    f = files[filename]
                    f.fileno = Mock()
                    f.fileno.return_value = 99
                    return f

                osopen.side_effect = open_file
                logger = self.get_logger()
                logger.handlers = []
                logger = self.setup_logger(
                    logfile=tempfile, loglevel=logging.INFO, root=False,
                )
                assert isinstance(get_logger_handlers(logger)[0],
                                  logging.FileHandler)
                assert tempfile in files

    @mock.restore_logging()
    def test_redirect_stdouts(self):
        """``redirect_stdouts_to_logger`` runs with and without redirection."""
        logger = self.setup_logger(loglevel=logging.ERROR, logfile=None,
                                   root=False)
        try:
            with mock.wrap_logger(logger) as sio:
                self.app.log.redirect_stdouts_to_logger(
                    logger, loglevel=logging.ERROR,
                )
                logger.error('foo')
                assert 'foo' in sio.getvalue()

                self.app.log.redirect_stdouts_to_logger(
                    logger, stdout=False, stderr=False,
                )
        finally:
            # Always put the real streams back, even if an assertion failed.
            sys.stdout, sys.stderr = sys.__stdout__, sys.__stderr__

    @mock.restore_logging()
    def test_logging_proxy(self):
        """``LoggingProxy`` forwards writes to the logger unless closed."""
        logger = self.setup_logger(loglevel=logging.ERROR, logfile=None,
                                   root=False)

        with mock.wrap_logger(logger) as sio:
            p = LoggingProxy(logger, loglevel=logging.ERROR)

            # Writes on a closed proxy are dropped.
            p.close()
            p.write('foo')
            assert 'foo' not in sio.getvalue()

            # Once reopened, writes reach the logger.
            p.closed = False
            p.write('foo')
            assert 'foo' in sio.getvalue()

            lines = ['baz', 'xuzzy']
            p.writelines(lines)
            for line in lines:
                assert line in sio.getvalue()

            p.flush()
            p.close()
            assert not p.isatty()

            # Inside in_sighandler() the write shows up on stderr.
            with mock.stdouts() as (stdout, stderr):
                with in_sighandler():
                    p.write('foo')
                    assert stderr.getvalue()

    @mock.restore_logging()
    def test_logging_proxy_recurse_protection(self):
        """``write`` returns ``None`` while recurse protection is set."""
        logger = self.setup_logger(loglevel=logging.ERROR, logfile=None,
                                   root=False)
        p = LoggingProxy(logger, loglevel=logging.ERROR)
        p._thread.recurse_protection = True
        try:
            assert p.write('FOOFO') is None
        finally:
            p._thread.recurse_protection = False


class test_task_logger(test_default_logger):
    """Runs the ``test_default_logger`` tests against task loggers."""

    def setup(self):
        """Reset the ``celery.task`` logger and push a task on the stack."""
        logger = self.logger = get_logger('celery.task')
        logger.handlers = []
        logging.root.manager.loggerDict.pop(logger.name, None)
        self.uid = uuid()

        @self.app.task(shared=False)
        def test_task():
            pass

        self.get_logger().handlers = []
        self.task = test_task

        from celery._state import _task_stack
        _task_stack.push(test_task)

    def teardown(self):
        """Pop the task that ``setup`` pushed onto the task stack."""
        from celery._state import _task_stack
        _task_stack.pop()

    def setup_logger(self, *args, **kwargs):
        """Delegate to ``app.log.setup_task_loggers``."""
        return self.app.log.setup_task_loggers(*args, **kwargs)

    def get_logger(self, *args, **kwargs):
        """Return the ``'test_task_logger'`` task logger (args are ignored)."""
        return get_task_logger('test_task_logger')

    def test_renaming_base_logger(self):
        """``get_task_logger('celery')`` raises ``RuntimeError``."""
        with pytest.raises(RuntimeError):
            get_task_logger('celery')

    def test_renaming_task_logger(self):
        """``get_task_logger('celery.task')`` raises ``RuntimeError``."""
        with pytest.raises(RuntimeError):
            get_task_logger('celery.task')


class MockLogger(logging.Logger):
    """Logger that stores handled records in ``_records``.

    ``handle`` appends the record instead of dispatching it, and
    ``isEnabledFor`` always returns ``True``.
    """

    _records = None

    def __init__(self, *args, **kwargs):
        # Create the per-instance list before the base initialiser runs.
        self._records = []
        logging.Logger.__init__(self, *args, **kwargs)

    def handle(self, record):
        """Store ``record`` in ``_records`` rather than dispatching it."""
        self._records.append(record)

    def isEnabledFor(self, level):
        """Report every level as enabled."""
        return True