Browse Source

Changed the way logging is configured.

By default we now configure the root logger. We also no longer hijack
the multiprocessing logger, but instead use our own logger name
("celery").

Users can choose to configure logging by subscribing to the
``setup_logging`` signal::

    from celery import signals

    def setup_logging(**kwargs):
        from logging.config import fileConfig
        fileConfig("logging.conf")
    signals.setup_logging.connect(setup_logging)

If there are no receivers for this signal, the logging subsystem
will be configured using the --loglevel/--logfile arguments. This
configuration will be used for *all defined loggers*, and stdout+stderr
will be redirected to the celery logger. If you want to manually configure
logging *and* redirect stdouts, you need to enable the redirect manually::

    def setup_logging(**kwargs):
        import logging
        from logging.config import fileConfig
        from celery import log
        fileConfig("logging.conf")
        stdouts = logging.getLogger("mystdoutslogger")
        log.redirect_stdouts_to_logger(stdouts, loglevel=logging.WARNING)
Ask Solem 14 years ago
parent
commit
1c0eb185e7
5 changed files with 73 additions and 48 deletions
  1. 6 4
      celery/bin/celeryd.py
  2. 48 30
      celery/log.py
  3. 3 0
      celery/signals.py
  4. 15 8
      celery/tests/test_log.py
  5. 1 6
      celery/worker/__init__.py

+ 6 - 4
celery/bin/celeryd.py

@@ -199,6 +199,7 @@ class Worker(object):
     def run(self):
         self.init_loader()
         self.init_queues()
+        self.worker_init()
         self.redirect_stdouts_to_logger()
         print("celery@%s v%s is starting." % (self.hostname,
                                               celery.__version__))
@@ -209,7 +210,6 @@ class Worker(object):
 
         if self.discard:
             self.purge_messages()
-        self.worker_init()
 
         # Dump configuration to screen so we have some basic information
         # for when users sends bug reports.
@@ -245,10 +245,12 @@ class Worker(object):
 
     def redirect_stdouts_to_logger(self):
         from celery import log
+        handled = log.setup_logging_subsystem(loglevel=self.loglevel,
+                                              logfile=self.logfile)
         # Redirect stdout/stderr to our logger.
-        logger = log.setup_logger(loglevel=self.loglevel,
-                                  logfile=self.logfile)
-        log.redirect_stdouts_to_logger(logger, loglevel=logging.WARNING)
+        if not handled:
+            logger = log.get_default_logger()
+            log.redirect_stdouts_to_logger(logger, loglevel=logging.WARNING)
 
     def purge_messages(self):
         discarded_count = discard_all()

+ 48 - 30
celery/log.py

@@ -7,12 +7,15 @@ import sys
 import traceback
 
 from celery import conf
+from celery import signals
 from celery.utils import noop
 from celery.utils.compat import LoggerAdapter
 from celery.utils.patch import ensure_process_aware_logger
 
-_hijacked = False
-_monkeypatched = False
+# The logging subsystem is only configured once per process.
+# setup_logging_subsystem sets this flag, and subsequent calls
+# will do nothing.
+_setup = False
 
 BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
 RESET_SEQ = "\033[0m"
@@ -25,6 +28,7 @@ COLORS = {"DEBUG": BLUE,
 
 
 class ColorFormatter(logging.Formatter):
+
     def __init__(self, msg, use_color=True):
         logging.Formatter.__init__(self, msg)
         self.use_color = use_color
@@ -38,32 +42,37 @@ class ColorFormatter(logging.Formatter):
 
 
 def get_task_logger(loglevel=None, name=None):
-    ensure_process_aware_logger()
     logger = logging.getLogger(name or "celery.task.default")
     if loglevel is not None:
         logger.setLevel(loglevel)
     return logger
 
 
-def _hijack_multiprocessing_logger():
-    from multiprocessing import util as mputil
-    global _hijacked
-
-    if _hijacked:
-        return mputil.get_logger()
-
-    ensure_process_aware_logger()
-
-    logging.Logger.manager.loggerDict.clear()
-
-    try:
-        if mputil._logger is not None:
-            mputil.logger = None
-    except AttributeError:
-        pass
-
-    _hijacked = True
-    return mputil.get_logger()
+def setup_logging_subsystem(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
+        format=conf.CELERYD_LOG_FORMAT, colorize=conf.CELERYD_LOG_COLOR,
+        **kwargs):
+    global _setup
+    if not _setup:
+        print("SETTING LOGGER TO %s" % (logfile, ))
+        ensure_process_aware_logger()
+        logging.Logger.manager.loggerDict.clear()
+        from multiprocessing import util as mputil
+        try:
+            if mputil._logger is not None:
+                mputil.logger = None
+        except AttributeError:
+            pass
+        receivers = signals.setup_logging.send(sender=None,
+                                               loglevel=loglevel,
+                                               logfile=logfile,
+                                               format=format,
+                                               colorize=colorize)
+        if not receivers:
+            root = logging.getLogger()
+            _setup_logger(root, logfile, loglevel, format, colorize, **kwargs)
+            root.setLevel(loglevel)
+        _setup = True
+        return receivers
 
 
 def _detect_handler(logfile=None):
@@ -74,13 +83,13 @@ def _detect_handler(logfile=None):
     return logging.FileHandler(logfile)
 
 
-def get_default_logger(loglevel=None):
+def get_default_logger(loglevel=None, name="celery"):
     """Get default logger instance.
 
     :keyword loglevel: Initial log level.
 
     """
-    logger = _hijack_multiprocessing_logger()
+    logger = logging.getLogger(name)
     if loglevel is not None:
         logger.setLevel(loglevel)
     return logger
@@ -88,20 +97,23 @@ def get_default_logger(loglevel=None):
 
 def setup_logger(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
         format=conf.CELERYD_LOG_FORMAT, colorize=conf.CELERYD_LOG_COLOR,
-        **kwargs):
+        name="celery", root=True, **kwargs):
     """Setup the ``multiprocessing`` logger. If ``logfile`` is not specified,
     then ``stderr`` is used.
 
     Returns logger object.
 
     """
-    return _setup_logger(get_default_logger(loglevel),
-                         logfile, format, colorize, **kwargs)
+    if not root:
+        return _setup_logger(get_default_logger(loglevel, name),
+                             logfile, format, colorize, **kwargs)
+    setup_logging_subsystem(loglevel, logfile, format, colorize, **kwargs)
+    return get_default_logger(name=name)
 
 
 def setup_task_logger(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
         format=conf.CELERYD_TASK_LOG_FORMAT, colorize=conf.CELERYD_LOG_COLOR,
-        task_kwargs=None, **kwargs):
+        task_kwargs=None, root=True, **kwargs):
     """Setup the task logger. If ``logfile`` is not specified, then
     ``stderr`` is used.
 
@@ -113,11 +125,17 @@ def setup_task_logger(loglevel=conf.CELERYD_LOG_LEVEL, logfile=None,
     task_kwargs.setdefault("task_id", "-?-")
     task_name = task_kwargs.get("task_name")
     task_kwargs.setdefault("task_name", "-?-")
-    logger = _setup_logger(get_task_logger(loglevel, task_name),
-                           logfile, format, colorize, **kwargs)
+    if not root:
+        logger = _setup_logger(get_task_logger(loglevel, task_name),
+                               logfile, format, colorize, **kwargs)
+    else:
+        setup_logging_subsystem(loglevel, logfile, format, colorize, **kwargs)
+        logger = get_task_logger(name=task_name)
     return LoggerAdapter(logger, task_kwargs)
 
 
+
+
 def _setup_logger(logger, logfile, format, colorize,
         formatter=ColorFormatter, **kwargs):
 

+ 3 - 0
celery/signals.py

@@ -14,3 +14,6 @@ worker_init = Signal(providing_args=[])
 worker_process_init = Signal(providing_args=[])
 worker_ready = Signal(providing_args=[])
 worker_shutdown = Signal(providing_args=[])
+
+setup_logging = Signal(providing_args=["loglevel", "logfile",
+                                       "format", "colorize"])

+ 15 - 8
celery/tests/test_log.py

@@ -14,6 +14,7 @@ except ImportError:
 
 from carrot.utils import rpartition
 
+from celery import log
 from celery.log import (setup_logger, setup_task_logger, emergency_error,
                         get_default_logger, get_task_logger,
                         redirect_stdouts_to_logger, LoggingProxy)
@@ -50,6 +51,7 @@ class test_default_logger(unittest.TestCase):
     def setUp(self):
         self.setup_logger = setup_logger
         self.get_logger = get_default_logger
+        log._setup = False
 
     def _assertLog(self, logger, logmsg, loglevel=logging.ERROR):
 
@@ -69,9 +71,11 @@ class test_default_logger(unittest.TestCase):
         return self.assertFalse(val, reason)
 
     def test_setup_logger(self):
-        logger = self.setup_logger(loglevel=logging.ERROR, logfile=None)
+        logger = self.setup_logger(loglevel=logging.ERROR, logfile=None,
+                                   root=False)
         set_handlers(logger, [])
-        logger = self.setup_logger(loglevel=logging.ERROR, logfile=None)
+        logger = self.setup_logger(loglevel=logging.ERROR, logfile=None,
+                                   root=False)
         self.assertIs(get_handlers(logger)[0].stream, sys.__stderr__,
                 "setup_logger logs to stderr without logfile argument.")
         self.assertDidLogFalse(logger, "Logging something",
@@ -90,7 +94,8 @@ class test_default_logger(unittest.TestCase):
 
         def with_override_stdouts(outs):
             stdout, stderr = outs
-            l = self.setup_logger(logfile=stderr, loglevel=logging.INFO)
+            l = self.setup_logger(logfile=stderr, loglevel=logging.INFO,
+                                  root=False)
             l.info("The quick brown fox...")
             self.assertIn("The quick brown fox...", stderr.getvalue())
 
@@ -101,9 +106,9 @@ class test_default_logger(unittest.TestCase):
         l = self.get_logger()
         set_handlers(l, [])
         tempfile = mktemp(suffix="unittest", prefix="celery")
-        l = self.setup_logger(logfile=tempfile, loglevel=0)
-        print(get_handlers(l)[0].stream)
-        self.assertIsInstance(get_handlers(l)[0], logging.FileHandler)
+        l = self.setup_logger(logfile=tempfile, loglevel=0, root=False)
+        self.assertIsInstance(get_handlers(l)[0],
+                              logging.FileHandler)
 
     def test_emergency_error_stderr(self):
         def with_override_stdouts(outs):
@@ -126,7 +131,8 @@ class test_default_logger(unittest.TestCase):
             os.unlink(tempfile)
 
     def test_redirect_stdouts(self):
-        logger = self.setup_logger(loglevel=logging.ERROR, logfile=None)
+        logger = self.setup_logger(loglevel=logging.ERROR, logfile=None,
+                                   root=False)
         try:
             def with_wrap_logger(sio):
                 redirect_stdouts_to_logger(logger, loglevel=logging.ERROR)
@@ -139,7 +145,8 @@ class test_default_logger(unittest.TestCase):
             sys.stdout, sys.stderr = sys.__stdout__, sys.__stderr__
 
     def test_logging_proxy(self):
-        logger = self.setup_logger(loglevel=logging.ERROR, logfile=None)
+        logger = self.setup_logger(loglevel=logging.ERROR, logfile=None,
+                                   root=False)
 
         def with_wrap_logger(sio):
             p = LoggingProxy(logger)

+ 1 - 6
celery/worker/__init__.py

@@ -12,7 +12,7 @@ from celery import conf
 from celery import registry
 from celery import platform
 from celery import signals
-from celery.log import setup_logger, _hijack_multiprocessing_logger
+from celery.log import setup_logger
 from celery.beat import EmbeddedClockService
 from celery.utils import noop, instantiate
 
@@ -37,11 +37,6 @@ def process_initializer():
     Used for multiprocessing environments.
 
     """
-    # There seems to a bug in multiprocessing (backport?)
-    # when detached, where the worker gets EOFErrors from time to time
-    # and the logger is left from the parent process causing a crash.
-    _hijack_multiprocessing_logger()
-
     map(platform.reset_signal, WORKER_SIGRESET)
     map(platform.ignore_signal, WORKER_SIGIGNORE)
     platform.set_mp_process_title("celeryd")