ソースを参照

Pool: select api is not the same as poll (Issue #2430)

Ask Solem 10 年 前
コミット
3dd71214ea
1 ファイル変更37 行追加26 行削除
  1. 37 26
      celery/concurrency/asynpool.py

+ 37 - 26
celery/concurrency/asynpool.py

@@ -110,10 +110,43 @@ def _get_job_writer(job):
         return writer()  # is a weakref
 
 
+if hasattr(select, 'poll', None):
+    def _select_imp(readers=None, writers=None, err=None, timeout=0,
+                    poll=select.poll, POLLIN=select.POLLIN,
+                    POLLOUT=select.POLLOUT, POLLERR=select.POLLERR):
+        poller = poll()
+        register = poller.register
+
+        if readers:
+            [register(fd, POLLIN) for fd in readers]
+        if writers:
+            [register(fd, POLLOUT) for fd in writers]
+        if err:
+            [register(fd, POLLERR) for fd in err]
+
+        R, W = set(), set()
+        timeout = 0 if timeout and timeout < 0 else round(timeout * 1e3)
+        events = poller.poll(timeout)
+        for fd, event in events:
+            if not isinstance(fd, Integral):
+                fd = fd.fileno()
+            if event & POLLIN:
+                R.add(fd)
+            if event & POLLOUT:
+                W.add(fd)
+            if event & POLLERR:
+                R.add(fd)
+        return R, W, 0
+else:
+    def _select_imp(readers=None, writers=None, err=None, timeout=0):
+        r, w, e = select.select(readers, writers, err, timeout)
+        if e:
+            r = list(set(r) | set(e))
+        return r, w, 0
+
+
 def _select(readers=None, writers=None, err=None, timeout=0,
-            poll=getattr(select, 'poll', select.select),
-            POLLIN=select.POLLIN, POLLOUT=select.POLLOUT,
-            POLLERR=select.POLLERR):
+            _select_imp=_select_imp):
     """Simple wrapper to :class:`~select.select`, using :`~select.poll`
     as the implementation.
 
@@ -136,30 +169,8 @@ def _select(readers=None, writers=None, err=None, timeout=0,
     readers = set() if readers is None else readers
     writers = set() if writers is None else writers
     err = set() if err is None else err
-    poller = poll()
-    register = poller.register
-
-    if readers:
-        [register(fd, POLLIN) for fd in readers]
-    if writers:
-        [register(fd, POLLOUT) for fd in writers]
-    if err:
-        [register(fd, POLLERR) for fd in err]
-
-    R, W = set(), set()
-    timeout = 0 if timeout and timeout < 0 else round(timeout * 1e3)
     try:
-        events = poller.poll(timeout)
-        for fd, event in events:
-            if not isinstance(fd, Integral):
-                fd = fd.fileno()
-            if event & POLLIN:
-                R.add(fd)
-            if event & POLLOUT:
-                W.add(fd)
-            if event & POLLERR:
-                R.add(fd)
-        return R, W, 0
+        return _select_imp(readers, writers, err, timeout)
     except (select.error, socket.error) as exc:
         if exc.errno == errno.EINTR:
             return set(), set(), 1