Kaynağa Gözat

Now requires billiard 3.4b1

Ask Solem 10 yıl önce
ebeveyn
işleme
0a9fd7e214
2 değiştirilmiş dosya ile 7 ekleme ve 47 silme
  1. 5 1
      celery/concurrency/asynpool.py
  2. 2 46
      celery/platforms.py

+ 5 - 1
celery/concurrency/asynpool.py

@@ -179,7 +179,6 @@ def _select(readers=None, writers=None, err=None, timeout=0,
 
 class Worker(_pool.Worker):
     """Pool worker process."""
-    dead = False
 
     def on_loop_start(self, pid):
         # our version sends a WORKER_UP message when the process is ready
@@ -351,6 +350,11 @@ class AsynPool(_pool.Pool):
     ResultHandler = ResultHandler
     Worker = Worker
 
+    def WorkerProcess(self, worker):
+        worker = super(AsynPool, self).WorkerProcess(worker)
+        worker.dead = False
+        return worker
+
     def __init__(self, processes=None, synack=False,
                  sched_strategy=None, *args, **kwargs):
         self.sched_strategy = SCHED_STRATEGIES.get(sched_strategy,

+ 2 - 46
celery/platforms.py

@@ -25,14 +25,14 @@ try:
     from billiard.process import current_process
 except ImportError:
     current_process = None
+from billiard.compat import get_fdmax, close_open_fds
 # fileno used to be in this module
 from kombu.utils import maybe_fileno
 from kombu.utils.encoding import safe_str
 from contextlib import contextmanager
 
 from .local import try_import
-from .five import items, range, reraise, string_t, zip_longest
-from .utils.functional import uniq
+from .five import items, reraise, string_t
 
 _setproctitle = try_import('setproctitle')
 resource = try_import('resource')
@@ -110,26 +110,6 @@ class LockFailed(Exception):
     """Raised if a pidlock can't be acquired."""
 
 
-def get_fdmax(default=None):
-    """Return the maximum number of open file descriptors
-    on this system.
-
-    :keyword default: Value returned if there's no file
-                      descriptor limit.
-
-    """
-    try:
-        return os.sysconf('SC_OPEN_MAX')
-    except:
-        pass
-    if resource is None:  # Windows
-        return default
-    fdmax = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
-    if fdmax == resource.RLIM_INFINITY:
-        return default
-    return fdmax
-
-
 class Pidfile(object):
     """Pidfile
 
@@ -268,30 +248,6 @@ def _create_pidlock(pidfile):
     return pidlock
 
 
-if hasattr(os, 'closerange'):
-
-    def close_open_fds(keep=None):
-        # must make sure this is 0-inclusive (Issue #1882)
-        keep = list(uniq(sorted(
-            f for f in map(maybe_fileno, keep or []) if f is not None
-        )))
-        maxfd = get_fdmax(default=2048)
-        kL, kH = iter([-1] + keep), iter(keep + [maxfd])
-        for low, high in zip_longest(kL, kH):
-            if low + 1 != high:
-                os.closerange(low + 1, high)
-
-else:
-
-    def close_open_fds(keep=None):  # noqa
-        keep = [maybe_fileno(f)
-                for f in (keep or []) if maybe_fileno(f) is not None]
-        for fd in reversed(range(get_fdmax(default=2048))):
-            if fd not in keep:
-                with ignore_errno(errno.EBADF):
-                    os.close(fd)
-
-
 class DaemonContext(object):
     _is_open = False