|
@@ -31,7 +31,7 @@ from collections import deque, namedtuple
|
|
|
from io import BytesIO
|
|
|
from pickle import HIGHEST_PROTOCOL
|
|
|
from time import sleep
|
|
|
-from weakref import WeakSet, WeakValueDictionary, ref
|
|
|
+from weakref import WeakValueDictionary, ref
|
|
|
|
|
|
from amqp.utils import promise
|
|
|
from billiard.pool import RUN, TERMINATE, ACK, NACK, WorkersJoined
|
|
@@ -78,13 +78,7 @@ except (ImportError, NameError): # pragma: no cover
|
|
|
|
|
|
|
|
|
logger = get_logger(__name__)
|
|
|
-crit, error, warn, info, debug = (
|
|
|
- logger.critical, logger.error, logger.warn, logger.info, logger.debug,
|
|
|
-)
|
|
|
-
|
|
|
-DD_ALERT_CHANGE = """\
|
|
|
-defcon {level}: tasks not accepted since {s}s: {tasks}\
|
|
|
-"""
|
|
|
+error, debug = logger.error, logger.debug
|
|
|
|
|
|
UNAVAIL = frozenset([errno.EAGAIN, errno.EINTR])
|
|
|
|
|
@@ -104,11 +98,7 @@ Ack = namedtuple('Ack', ('id', 'fd', 'payload'))
|
|
|
|
|
|
def gen_not_started(gen):
|
|
|
|
|
|
- return gen.gi_frame is not None and gen.gi_frame.f_lasti == -1
|
|
|
-
|
|
|
-
|
|
|
-def gen_stopped(gen):
|
|
|
- return gen.gi_frame is None
|
|
|
+ return gen.gi_frame and gen.gi_frame.f_lasti == -1
|
|
|
|
|
|
|
|
|
def _get_job_writer(job):
|
|
@@ -165,51 +155,6 @@ def _select(readers=None, writers=None, err=None, timeout=0):
|
|
|
raise
|
|
|
|
|
|
|
|
|
-class deadlock_detection(object):
|
|
|
-
|
|
|
- def __init__(self, pool, interval=5.0):
|
|
|
- self.pool = pool
|
|
|
- self.interval = interval
|
|
|
-
|
|
|
- def register_with_event_loop(self, hub):
|
|
|
- hub.call_repeatedly((self.interval * 1.25) * 5.0, self.checkpoint, hub)
|
|
|
-
|
|
|
- def is_waiting(self, job):
|
|
|
- if not (job._accepted or job.ready()):
|
|
|
- writer = _get_job_writer(job)
|
|
|
- if not writer or gen_stopped(writer):
|
|
|
- return True
|
|
|
- return False
|
|
|
-
|
|
|
- def _waiting_for_ack(self, jobs):
|
|
|
- is_waiting = self.is_waiting
|
|
|
- return (job for job in jobs if is_waiting(job))
|
|
|
-
|
|
|
- def alert(self, severity, level, jobs):
|
|
|
- severity(DD_ALERT_CHANGE.format(
|
|
|
- level=level, s=(5 - level) * self.interval,
|
|
|
- jobs=', '.join(job.correlation_id for job in jobs),
|
|
|
- ))
|
|
|
-
|
|
|
- def checkpoint(self, hub):
|
|
|
- self.defcon(5, hub, values(self.pool._cache))
|
|
|
-
|
|
|
- def defcon(self, level, hub, jobs):
|
|
|
- jobs = WeakSet(self._waiting_for_ack(jobs))
|
|
|
- if jobs:
|
|
|
- if level <= 1:
|
|
|
- self.alert(crit, level, jobs)
|
|
|
- raise MemoryError(
|
|
|
- 'Possible deadlock detected, contact support.')
|
|
|
- elif level <= 3:
|
|
|
- self.alert(warn, level, jobs)
|
|
|
- elif level <= 4:
|
|
|
- self.alert(info, level, jobs)
|
|
|
- else:
|
|
|
- self.alert(debug, level, jobs)
|
|
|
- hub.call_later(self.interval, self.defcon, level - 1, jobs)
|
|
|
-
|
|
|
-
|
|
|
class Worker(_pool.Worker):
|
|
|
"""Pool worker process."""
|
|
|
dead = False
|
|
@@ -455,9 +400,6 @@ class AsynPool(_pool.Pool):
|
|
|
self._create_process_handlers(hub)
|
|
|
self._create_write_handlers(hub)
|
|
|
|
|
|
- dd = deadlock_detection(self)
|
|
|
- dd.register_with_event_loop(hub)
|
|
|
-
|
|
|
|
|
|
[hub.add_reader(fd, self._event_process_exit, hub, fd)
|
|
|
for fd in self.process_sentinels]
|