
Fixes 'no handlers for multiprocessing' warning on Windows and fork+exec

Ask Solem · 13 years ago
parent · commit df9eda7ee5
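
The warning this commit fixes shows up because pool child processes on Windows (and any platform using fork+exec instead of a plain fork()) start from a fresh interpreter: none of the parent's logging configuration survives into the child, so the first record emitted on the "multiprocessing" logger finds no handlers. A minimal reproduction sketch, assuming Python 2-style logging, where emitting on a handler-less logger prints a one-time warning to stderr:

    import logging

    logger = logging.getLogger("multiprocessing")
    logger.warning("child process calling self.run()")
    # Under Python 2 this prints, once, on stderr:
    #   No handlers could be found for logger "multiprocessing"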

celery/apps/worker.py | +3 -7

@@ -166,13 +166,9 @@ class Worker(configurated):
             self.loader.import_from_cwd(module)
 
     def redirect_stdouts_to_logger(self):
-        handled = self.app.log.setup_logging_subsystem(loglevel=self.loglevel,
-                                                       logfile=self.logfile)
-        if not handled:
-            logger = self.app.log.get_default_logger()
-            if self.redirect_stdouts:
-                self.app.log.redirect_stdouts_to_logger(logger,
-                                loglevel=self.redirect_stdouts_level)
+        self.app.log.setup(self.loglevel, self.logfile,
+                           self.redirect_stdouts,
+                           self.redirect_stdouts_level)
 
     def purge_messages(self):
         count = self.app.control.discard_all()
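
The setup logic that used to live here (setup_logging_subsystem plus the optional stdout/stderr redirect) now sits behind a single Logging.setup() call, added in celery/log.py below, so the same code path can be reused from pool child processes. A rough usage sketch; the keyword names come from the new setup() signature, while app_or_default and the log file path are illustrative assumptions:

    import logging
    from celery.app import app_or_default

    app = app_or_default()           # assumption: the default app is configured
    app.log.setup(loglevel=logging.INFO,
                  logfile="celeryd.log",          # hypothetical log file path
                  redirect_stdouts=True,
                  redirect_level="WARNING")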

celery/bin/celeryd.py | +1 -0

@@ -73,6 +73,7 @@
 """
 from __future__ import absolute_import
 
+import os
 import sys
 
 try:

celery/concurrency/processes/__init__.py | +5 -0

@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 from __future__ import absolute_import
 
+import os
 import platform
 import signal as _signal
 
@@ -40,6 +41,10 @@ def process_initializer(app, hostname):
     # This is for Windows and other platforms not supporting
     # fork(). Note that init_worker makes sure it's only
     # run once per process.
+    app.log.setup(int(os.environ.get("CELERY_LOG_LEVEL", 0)),
+                  os.environ.get("CELERY_LOG_FILE") or None,
+                  bool(os.environ.get("CELERY_LOG_REDIRECT", False)),
+                  str(os.environ.get("CELERY_LOG_REDIRECT_LEVEL")))
     app.loader.init_worker()
     app.loader.init_worker_process()
     signals.worker_process_init.send(sender=None)
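
process_initializer() runs once in every pool child. On Windows and other fork+exec platforms that child did not inherit the parent's logging state, so the values are rebuilt from environment variables exported by Logging.setup() (see celery/log.py below) and inherited through the child's environment. A sketch of the hand-off, with placeholder values for the level and path:

    import os

    # parent process, inside Logging.setup():
    os.environ.update(CELERY_LOG_LEVEL="10",                 # logging.DEBUG
                      CELERY_LOG_FILE="/var/log/celeryd.log",
                      CELERY_LOG_REDIRECT="1",
                      CELERY_LOG_REDIRECT_LEVEL="WARNING")

    # pool child, inside process_initializer() above:
    loglevel = int(os.environ.get("CELERY_LOG_LEVEL", 0))    # -> 10
    logfile = os.environ.get("CELERY_LOG_FILE") or None      # -> the path
    redirect = bool(os.environ.get("CELERY_LOG_REDIRECT"))   # -> True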

celery/concurrency/processes/forking.py | +22 -3

@@ -125,6 +125,28 @@ from celery.concurrency.processes.forking import main; main()"""
         current_process()._inheriting = True
         preparation_data = load(from_parent)
         _forking.prepare(preparation_data)
+
+        # Huge hack to make logging before Process.run work.
+        loglevel = os.environ.get("_MP_FORK_LOGLEVEL_")
+        logfile = os.environ.get("_MP_FORK_LOGFILE_") or None
+        format = os.environ.get("_MP_FORK_LOGFORMAT_")
+        if loglevel:
+            from multiprocessing import util
+            import logging
+            logger = util.get_logger()
+            logger.setLevel(int(loglevel))
+            if not logger.handlers:
+                logger._rudimentary_setup = True
+                logfile = logfile or sys.__stderr__
+                if hasattr(logfile, "write"):
+                    handler = logging.StreamHandler(logfile)
+                else:
+                    handler = logging.FileHandler(logfile)
+                formatter = logging.Formatter(
+                        format or util.DEFAULT_LOGGING_FORMAT)
+                handler.setFormatter(formatter)
+                logger.addHandler(handler)
+
         self = load(from_parent)
         current_process()._inheriting = False
 
@@ -163,6 +185,3 @@ from celery.concurrency.processes.forking import main; main()"""
     _forking.Popen = Popen
 else:
     from multiprocessing.forking import freeze_support
-
-
-
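
This code runs in the freshly exec'd child while it is still unpickling its Process object, before Process.run(), which is exactly where the "no handlers" warning came from: multiprocessing's own logger has nothing attached yet. The block above wires up a stop-gap handler from the _MP_FORK_* environment variables (exported in celery/log.py below) and marks the logger with _rudimentary_setup so Celery's real logging setup can still replace it later. A sketch of the state it works around, assuming a fresh Python 2 interpreter with nothing configured:

    from multiprocessing import util

    logger = util.get_logger()      # the "multiprocessing" logger
    assert logger.handlers == []    # fresh exec'd child: nothing attached, so a
                                    # record emitted here would trigger the warning

    # After the block above runs, the logger carries one StreamHandler (or a
    # FileHandler, if _MP_FORK_LOGFILE_ was set) plus the marker that
    # Logging._is_configured() in celery/log.py checks:
    #     logger._rudimentary_setup == True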

celery/log.py | +27 -2

@@ -3,6 +3,7 @@ from __future__ import absolute_import
 
 import logging
 import threading
+import os
 import sys
 import traceback
 
@@ -130,10 +131,31 @@ class Logging(object):
                 signals.after_setup_logger.send(sender=None, logger=logger,
                                         loglevel=loglevel, logfile=logfile,
                                         format=format, colorize=colorize)
+
+        # This is a hack for multiprocessing's fork+exec, so that
+        # logging before Process.run works.
+        os.environ.update(_MP_FORK_LOGLEVEL_=str(loglevel),
+                          _MP_FORK_LOGFILE_=logfile or "",
+                          _MP_FORK_LOGFORMAT_=format)
         Logging._setup = True
 
         return receivers
 
+    def setup(self, loglevel=None, logfile=None, redirect_stdouts=False,
+            redirect_level="WARNING"):
+        handled = self.setup_logging_subsystem(loglevel=loglevel,
+                                               logfile=logfile)
+        if not handled:
+            logger = self.get_default_logger()
+            if redirect_stdouts:
+                self.redirect_stdouts_to_logger(logger,
+                                loglevel=redirect_level)
+        os.environ.update(
+            CELERY_LOG_LEVEL=str(loglevel) if loglevel else "",
+            CELERY_LOG_FILE=str(logfile) if logfile else "",
+            CELERY_LOG_REDIRECT="1" if redirect_stdouts else "",
+            CELERY_LOG_REDIRECT_LEVEL=str(redirect_level))
+
     def _detect_handler(self, logfile=None):
         """Create log handler with either a filename, an open stream
         or :const:`None` (stderr)."""
@@ -216,10 +238,13 @@ class Logging(object):
             sys.stderr = proxy
         return proxy
 
+    def _is_configured(self, logger):
+        return logger.handlers and not getattr(
+                logger, "_rudimentary_setup", False)
+
     def _setup_logger(self, logger, logfile, format, colorize,
             formatter=ColorFormatter, **kwargs):
-
-        if logger.handlers:  # Logger already configured
+        if self._is_configured(logger):
             return logger
 
         handler = self._detect_handler(logfile)
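
_is_configured() is the counterpart to the forking.py stop-gap: a logger that only carries the rudimentary fork+exec handler is not treated as configured, so _setup_logger() still attaches Celery's real handler and formatter when setup_logging_subsystem() eventually runs in the child. A small standalone sketch of the check; the logger name and StreamHandler stand in for what the stop-gap installed:

    import logging

    logger = logging.getLogger("multiprocessing")
    logger.addHandler(logging.StreamHandler())    # the fork+exec stop-gap handler
    logger._rudimentary_setup = True              # ...and its marker

    def _is_configured(logger):
        return logger.handlers and not getattr(
                logger, "_rudimentary_setup", False)

    assert not _is_configured(logger)   # so the real handler still gets added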