|
@@ -41,7 +41,6 @@ from billiard.queues import _SimpleQueue
|
|
|
from kombu.async import READ, WRITE, ERR
|
|
|
from kombu.serialization import pickle as _pickle
|
|
|
from kombu.utils import fxrange
|
|
|
-from kombu.utils.compat import get_errno
|
|
|
from kombu.utils.eventio import SELECT_BAD_FD
|
|
|
from celery.five import Counter, items, values
|
|
|
from celery.utils.log import get_logger
|
|
@@ -139,14 +138,14 @@ def _select(readers=None, writers=None, err=None, timeout=0):
|
|
|
r = list(set(r) | set(e))
|
|
|
return r, w, 0
|
|
|
except (select.error, socket.error) as exc:
|
|
|
- if get_errno(exc) == errno.EINTR:
|
|
|
+ if exc.errno == errno.EINTR:
|
|
|
return [], [], 1
|
|
|
- elif get_errno(exc) in SELECT_BAD_FD:
|
|
|
+ elif exc.errno in SELECT_BAD_FD:
|
|
|
for fd in readers | writers | err:
|
|
|
try:
|
|
|
select.select([fd], [], [], 0)
|
|
|
except (select.error, socket.error) as exc:
|
|
|
- if get_errno(exc) not in SELECT_BAD_FD:
|
|
|
+ if exc.errno not in SELECT_BAD_FD:
|
|
|
raise
|
|
|
readers.discard(fd)
|
|
|
writers.discard(fd)
|
|
@@ -196,7 +195,7 @@ class ResultHandler(_pool.ResultHandler):
|
|
|
fd, bufv[Hr:] if readcanbuf else bufv, 4 - Hr,
|
|
|
)
|
|
|
except OSError as exc:
|
|
|
- if get_errno(exc) not in UNAVAIL:
|
|
|
+ if exc.errno not in UNAVAIL:
|
|
|
raise
|
|
|
yield
|
|
|
else:
|
|
@@ -218,7 +217,7 @@ class ResultHandler(_pool.ResultHandler):
|
|
|
fd, bufv[Br:] if readcanbuf else bufv, body_size - Br,
|
|
|
)
|
|
|
except OSError as exc:
|
|
|
- if get_errno(exc) not in UNAVAIL:
|
|
|
+ if exc.errno not in UNAVAIL:
|
|
|
raise
|
|
|
yield
|
|
|
else:
|
|
@@ -722,7 +721,7 @@ class AsynPool(_pool.Pool):
|
|
|
except StopIteration:
|
|
|
pass
|
|
|
except OSError as exc:
|
|
|
- if get_errno(exc) != errno.EBADF:
|
|
|
+ if exc.errno != errno.EBADF:
|
|
|
raise
|
|
|
else:
|
|
|
add_writer(ready_fd, cor)
|
|
@@ -765,7 +764,7 @@ class AsynPool(_pool.Pool):
|
|
|
try:
|
|
|
Hw += send(header, Hw)
|
|
|
except Exception as exc:
|
|
|
- if get_errno(exc) not in UNAVAIL:
|
|
|
+ if getattr(exc, 'errno', None) not in UNAVAIL:
|
|
|
raise
|
|
|
# suspend until more data
|
|
|
errors += 1
|
|
@@ -781,7 +780,7 @@ class AsynPool(_pool.Pool):
|
|
|
try:
|
|
|
Bw += send(body, Bw)
|
|
|
except Exception as exc:
|
|
|
- if get_errno(exc) not in UNAVAIL:
|
|
|
+ if getattr(exc, 'errno', None) not in UNAVAIL:
|
|
|
raise
|
|
|
# suspend until more data
|
|
|
errors += 1
|
|
@@ -830,7 +829,7 @@ class AsynPool(_pool.Pool):
|
|
|
try:
|
|
|
Hw += send(header, Hw)
|
|
|
except Exception as exc:
|
|
|
- if get_errno(exc) not in UNAVAIL:
|
|
|
+ if getattr(exc, 'errno', None) not in UNAVAIL:
|
|
|
raise
|
|
|
yield
|
|
|
|
|
@@ -839,7 +838,7 @@ class AsynPool(_pool.Pool):
|
|
|
try:
|
|
|
Bw += send(body, Bw)
|
|
|
except Exception as exc:
|
|
|
- if get_errno(exc) not in UNAVAIL:
|
|
|
+ if getattr(exc, 'errno', None) not in UNAVAIL:
|
|
|
raise
|
|
|
# suspend until more data
|
|
|
yield
|
|
@@ -1041,7 +1040,7 @@ class AsynPool(_pool.Pool):
|
|
|
try:
|
|
|
proc.inq.put(None)
|
|
|
except OSError as exc:
|
|
|
- if get_errno(exc) != errno.EBADF:
|
|
|
+ if exc.errno != errno.EBADF:
|
|
|
raise
|
|
|
|
|
|
def create_result_handler(self):
|
|
@@ -1092,14 +1091,12 @@ class AsynPool(_pool.Pool):
|
|
|
try:
|
|
|
task = resq.recv()
|
|
|
except (OSError, IOError, EOFError) as exc:
|
|
|
- if get_errno(exc) == errno.EINTR:
|
|
|
+ _errno = getattr(exc, 'errno', None)
|
|
|
+ if _errno == errno.EINTR:
|
|
|
continue
|
|
|
- elif get_errno(exc) == errno.EAGAIN:
|
|
|
+ elif _errno == errno.EAGAIN:
|
|
|
break
|
|
|
- else:
|
|
|
- debug('got %r while flushing process %r',
|
|
|
- exc, proc, exc_info=1)
|
|
|
- if get_errno(exc) not in UNAVAIL:
|
|
|
+ elif _errno not in UNAVAIL:
|
|
|
debug('got %r while flushing process %r',
|
|
|
exc, proc, exc_info=1)
|
|
|
break
|