from __future__ import absolute_import, unicode_literals import logging import sys from collections import defaultdict from io import StringIO from tempfile import mktemp import pytest from case import Mock, mock, patch, skip from case.utils import get_logger_handlers from celery import signals, uuid from celery.app.log import TaskFormatter from celery.five import python_2_unicode_compatible from celery.utils.log import (ColorFormatter, LoggingProxy, get_logger, get_task_logger, in_sighandler) from celery.utils.log import logger as base_logger from celery.utils.log import logger_isa, task_logger class test_TaskFormatter: def test_no_task(self): class Record(object): msg = 'hello world' levelname = 'info' exc_text = exc_info = None stack_info = None def getMessage(self): return self.msg record = Record() x = TaskFormatter() x.format(record) assert record.task_name == '???' assert record.task_id == '???' class test_logger_isa: def test_isa(self): x = get_task_logger('Z1george') assert logger_isa(x, task_logger) prev_x, x.parent = x.parent, None try: assert not logger_isa(x, task_logger) finally: x.parent = prev_x y = get_task_logger('Z1elaine') y.parent = x assert logger_isa(y, task_logger) assert logger_isa(y, x) assert logger_isa(y, y) z = get_task_logger('Z1jerry') z.parent = y assert logger_isa(z, task_logger) assert logger_isa(z, y) assert logger_isa(z, x) assert logger_isa(z, z) def test_recursive(self): x = get_task_logger('X1foo') prev, x.parent = x.parent, x try: with pytest.raises(RuntimeError): logger_isa(x, task_logger) finally: x.parent = prev y = get_task_logger('X2foo') z = get_task_logger('X2foo') prev_y, y.parent = y.parent, z try: prev_z, z.parent = z.parent, y try: with pytest.raises(RuntimeError): logger_isa(y, task_logger) finally: z.parent = prev_z finally: y.parent = prev_y class test_ColorFormatter: @patch('celery.utils.log.safe_str') @patch('logging.Formatter.formatException') def test_formatException_not_string(self, fe, safe_str): x = ColorFormatter() value = KeyError() fe.return_value = value assert x.formatException(value) is value fe.assert_called() safe_str.assert_not_called() @patch('logging.Formatter.formatException') @patch('celery.utils.log.safe_str') def test_formatException_bytes(self, safe_str, fe): x = ColorFormatter() fe.return_value = b'HELLO' try: raise Exception() except Exception: assert x.formatException(sys.exc_info()) if sys.version_info[0] == 2: safe_str.assert_called() @patch('logging.Formatter.format') def test_format_object(self, _format): x = ColorFormatter() x.use_color = True record = Mock() record.levelname = 'ERROR' record.msg = object() assert x.format(record) @patch('celery.utils.log.safe_str') def test_format_raises(self, safe_str): x = ColorFormatter() def on_safe_str(s): try: raise ValueError('foo') finally: safe_str.side_effect = None safe_str.side_effect = on_safe_str @python_2_unicode_compatible class Record(object): levelname = 'ERROR' msg = 'HELLO' exc_info = 1 exc_text = 'error text' stack_info = None def __str__(self): return on_safe_str('') def getMessage(self): return self.msg record = Record() safe_str.return_value = record msg = x.format(record) assert '