Browse Source

close_open_fds uses os.closerange, get_fdmax uses sysconf

Ask Solem 11 years ago
parent
commit
b1e2744652
3 changed files with 78 additions and 8 deletions
  1. 2 2
      celery/beat.py
  2. 59 6
      celery/platforms.py
  3. 17 0
      celery/utils/log.py

+ 2 - 2
celery/beat.py

@@ -28,7 +28,7 @@ from .five import items, reraise, values, monotonic
 from .schedules import maybe_schedule, crontab
 from .utils.imports import instantiate
 from .utils.timeutils import humanize_seconds
-from .utils.log import get_logger
+from .utils.log import get_logger, iter_open_logger_fds
 
 __all__ = ['SchedulingError', 'ScheduleEntry', 'Scheduler',
            'PersistentScheduler', 'Service', 'EmbeddedService']
@@ -504,7 +504,7 @@ else:
             platforms.signals.reset('SIGTERM')
             platforms.close_open_fds([
                 sys.__stdin__, sys.__stdout__, sys.__stderr__,
-            ])
+            ] + list(iter_open_logger_fds()))
             self.service.start(embedded_process=True)
 
         def stop(self):

+ 59 - 6
celery/platforms.py

@@ -17,6 +17,9 @@ import platform as _platform
 import signal as _signal
 import sys
 
+from collections import namedtuple
+from itertools import izip_longest
+
 from billiard import current_process
 # fileno used to be in this module
 from kombu.utils import maybe_fileno
@@ -26,6 +29,7 @@ from contextlib import contextmanager
 
 from .local import try_import
 from .five import items, range, reraise, string_t
+from .utils.functional import uniq
 
 _setproctitle = try_import('setproctitle')
 resource = try_import('resource')
@@ -59,6 +63,39 @@ PIDFILE_MODE = ((os.R_OK | os.W_OK) << 6) | ((os.R_OK) << 3) | ((os.R_OK))
 PIDLOCKED = """ERROR: Pidfile ({0}) already exists.
 Seems we're already running? (pid: {1})"""
 
+_range = namedtuple('_range', ('start', 'stop'))
+
+
+def ranges(sequence, end):
+    """From a sequence of numbers return a list of ranges
+    avoiding those numbers.
+
+    Note: List must be sorted and uniq.
+
+    Useful for :func:`os.closerange` to provide a list
+    of file descriptors to keep:
+
+        >>> list(ranges([], end=2048))
+        _range(start=0, stop=2048)]
+
+        >>> list(ranges([1, 2, 3, 30, 1400], end=2048))
+        [_range(start=0, stop=1), _range(start=4, stop=30),
+         _range(start=31, stop=256)]
+
+        >>> list(ranges([1, 2, 3, 30, 1400], end=256))
+        [_range(start=0, stop=1), _range(start=4, stop=30),
+         _range(start=31, stop=1400), _range(start=1401, stop=2048)]
+
+    """
+    k, l = iter([-1] + sequence), iter(sequence)
+    for prev, stop in izip_longest(k, l):
+        prev = min(prev + 1, end)
+        if prev != stop:
+            if stop > end:
+                yield _range(prev, end)
+                break
+            yield _range(prev, end if stop is None else stop)
+
 
 def pyimplementation():
     """Return string identifying the current Python implementation."""
@@ -87,6 +124,10 @@ def get_fdmax(default=None):
                       descriptor limit.
 
     """
+    try:
+        return os.sysconf('SC_OPEN_MAX')
+    except:
+        pass
     if resource is None:  # Windows
         return default
     fdmax = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
@@ -232,12 +273,24 @@ def _create_pidlock(pidfile):
     return pidlock
 
 
-def close_open_fds(keep=None):
-    keep = [maybe_fileno(f) for f in keep if maybe_fileno(f)] if keep else []
-    for fd in reversed(range(get_fdmax(default=2048))):
-        if fd not in keep:
-            with ignore_errno(errno.EBADF):
-                os.close(fd)
+if hasattr(os, 'closerange'):
+
+    def close_open_fds(keep):
+        keep = [maybe_fileno(f)
+                for f in uniq(sorted((keep or [])))
+                if maybe_fileno(f) is not None]
+        for low, high in ranges(keep, get_fdmax(default=2048)):
+            os.closerange(low, high)
+
+else:
+
+    def close_open_fds(keep=None):  # noqa
+        keep = [maybe_fileno(f)
+                for f in (keep or []) if maybe_fileno(f) is not None]
+        for fd in reversed(range(get_fdmax(default=2048))):
+            if fd not in keep:
+                with ignore_errno(errno.EBADF):
+                    os.close(fd)
 
 
 class DaemonContext(object):

+ 17 - 0
celery/utils/log.py

@@ -16,6 +16,7 @@ import traceback
 
 from contextlib import contextmanager
 from billiard import current_process, util as mputil
+from kombu.five import values
 from kombu.log import get_logger as _get_logger, LOG_LEVELS
 from kombu.utils.encoding import safe_str
 
@@ -50,6 +51,22 @@ def set_in_sighandler(value):
     _in_sighandler = value
 
 
+def iter_open_logger_fds():
+    seen = set()
+    loggers = logging.Logger.manager.loggerDict
+    for logger in values(logging.Logger.manager.loggerDict):
+        try:
+            for handler in logger.handlers:
+                try:
+                    if handler not in seen:
+                        yield handler.stream
+                        seen.add(handler)
+                except AttributeError:
+                    pass
+        except AttributeError:  # PlaceHolder does not have handlers
+            pass
+
+
 @contextmanager
 def in_sighandler():
     set_in_sighandler(True)