@@ -73,6 +73,15 @@ except (ImportError, NameError):  # pragma: no cover
     def unpack_from(fmt, iobuf, unpack=struct.unpack):  # noqa
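+        """Unpack a BytesIO buffer in one call.
+
+        Example (illustrative):
+
+            >>> from io import BytesIO
+            >>> unpack_from('>H', BytesIO(bytes([0, 5])))
+            (5,)
+        """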
         return unpack(fmt, iobuf.getvalue())  # <-- BytesIO
 
+__all__ = ['AsynPool']
 
 logger = get_logger(__name__)
 error, debug = logger.error, logger.debug
@@ -100,6 +109,18 @@ Ack = namedtuple('Ack', ('id', 'fd', 'payload'))
 
 
 def gen_not_started(gen):
+    """Return true if generator is not started.
     # gi_frame is None when generator stopped.
     return gen.gi_frame and gen.gi_frame.f_lasti == -1
 
@@ -150,8 +171,17 @@ else:
 
 def _select(readers=None, writers=None, err=None, timeout=0,
             poll=_select_imp):
-    """Simple wrapper to :class:`~select.select`, using :`~select.poll`
-    as the implementation.
+    """Simple wrapper to :class:`~select.select`, using :class:`~select.poll`.
 
     Arguments:
         readers (Set[Fd]): Set of reader fds to test if readable.
@@ -271,8 +301,7 @@ class ResultHandler(_pool.ResultHandler):
             callback(message)
 
     def _make_process_result(self, hub):
-        """Coroutine that reads messages from the pool processes
-        and calls the appropriate handler."""
+        """Coroutine reading messages from the pool processes."""
         fileno_to_outq = self.fileno_to_outq
         on_state_change = self.on_state_change
         add_reader = hub.add_reader
@@ -302,8 +331,7 @@ class ResultHandler(_pool.ResultHandler):
             raise RuntimeError('Not registered with event loop')
 
     def on_stop_not_started(self):
-        """This method is always used to stop when the helper thread is not
-        started."""
+        # This is always used, since we do not start any threads.
         cache = self.cache
         check_timeouts = self.check_timeouts
         fileno_to_outq = self.fileno_to_outq
@@ -362,7 +390,14 @@ class ResultHandler(_pool.ResultHandler):
 
 
 class AsynPool(_pool.Pool):
-    """Pool version that uses AIO instead of helper threads."""
+    """AsyncIO Pool (no threads).
+
     ResultHandler = ResultHandler
     Worker = Worker
 
@@ -456,7 +491,7 @@ class AsynPool(_pool.Pool):
             os.close(fd)
 
     def register_with_event_loop(self, hub):
-        """Registers the async pool with the current event loop."""
+        """Register the async pool with the current event loop."""
         self._result_handler.register_with_event_loop(hub)
         self.handle_result_event = self._result_handler.handle_event
         self._create_timelimit_handlers(hub)
@@ -478,8 +513,7 @@ class AsynPool(_pool.Pool):
         hub.on_tick.add(self.on_poll_start)
 
     def _create_timelimit_handlers(self, hub, now=time.time):
-        """For async pool this sets up the handlers used
-        to implement time limits."""
+        """Create handlers used to implement time limits."""
         call_later = hub.call_later
         trefs = self._tref_for_id = WeakValueDictionary()
 
@@ -540,8 +574,7 @@ class AsynPool(_pool.Pool):
             self._mark_worker_as_available(inqW_fd)
 
     def _create_process_handlers(self, hub, READ=READ, ERR=ERR):
-        """For async pool this will create the handlers called
-        when a process is up/down and etc."""
+        """Create handlers called on process up/down, etc."""
         add_reader, remove_reader, remove_writer = (
             hub.add_reader, hub.remove_reader, hub.remove_writer,
         )
@@ -649,8 +682,7 @@ class AsynPool(_pool.Pool):
     def _create_write_handlers(self, hub,
                                pack=struct.pack, dumps=_pickle.dumps,
                                protocol=HIGHEST_PROTOCOL):
-        """For async pool this creates the handlers used to write data to
-        child processes."""
+        """Create handlers used to write data to child processes."""
         fileno_to_inq = self._fileno_to_inq
         fileno_to_synq = self._fileno_to_synq
         outbound = self.outbound_buffer
@@ -1028,8 +1060,12 @@ class AsynPool(_pool.Pool):
             pass
 
     def create_process_queues(self):
-        """Creates new in, out (and optionally syn) queues,
-        returned as a tuple."""
+        """Create new in, out, etc. queues, returned as a tuple.
         # NOTE: Pipes must be set O_NONBLOCK at creation time (the original
         # fd), otherwise it won't be possible to change the flags until
         # there's an actual reader/writer on the other side.
@@ -1047,8 +1083,7 @@ class AsynPool(_pool.Pool):
         return inq, outq, synq
 
     def on_process_alive(self, pid):
-        """Handler called when the :const:`WORKER_UP` message is received
-        from a child process.
+        """Called when receiving the :const:`WORKER_UP` message.
 
         Marks the process as ready to receive work.
         """
@@ -1064,8 +1099,7 @@ class AsynPool(_pool.Pool):
         self._all_inqueues.add(proc.inqW_fd)
 
     def on_job_process_down(self, job, pid_gone):
-        """Handler called for each job when the process it was assigned to
-        exits."""
+        """Called for each job when the process assigned to it exits."""
         if job._write_to and not job._write_to._is_alive():
             # job was partially written
             self.on_partial_read(job, job._write_to)
@@ -1075,9 +1109,12 @@ class AsynPool(_pool.Pool):
             self._put_back(job)
 
     def on_job_process_lost(self, job, pid, exitcode):
-        """Handler called for each *started* job when the process it
+        """Called when the process executing a job exits.
+
+        This happens when the process the job
         was assigned to exited by mysterious means (error exitcodes and
-        signals)"""
+        signals).
+        """
         self.mark_as_worker_lost(job, exitcode)
 
     def human_write_stats(self):
@@ -1102,8 +1139,7 @@ class AsynPool(_pool.Pool):
         }
 
     def _process_cleanup_queues(self, proc):
-        """Handler called to clean up a processes queues after process
-        exit."""
+        """Called to clean up queues after process exit."""
        if not proc.dead:
            try:
                self._queues[self._find_worker_queues(proc)] = None
@@ -1132,8 +1168,7 @@ class AsynPool(_pool.Pool):
         )
 
     def _process_register_queues(self, proc, queues):
-        """Marks new ownership for ``queues`` so that the fileno indices are
-        updated."""
+        """Mark new ownership for ``queues`` to update fileno indices."""
         assert queues in self._queues
         b = len(self._queues)
         self._queues[queues] = proc
@@ -1157,7 +1192,9 @@ class AsynPool(_pool.Pool):
         self._quick_put = self._quick_get = self._poll_result = None
 
     def process_flush_queues(self, proc):
-        """Flushes all queues, including the outbound buffer, so that
+        """Flush all queues.
+
+        This includes the outbound buffer, so that
         all tasks that haven't been started will be discarded.
 
         In Celery this is called whenever the transport connection is lost
@@ -1191,8 +1228,7 @@ class AsynPool(_pool.Pool):
                 break
 
     def on_partial_read(self, job, proc):
-        """Called when a job was only partially written to a child process
-        and it exited."""
+        """Called when a job was partially written to a child that exited."""
         # worker terminated by signal:
         # we cannot reuse the sockets again, because we don't know if
         # the process wrote/read anything frmo them, and if so we cannot
@@ -1218,8 +1254,10 @@ class AsynPool(_pool.Pool):
         assert len(self._queues) == before
 
     def destroy_queues(self, queues, proc):
-        """Destroy queues that can no longer be used, so that they
-        be replaced by new sockets."""
+        """Destroy queues that can no longer be used.
+
+        This way they can be replaced by new usable sockets.
+        """
         assert not proc._is_alive()
         self._waiting_to_start.discard(proc)
         removed = 1