Browse Source

Use _multiprocessing.read + memoryview if available

Ask Solem 11 years ago
parent
commit
3b0e47a5b9
1 changed files with 56 additions and 24 deletions
  1. 56 24
      celery/concurrency/asynpool.py

+ 56 - 24
celery/concurrency/asynpool.py

@@ -48,6 +48,25 @@ from celery.utils.log import get_logger
 from celery.utils.text import truncate
 from celery.worker import state as worker_state
 
+try:
+    from _billiard import read as __read__
+    from struct import unpack_from as _unpack_from
+    memoryview = memoryview
+    readcanbuf = True
+except (ImportError, NameError):  # pragma: no cover
+
+    def __read__(fd, buf, size, read=os.read):  # noqa
+        chunk = read(fd, size)
+        n = len(chunk)
+        if n != 0:
+            buf.write(chunk)
+        return n
+    readcanbuf = False  # noqa
+
+    def _unpack_from(fmt, buf, unpack=struct.unpack):  # noqa
+        return unpack(fmt, buf.getvalue())
+
+
 logger = get_logger(__name__)
 error, debug = logger.error, logger.debug
 
@@ -152,48 +171,61 @@ class ResultHandler(_pool.ResultHandler):
         self.state_handlers[WORKER_UP] = self.on_process_alive
 
     def _recv_message(self, add_reader, fd, callback,
-                      read=os.read, unpack=struct.unpack,
-                      loads=_pickle.loads, BytesIO=BytesIO):
-        buf = BytesIO()
+                      __read__=__read__, readcanbuf=readcanbuf,
+                      BytesIO=BytesIO, unpack_from=_unpack_from,
+                      load=_pickle.load):
+        Hr = Br = 0
+        if readcanbuf:
+            buf = bytearray(4)
+            bufv = memoryview(buf)
+        else:
+            buf = bufv = BytesIO()
         # header
-        remaining = 4
-        bsize = None
         assert not isblocking(fd)
-        while remaining > 0:
+
+        while Hr < 4:
             try:
-                bsize = read(fd, remaining)
+                n = __read__(
+                    fd, bufv[Hr:] if readcanbuf else bufv, 4 - Hr,
+                )
             except OSError as exc:
                 if get_errno(exc) not in UNAVAIL:
                     raise
                 yield
             else:
-                n = len(bsize)
                 if n == 0:
-                    if remaining == 4:
-                        raise EOFError()
-                    else:
-                        raise OSError("Got end of file during message")
-                remaining -= n
+                    raise (OSError('End of file during message') if Hr
+                           else EOFError())
+                Hr += n
+
+        body_size, = unpack_from('>i', bufv)
+        if readcanbuf:
+            buf = bytearray(body_size)
+            bufv = memoryview(buf)
+        else:
+            buf = bufv = BytesIO()
+
 
-        remaining, = size, = unpack('>i', bsize)
-        while remaining > 0:
+        while Br < body_size:
             try:
-                chunk = read(fd, remaining)
+                n = __read__(
+                    fd, bufv[Br:] if readcanbuf else bufv, body_size - Br,
+                )
             except OSError as exc:
                 if get_errno(exc) not in UNAVAIL:
                     raise
                 yield
             else:
-                n = len(chunk)
                 if n == 0:
-                    if remaining == size:
-                        raise EOFError()
-                    else:
-                        raise IOError('Got end of file during message')
-                buf.write(chunk)
-                remaining -= n
+                    raise (OSError('End of file during message') if Br
+                           else EOFError())
+                Br += n
         add_reader(fd, self.handle_event, fd)
-        message = loads(buf.getvalue())
+        if readcanbuf:
+            message = load(BytesIO(bufv))
+        else:
+            bufv.seek(0)
+            message = load(bufv)
         if message:
             callback(message)