Browse Source

unpack_from only supports memoryview in 2.7.6. Closes #1637

Ask Solem 11 years ago
parent
commit
574f6a545a
1 changed files with 14 additions and 3 deletions
  1. 14 3
      celery/concurrency/asynpool.py

+ 14 - 3
celery/concurrency/asynpool.py

@@ -24,6 +24,7 @@ import random
 import select
 import socket
 import struct
+import sys
 import time
 
 from collections import deque, namedtuple
@@ -53,7 +54,17 @@ try:
     from struct import unpack_from as _unpack_from
     memoryview = memoryview
     readcanbuf = True
+
+    if sys.version_info[0] == 2 and sys.version_info < (2, 7, 6):
+
+        def unpack_from(fmt, view, _unpack_from=_unpack_from):  # noqa
+            return _unpack_from(fmt, view.tobytes())  # <- memoryview
+    else:
+        # unpack_from supports memoryview in 2.7.6 and 3.3+
+        unpack_from = _unpack_from  # noqa
+
 except (ImportError, NameError):  # pragma: no cover
+    raise
 
     def __read__(fd, buf, size, read=os.read):  # noqa
         chunk = read(fd, size)
@@ -63,8 +74,8 @@ except (ImportError, NameError):  # pragma: no cover
         return n
     readcanbuf = False  # noqa
 
-    def _unpack_from(fmt, buf, unpack=struct.unpack):  # noqa
-        return unpack(fmt, buf.getvalue())
+    def unpack_from(fmt, iobuf, unpack=struct.unpack):  # noqa
+        return unpack(fmt, iobuf.getvalue())  # <-- BytesIO
 
 
 logger = get_logger(__name__)
@@ -173,7 +184,7 @@ class ResultHandler(_pool.ResultHandler):
 
     def _recv_message(self, add_reader, fd, callback,
                       __read__=__read__, readcanbuf=readcanbuf,
-                      BytesIO=BytesIO, unpack_from=_unpack_from,
+                      BytesIO=BytesIO, unpack_from=unpack_from,
                       load=_pickle.load):
         Hr = Br = 0
         if readcanbuf: