12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211 |
- # -*- coding: utf-8 -*-
- """
- celery.concurrency.asynpool
- ~~~~~~~~~~~~~~~~~~~~~~~~~~~
- .. note::
- This module will be moved soon, so don't use it directly.
- Non-blocking version of :class:`multiprocessing.Pool`.
- This code deals with three major challenges:
- 1) Starting up child processes and keeping them running.
- 2) Sending jobs to the processes and receiving results back.
- 3) Safely shutting down this system.
- """
- from __future__ import absolute_import
- import errno
- import os
- import random
- import select
- import socket
- import struct
- import sys
- import time
- from collections import deque, namedtuple
- from io import BytesIO
- from pickle import HIGHEST_PROTOCOL
- from time import sleep
- from weakref import WeakValueDictionary, ref
- from amqp.utils import promise
- from billiard.pool import RUN, TERMINATE, ACK, NACK, WorkersJoined
- from billiard import pool as _pool
- from billiard.compat import buf_t, setblocking, isblocking
- from billiard.einfo import ExceptionInfo
- from billiard.queues import _SimpleQueue
- from kombu.async import READ, WRITE, ERR
- from kombu.serialization import pickle as _pickle
- from kombu.utils import fxrange
- from kombu.utils.compat import get_errno
- from kombu.utils.eventio import SELECT_BAD_FD
- from celery.five import Counter, items, values
- from celery.utils.log import get_logger
- from celery.utils.text import truncate
- from celery.worker import state as worker_state
try:
    # Prefer the C-accelerated reader from billiard: it can read
    # directly into a caller-supplied buffer (used with memoryview
    # slices in ResultHandler._recv_message).
    from _billiard import read as __read__
    from struct import unpack_from as _unpack_from
    memoryview = memoryview
    readcanbuf = True

    if sys.version_info[0] == 2 and sys.version_info < (2, 7, 6):

        def unpack_from(fmt, view, _unpack_from=_unpack_from):  # noqa
            # old Python 2.7.x cannot unpack from a memoryview directly,
            # so copy it out to bytes first.
            return _unpack_from(fmt, view.tobytes())  # <- memoryview
    else:
        # unpack_from supports memoryview in 2.7.6 and 3.3+
        unpack_from = _unpack_from  # noqa

except (ImportError, NameError):  # pragma: no cover

    def __read__(fd, buf, size, read=os.read):  # noqa
        # Pure-Python fallback: read up to ``size`` bytes from ``fd``
        # and append them to the BytesIO ``buf``.
        # Returns the number of bytes read (0 on EOF).
        chunk = read(fd, size)
        n = len(chunk)
        if n != 0:
            buf.write(chunk)
        return n
    readcanbuf = False  # noqa

    def unpack_from(fmt, iobuf, unpack=struct.unpack):  # noqa
        return unpack(fmt, iobuf.getvalue())  # <-- BytesIO
logger = get_logger(__name__)
error, debug = logger.error, logger.debug

#: Errno values treated as "try again later" rather than real errors.
UNAVAIL = frozenset([errno.EAGAIN, errno.EINTR])

#: Constant sent by child process when started (ready to accept work)
WORKER_UP = 15

#: A process must have started before this timeout (in secs.) expires.
PROC_ALIVE_TIMEOUT = 4.0

SCHED_STRATEGY_PREFETCH = 1
SCHED_STRATEGY_FAIR = 4

#: Maps scheduling-strategy option values to the constants above.
SCHED_STRATEGIES = {
    None: SCHED_STRATEGY_PREFETCH,
    'fair': SCHED_STRATEGY_FAIR,
}

#: Max length of a repr() of a task result kept for display.
RESULT_MAXLEN = 128

#: Pending synack response: (job id, inqueue fd, precalculated payload).
Ack = namedtuple('Ack', ('id', 'fd', 'payload'))
def gen_not_started(gen):
    """Return true if the generator *gen* has not started executing yet.

    ``gi_frame`` is :const:`None` once a generator has finished, and
    ``f_lasti`` stays ``-1`` until the first instruction runs.
    """
    frame = gen.gi_frame
    return frame and frame.f_lasti == -1
- def _get_job_writer(job):
- try:
- writer = job._writer
- except AttributeError:
- pass
- else:
- return writer() # is a weakref
- def _select(readers=None, writers=None, err=None, timeout=0):
- """Simple wrapper to :class:`~select.select`.
- :param readers: Set of reader fds to test if readable.
- :param writers: Set of writer fds to test if writable.
- :param err: Set of fds to test for error condition.
- All fd sets passed must be mutable as this function
- will remove non-working fds from them, this also means
- the caller must make sure there are still fds in the sets
- before calling us again.
- :returns: tuple of ``(readable, writable, again)``, where
- ``readable`` is a set of fds that have data available for read,
- ``writable`` is a set of fds that is ready to be written to
- and ``again`` is a flag that if set means the caller must
- throw away the result and call us again.
- """
- readers = set() if readers is None else readers
- writers = set() if writers is None else writers
- err = set() if err is None else err
- try:
- r, w, e = select.select(readers, writers, err, timeout)
- if e:
- r = list(set(r) | set(e))
- return r, w, 0
- except (select.error, socket.error) as exc:
- if get_errno(exc) == errno.EINTR:
- return [], [], 1
- elif get_errno(exc) in SELECT_BAD_FD:
- for fd in readers | writers | err:
- try:
- select.select([fd], [], [], 0)
- except (select.error, socket.error) as exc:
- if get_errno(exc) not in SELECT_BAD_FD:
- raise
- readers.discard(fd)
- writers.discard(fd)
- err.discard(fd)
- return [], [], 1
- else:
- raise
class Worker(_pool.Worker):
    """Pool worker process."""

    dead = False

    def on_loop_start(self, pid):
        # Announce readiness: send a WORKER_UP message as soon as the
        # work loop starts so the parent learns that this process can
        # accept jobs (i.e. its inqueue fd is now safe to write to).
        self.outq.put((WORKER_UP, (pid, )))

    def prepare_result(self, result, RESULT_MAXLEN=RESULT_MAXLEN):
        # exception infos pass through untouched; everything else is
        # shortened to a displayable repr.
        if isinstance(result, ExceptionInfo):
            return result
        return truncate(repr(result), RESULT_MAXLEN)
class ResultHandler(_pool.ResultHandler):
    """Handles messages from the pool processes."""

    def __init__(self, *args, **kwargs):
        self.fileno_to_outq = kwargs.pop('fileno_to_outq')
        self.on_process_alive = kwargs.pop('on_process_alive')
        super(ResultHandler, self).__init__(*args, **kwargs)
        # add our custom message handler
        self.state_handlers[WORKER_UP] = self.on_process_alive

    def _recv_message(self, add_reader, fd, callback,
                      __read__=__read__, readcanbuf=readcanbuf,
                      BytesIO=BytesIO, unpack_from=unpack_from,
                      load=_pickle.load):
        # Coroutine reading one complete message (4-byte size header
        # followed by a pickled body) from a non-blocking ``fd``.
        # It yields whenever a read would block, so the event loop can
        # resume it when the fd is readable again.
        Hr = Br = 0  # bytes of header/body read so far
        if readcanbuf:
            # C reader available: read into a preallocated bytearray
            # through memoryview slices (no intermediate copies).
            buf = bytearray(4)
            bufv = memoryview(buf)
        else:
            buf = bufv = BytesIO()
        # header
        assert not isblocking(fd)
        while Hr < 4:
            try:
                n = __read__(
                    fd, bufv[Hr:] if readcanbuf else bufv, 4 - Hr,
                )
            except OSError as exc:
                if get_errno(exc) not in UNAVAIL:
                    raise
                yield  # would block: wait for next readability event
            else:
                if n == 0:
                    # EOFError when nothing was read at all, OSError
                    # when the peer closed mid-header.
                    raise (OSError('End of file during message') if Hr
                           else EOFError())
                Hr += n

        body_size, = unpack_from('>i', bufv)
        if readcanbuf:
            buf = bytearray(body_size)
            bufv = memoryview(buf)
        else:
            buf = bufv = BytesIO()

        while Br < body_size:
            try:
                n = __read__(
                    fd, bufv[Br:] if readcanbuf else bufv, body_size - Br,
                )
            except OSError as exc:
                if get_errno(exc) not in UNAVAIL:
                    raise
                yield
            else:
                if n == 0:
                    raise (OSError('End of file during message') if Br
                           else EOFError())
                Br += n
        # message complete: re-register the plain event handler for the
        # next message before decoding and dispatching this one.
        add_reader(fd, self.handle_event, fd)
        if readcanbuf:
            message = load(BytesIO(bufv))
        else:
            bufv.seek(0)
            message = load(bufv)
        if message:
            callback(message)

    def _make_process_result(self, hub):
        """Coroutine that reads messages from the pool processes
        and calls the appropriate handler."""
        fileno_to_outq = self.fileno_to_outq
        on_state_change = self.on_state_change
        add_reader = hub.add_reader
        hub_remove = hub.remove
        recv_message = self._recv_message

        def on_result_readable(fileno):
            try:
                fileno_to_outq[fileno]
            except KeyError:  # process gone
                return hub_remove(fileno)
            it = recv_message(add_reader, fileno, on_state_change)
            try:
                next(it)
            except StopIteration:
                pass  # whole message was read in one go
            except (IOError, OSError, EOFError):
                hub_remove(fileno)
            else:
                # message incomplete: resume the coroutine when the fd
                # becomes readable again.
                add_reader(fileno, it)
        return on_result_readable

    def register_with_event_loop(self, hub):
        # replaces the handle_event stub below with a live handler.
        self.handle_event = self._make_process_result(hub)

    def handle_event(self, fileno):
        # stub: replaced by register_with_event_loop(); being called
        # before registration is a programming error.
        raise RuntimeError('Not registered with event loop')

    def on_stop_not_started(self):
        """This method is always used to stop when the helper thread is not
        started."""
        cache = self.cache
        check_timeouts = self.check_timeouts
        fileno_to_outq = self.fileno_to_outq
        on_state_change = self.on_state_change
        join_exited_workers = self.join_exited_workers

        # flush the processes outqueues until they have all terminated.
        outqueues = set(fileno_to_outq)
        while cache and outqueues and self._state != TERMINATE:
            if check_timeouts is not None:
                # make sure tasks with a time limit will time out.
                check_timeouts()
            # cannot iterate and remove at the same time
            pending_remove_fd = set()
            for fd in outqueues:
                # NOTE(review): ``pending_remove_fd.discard`` is passed as
                # the "remove" callback, so fds are discarded from an empty
                # set and never actually queued for removal below — looks
                # like it should be ``pending_remove_fd.add``; confirm.
                self._flush_outqueue(
                    fd, pending_remove_fd.discard, fileno_to_outq,
                    on_state_change,
                )
                try:
                    join_exited_workers(shutdown=True)
                except WorkersJoined:
                    return debug('result handler: all workers terminated')
            outqueues.difference_update(pending_remove_fd)

    def _flush_outqueue(self, fd, remove, process_index, on_state_change):
        try:
            proc = process_index[fd]
        except KeyError:
            # process already found terminated
            # which means its outqueue has already been processed
            # by the worker lost handler.
            return remove(fd)

        reader = proc.outq._reader
        try:
            # temporarily switch to blocking mode for the poll/recv.
            setblocking(reader, 1)
        except (OSError, IOError):
            return remove(fd)
        try:
            if reader.poll(0):
                task = reader.recv()
            else:
                task = None
                sleep(0.5)
        except (IOError, EOFError):
            return remove(fd)
        else:
            if task:
                on_state_change(task)
        finally:
            try:
                # restore non-blocking mode.
                setblocking(reader, 0)
            except (OSError, IOError):
                return remove(fd)
class AsynPool(_pool.Pool):
    """Pool version that uses AIO instead of helper threads."""
    # class-level factories used by the billiard Pool machinery.
    ResultHandler = ResultHandler
    Worker = Worker
    def __init__(self, processes=None, synack=False,
                 sched_strategy=None, *args, **kwargs):
        # unknown strategy names fall through unchanged.
        self.sched_strategy = SCHED_STRATEGIES.get(sched_strategy,
                                                   sched_strategy)
        processes = self.cpu_count() if processes is None else processes
        self.synack = synack
        # create queue-pairs for all our processes in advance.
        self._queues = dict((self.create_process_queues(), None)
                            for _ in range(processes))

        # inqueue fileno -> process mapping
        self._fileno_to_inq = {}

        # outqueue fileno -> process mapping
        self._fileno_to_outq = {}

        # synqueue fileno -> process mapping
        self._fileno_to_synq = {}

        # We keep track of processes that have not yet
        # sent a WORKER_UP message.  If a process fails to send
        # this message within proc_up_timeout we terminate it
        # and hope the next process will recover.
        self._proc_alive_timeout = PROC_ALIVE_TIMEOUT
        self._waiting_to_start = set()

        # denormalized set of all inqueues.
        self._all_inqueues = set()

        # Set of fds being written to (busy)
        self._active_writes = set()

        # Set of active co-routines currently writing jobs.
        self._active_writers = set()

        # Set of fds that are busy (executing task)
        self._busy_workers = set()
        self._mark_worker_as_available = self._busy_workers.discard

        # Holds jobs waiting to be written to child processes.
        self.outbound_buffer = deque()

        # per-process count of messages written (for diagnostics).
        self.write_stats = Counter()

        super(AsynPool, self).__init__(processes, *args, **kwargs)

        for proc in self._pool:
            # create initial mappings, these will be updated
            # as processes are recycled, or found lost elsewhere.
            self._fileno_to_outq[proc.outqR_fd] = proc
            self._fileno_to_synq[proc.synqW_fd] = proc
        self.on_soft_timeout = self._timeout_handler.on_soft_timeout
        self.on_hard_timeout = self._timeout_handler.on_hard_timeout
- def _event_process_exit(self, hub, fd):
- # This method is called whenever the process sentinel is readable.
- hub.remove(fd)
- self.maintain_pool()
- def register_with_event_loop(self, hub):
- """Registers the async pool with the current event loop."""
- self._result_handler.register_with_event_loop(hub)
- self.handle_result_event = self._result_handler.handle_event
- self._create_timelimit_handlers(hub)
- self._create_process_handlers(hub)
- self._create_write_handlers(hub)
- # Add handler for when a process exits (calls maintain_pool)
- [hub.add_reader(fd, self._event_process_exit, hub, fd)
- for fd in self.process_sentinels]
- # Handle_result_event is called whenever one of the
- # result queues are readable.
- [hub.add_reader(fd, self.handle_result_event, fd)
- for fd in self._fileno_to_outq]
- # Timers include calling maintain_pool at a regular interval
- # to be certain processes are restarted.
- for handler, interval in items(self.timers):
- hub.call_repeatedly(interval, handler)
- hub.on_tick.add(self.on_poll_start)
    def _create_timelimit_handlers(self, hub, now=time.time):
        """For async pool this sets up the handlers used
        to implement time limits."""
        call_later = hub.call_later
        # weak values: a cancelled/expired timer drops out automatically.
        trefs = self._tref_for_id = WeakValueDictionary()

        def on_timeout_set(R, soft, hard):
            # When both limits are set only the soft timer is scheduled
            # here; _on_soft_timeout then schedules the hard timer.
            if soft:
                trefs[R._job] = call_later(
                    soft, self._on_soft_timeout, R._job, soft, hard, hub,
                )
            elif hard:
                trefs[R._job] = call_later(
                    hard, self._on_hard_timeout, R._job,
                )
        self.on_timeout_set = on_timeout_set

        def _discard_tref(job):
            try:
                tref = trefs.pop(job)
                tref.cancel()
                del(tref)
            except (KeyError, AttributeError):
                pass  # out of scope
        self._discard_tref = _discard_tref

        def on_timeout_cancel(R):
            # job finished in time: drop any pending timeout timer.
            _discard_tref(R._job)
        self.on_timeout_cancel = on_timeout_cancel
    def _on_soft_timeout(self, job, soft, hard, hub, now=time.time):
        # only used by async pool.
        if hard:
            # schedule the hard limit relative to the moment the soft
            # limit fired (hard - soft seconds from now).
            self._tref_for_id[job] = hub.call_at(
                now() + (hard - soft), self._on_hard_timeout, job,
            )
        try:
            result = self._cache[job]
        except KeyError:
            pass  # job ready
        else:
            self.on_soft_timeout(result)
        finally:
            if not hard:
                # remove tref
                self._discard_tref(job)
- def _on_hard_timeout(self, job):
- # only used by async pool.
- try:
- result = self._cache[job]
- except KeyError:
- pass # job ready
- else:
- self.on_hard_timeout(result)
- finally:
- # remove tref
- self._discard_tref(job)
- def on_job_ready(self, job, i, obj, inqW_fd):
- self._mark_worker_as_available(inqW_fd)
    def _create_process_handlers(self, hub, READ=READ, ERR=ERR):
        """For async pool this will create the handlers called
        when a process is up/down and etc."""
        add_reader, hub_remove = hub.add_reader, hub.remove
        cache = self._cache
        all_inqueues = self._all_inqueues
        fileno_to_inq = self._fileno_to_inq
        fileno_to_outq = self._fileno_to_outq
        fileno_to_synq = self._fileno_to_synq
        busy_workers = self._busy_workers
        event_process_exit = self._event_process_exit
        handle_result_event = self.handle_result_event
        process_flush_queues = self.process_flush_queues
        waiting_to_start = self._waiting_to_start

        def verify_process_alive(proc):
            # scheduled by on_process_up() below: if the process is still
            # alive but never sent WORKER_UP within the timeout window,
            # kill it so the pool can replace it with a fresh process.
            if proc._is_alive() and proc in waiting_to_start:
                assert proc.outqR_fd in fileno_to_outq
                assert fileno_to_outq[proc.outqR_fd] is proc
                assert proc.outqR_fd in hub.readers, "%s.outqR_fd=%s not in hub.readers !" % (proc, proc.outqR_fd)
                error('Timed out waiting for UP message from %r', proc)
                os.kill(proc.pid, 9)

        def on_process_up(proc):
            """Called when a process has started."""
            # If we got the same fd as a previous process then we will also
            # receive jobs in the old buffer, so we need to reset the
            # job._write_to and job._scheduled_for attributes used to recover
            # message boundaries when processes exit.
            infd = proc.inqW_fd
            for job in values(cache):
                if job._write_to and job._write_to.inqW_fd == infd:
                    job._write_to = proc
                if job._scheduled_for and job._scheduled_for.inqW_fd == infd:
                    job._scheduled_for = proc
            fileno_to_outq[proc.outqR_fd] = proc
            # maintain_pool is called whenever a process exits.
            add_reader(
                proc.sentinel, event_process_exit, hub, proc.sentinel,
            )
            assert not isblocking(proc.outq._reader)
            # handle_result_event is called when the processes outqueue is
            # readable.
            add_reader(proc.outqR_fd, handle_result_event, proc.outqR_fd)

            # arm the WORKER_UP watchdog (see verify_process_alive above).
            waiting_to_start.add(proc)
            hub.call_later(
                self._proc_alive_timeout, verify_process_alive, proc,
            )
        self.on_process_up = on_process_up

        def _remove_from_index(obj, proc, index, callback=None):
            # this remove the file descriptors for a process from
            # the indices.  we have to make sure we don't overwrite
            # another processes fds, as the fds may be reused.
            try:
                fd = obj.fileno()
            except (IOError, OSError):
                return

            try:
                if index[fd] is proc:
                    # fd has not been reused so we can remove it from index.
                    index.pop(fd, None)
            except KeyError:
                pass
            else:
                hub_remove(fd)
                if callback is not None:
                    callback(fd)
                return fd

        def on_process_down(proc):
            """Called when a worker process exits."""
            if proc.dead:
                return
            process_flush_queues(proc)
            _remove_from_index(proc.outq._reader, proc, fileno_to_outq)
            if proc.synq:
                _remove_from_index(proc.synq._writer, proc, fileno_to_synq)
            inq = _remove_from_index(proc.inq._writer, proc, fileno_to_inq,
                                     callback=all_inqueues.discard)
            if inq:
                busy_workers.discard(inq)
            # unregister all of the process' fds from the event loop.
            hub_remove(proc.sentinel)
            waiting_to_start.discard(proc)
            hub_remove(proc.inqW_fd)
            hub_remove(proc.outqR_fd)
            if proc.synqR_fd:
                hub_remove(proc.synqR_fd)
            if proc.synqW_fd:
                hub_remove(proc.synqW_fd)
        self.on_process_down = on_process_down
    def _create_write_handlers(self, hub,
                               pack=struct.pack, dumps=_pickle.dumps,
                               protocol=HIGHEST_PROTOCOL):
        """For async pool this creates the handlers used to write data to
        child processes."""
        fileno_to_inq = self._fileno_to_inq
        fileno_to_synq = self._fileno_to_synq
        outbound = self.outbound_buffer
        pop_message = outbound.popleft
        put_message = outbound.append
        all_inqueues = self._all_inqueues
        active_writes = self._active_writes
        active_writers = self._active_writers
        busy_workers = self._busy_workers
        diff = all_inqueues.difference
        add_reader, add_writer = hub.add_reader, hub.add_writer
        hub_add, hub_remove = hub.add, hub.remove
        mark_write_fd_as_active = active_writes.add
        mark_write_gen_as_active = active_writers.add
        mark_worker_as_busy = busy_workers.add
        write_generator_done = active_writers.discard
        get_job = self._cache.__getitem__
        write_stats = self.write_stats
        is_fair_strategy = self.sched_strategy == SCHED_STRATEGY_FAIR
        revoked_tasks = worker_state.revoked
        getpid = os.getpid

        # precalculated ACK/NACK payloads used by send_ack (synack only).
        precalc = {ACK: self._create_payload(ACK, (0, )),
                   NACK: self._create_payload(NACK, (0, ))}

        def _put_back(job, _time=time.time):
            # puts back at the end of the queue
            if job._terminated is not None or \
                    job.correlation_id in revoked_tasks:
                # terminated/revoked jobs are not requeued; ack + mark
                # terminated instead.
                if not job._accepted:
                    job._ack(None, _time(), getpid(), None)
                job._set_terminated(job._terminated)
            else:
                # XXX linear lookup, should find a better way,
                # but this happens rarely and is here to protect against races.
                if job not in outbound:
                    outbound.appendleft(job)
        self._put_back = _put_back

        # called for every event loop iteration, and if there
        # are messages pending this will schedule writing one message
        # by registering the 'schedule_writes' function for all currently
        # inactive inqueues (not already being written to)

        # consolidate means the event loop will merge them
        # and call the callback once with the list writable fds as
        # argument.  Using this means we minimize the risk of having
        # the same fd receive every task if the pipe read buffer is not
        # full.
        if is_fair_strategy:

            def on_poll_start():
                # fair strategy: only schedule writes while some worker
                # is idle (len(busy_workers) < number of inqueues).
                if outbound and len(busy_workers) < len(all_inqueues):
                    #print('ALL: %r ACTIVE: %r' % (len(all_inqueues),
                    #                              len(active_writes)))
                    inactive = diff(active_writes)
                    [hub_add(fd, None, WRITE | ERR, consolidate=True)
                     for fd in inactive]
                else:
                    [hub_remove(fd) for fd in diff(active_writes)]
        else:
            def on_poll_start():  # noqa
                if outbound:
                    [hub_add(fd, None, WRITE | ERR, consolidate=True)
                     for fd in diff(active_writes)]
                else:
                    [hub_remove(fd) for fd in diff(active_writes)]
        self.on_poll_start = on_poll_start

        def on_inqueue_close(fd, proc):
            # Makes sure the fd is removed from tracking when
            # the connection is closed, this is essential as fds may be reused.
            busy_workers.discard(fd)
            try:
                if fileno_to_inq[fd] is proc:
                    fileno_to_inq.pop(fd, None)
                    active_writes.discard(fd)
                    all_inqueues.discard(fd)
                    hub_remove(fd)
            except KeyError:
                pass
        self.on_inqueue_close = on_inqueue_close

        def schedule_writes(ready_fds, shuffle=random.shuffle):
            # Schedule write operation to ready file descriptor.
            # The file descriptor is writeable, but that does not
            # mean the process is currently reading from the socket.
            # The socket is buffered so writeable simply means that
            # the buffer can accept at least 1 byte of data.
            # shuffle so no single fd is favoured every iteration.
            shuffle(ready_fds)
            for ready_fd in ready_fds:
                if ready_fd in active_writes:
                    # already writing to this fd
                    continue
                if is_fair_strategy and ready_fd in busy_workers:
                    # worker is already busy with another task
                    continue
                if ready_fd not in all_inqueues:
                    hub_remove(ready_fd)
                    continue
                try:
                    job = pop_message()
                except IndexError:
                    # no more messages, remove all inactive fds from the hub.
                    # this is important since the fds are always writeable
                    # as long as there's 1 byte left in the buffer, and so
                    # this may create a spinloop where the event loop
                    # always wakes up.
                    for inqfd in diff(active_writes):
                        hub_remove(inqfd)
                    break
                else:
                    if not job._accepted:  # job not accepted by another worker
                        try:
                            # keep track of what process the write operation
                            # was scheduled for.
                            proc = job._scheduled_for = fileno_to_inq[ready_fd]
                        except KeyError:
                            # write was scheduled for this fd but the process
                            # has since exited and the message must be sent to
                            # another process.
                            put_message(job)
                            continue
                        cor = _write_job(proc, ready_fd, job)
                        job._writer = ref(cor)
                        mark_write_gen_as_active(cor)
                        mark_write_fd_as_active(ready_fd)
                        mark_worker_as_busy(ready_fd)

                        # Try to write immediately, in case there's an error.
                        try:
                            next(cor)
                        except StopIteration:
                            pass
                        except OSError as exc:
                            if get_errno(exc) != errno.EBADF:
                                raise
                        else:
                            # write incomplete: resume when fd writable.
                            add_writer(ready_fd, cor)
        hub.consolidate_callback = schedule_writes

        def send_job(tup):
            # Schedule writing job request for when one of the process
            # inqueues are writable.
            body = dumps(tup, protocol=protocol)
            body_size = len(body)
            # NOTE(review): header packed as unsigned '>I' here but read
            # back with signed '>i' in _recv_message; equivalent for sizes
            # below 2**31 — confirm payloads can never exceed that.
            header = pack('>I', body_size)
            # index 1,0 is the job ID.
            job = get_job(tup[1][0])
            job._payload = buf_t(header), buf_t(body), body_size
            put_message(job)
        self._quick_put = send_job

        def on_not_recovering(proc, fd, job):
            # the inqueue has repeatedly refused writes: give up on this
            # process and requeue the job.
            error('Process inqueue damaged: %r %r' % (proc, proc.exitcode))
            if proc._is_alive():
                proc.terminate()
            hub.remove(fd)
            self._put_back(job)

        def _write_job(proc, fd, job):
            # writes job to the worker process.
            # Operation must complete if more than one byte of data
            # was written.  If the broker connection is lost
            # and no data was written the operation shall be cancelled.
            header, body, body_size = job._payload
            errors = 0
            try:
                # job result keeps track of what process the job is sent to.
                job._write_to = proc
                send = proc.send_job_offset

                Hw = Bw = 0  # header/body bytes written so far
                # write header
                while Hw < 4:
                    try:
                        Hw += send(header, Hw)
                    except Exception as exc:
                        if get_errno(exc) not in UNAVAIL:
                            raise
                        # suspend until more data
                        errors += 1
                        if errors > 100:
                            on_not_recovering(proc, fd, job)
                            # NOTE(review): raising StopIteration inside a
                            # generator is a RuntimeError on Python 3.7+
                            # (PEP 479) — should be a bare ``return``;
                            # confirm the targeted Python versions.
                            raise StopIteration()
                        yield
                    else:
                        errors = 0

                # write body
                while Bw < body_size:
                    try:
                        Bw += send(body, Bw)
                    except Exception as exc:
                        if get_errno(exc) not in UNAVAIL:
                            raise
                        # suspend until more data
                        errors += 1
                        if errors > 100:
                            on_not_recovering(proc, fd, job)
                            # NOTE(review): see PEP 479 note above.
                            raise StopIteration()
                        yield
                    else:
                        errors = 0
            finally:
                hub_remove(fd)
                write_stats[proc.index] += 1
                # message written, so this fd is now available
                active_writes.discard(fd)
                write_generator_done(job._writer())  # is a weakref

        def send_ack(response, pid, job, fd, WRITE=WRITE, ERR=ERR):
            # Only used when synack is enabled.
            # Schedule writing ack response for when the fd is writeable.
            msg = Ack(job, fd, precalc[response])
            callback = promise(write_generator_done)
            cor = _write_ack(fd, msg, callback=callback)
            mark_write_gen_as_active(cor)
            mark_write_fd_as_active(fd)
            callback.args = (cor, )
            add_writer(fd, cor)
        self.send_ack = send_ack

        def _write_ack(fd, ack, callback=None):
            # writes ack back to the worker if synack enabled.
            # this operation *MUST* complete, otherwise
            # the worker process will hang waiting for the ack.
            header, body, body_size = ack[2]
            try:
                try:
                    proc = fileno_to_synq[fd]
                except KeyError:
                    # process died, we can safely discard the ack at this
                    # point.
                    # NOTE(review): see PEP 479 note in _write_job.
                    raise StopIteration()
                send = proc.send_syn_offset

                Hw = Bw = 0  # header/body bytes written so far
                # write header
                while Hw < 4:
                    try:
                        Hw += send(header, Hw)
                    except Exception as exc:
                        if get_errno(exc) not in UNAVAIL:
                            raise
                        yield

                # write body
                while Bw < body_size:
                    try:
                        Bw += send(body, Bw)
                    except Exception as exc:
                        if get_errno(exc) not in UNAVAIL:
                            raise
                        # suspend until more data
                        yield
            finally:
                if callback:
                    callback()
                # message written, so this fd is now available
                active_writes.discard(fd)
    def flush(self):
        # Flush all pending/in-progress writes at shutdown while keeping
        # message boundaries intact.
        if self._state == TERMINATE:
            return
        # cancel all tasks that have not been accepted so that NACK is sent.
        for job in values(self._cache):
            if not job._accepted:
                job._cancel()

        # clear the outgoing buffer as the tasks will be redelivered by
        # the broker anyway.
        if self.outbound_buffer:
            self.outbound_buffer.clear()

        self.maintain_pool()

        try:
            # ...but we must continue writing the payloads we already started
            # to keep message boundaries.
            # The messages may be NACK'ed later if synack is enabled.
            if self._state == RUN:
                # flush outgoing buffers
                intervals = fxrange(0.01, 0.1, 0.01, repeatlast=True)
                # map writer generator -> job, so the owning job can be
                # found (and discarded) below.
                owned_by = {}
                for job in values(self._cache):
                    writer = _get_job_writer(job)
                    if writer is not None:
                        owned_by[writer] = job

                while self._active_writers:
                    writers = list(self._active_writers)
                    for gen in writers:
                        if (gen.__name__ == '_write_job' and
                                gen_not_started(gen)):
                            # has not started writing the job so can
                            # discard the task, but we must also remove
                            # it from the Pool._cache.
                            try:
                                job = owned_by[gen]
                            except KeyError:
                                pass
                            else:
                                # removes from Pool._cache
                                job.discard()
                            self._active_writers.discard(gen)
                        else:
                            try:
                                job = owned_by[gen]
                            except KeyError:
                                pass
                            else:
                                # write already in progress: drive it to
                                # completion while the process is alive.
                                job_proc = job._write_to
                                if job_proc._is_alive():
                                    self._flush_writer(job_proc, gen)
                    # workers may have exited in the meantime.
                    self.maintain_pool()
                    sleep(next(intervals))  # don't busyloop
        finally:
            self.outbound_buffer.clear()
            self._active_writers.clear()
            self._active_writes.clear()
            self._busy_workers.clear()
- def _flush_writer(self, proc, writer):
- fds = set([proc.inq._writer])
- try:
- while fds:
- if not proc._is_alive():
- break # process exited
- readable, writable, again = _select(
- writers=fds, err=fds, timeout=0.5,
- )
- if not again and (writable or readable):
- try:
- next(writer)
- except (StopIteration, OSError, IOError, EOFError):
- break
- finally:
- self._active_writers.discard(writer)
- def get_process_queues(self):
- """Get queues for a new process.
- Here we will find an unused slot, as there should always
- be one available when we start a new process.
- """
- return next(q for q, owner in items(self._queues)
- if owner is None)
- def on_grow(self, n):
- """Grow the pool by ``n`` proceses."""
- diff = max(self._processes - len(self._queues), 0)
- if diff:
- self._queues.update(
- dict((self.create_process_queues(), None) for _ in range(diff))
- )
- def on_shrink(self, n):
- """Shrink the pool by ``n`` processes."""
- pass
    def create_process_queues(self):
        """Creates new in, out (and optionally syn) queues,
        returned as a tuple."""
        # NOTE: Pipes must be set O_NONBLOCK at creation time (the original
        # fd), otherwise it will not be possible to change the flags until
        # there is an actual reader/writer on the other side.
        inq = _SimpleQueue(wnonblock=True)
        outq = _SimpleQueue(rnonblock=True)
        synq = None
        # sanity-check blocking modes: the parent side (inq writer,
        # outq reader) must be non-blocking, the child side blocking.
        assert isblocking(inq._reader)
        assert not isblocking(inq._writer)
        assert not isblocking(outq._reader)
        assert isblocking(outq._writer)
        if self.synack:
            synq = _SimpleQueue(wnonblock=True)
            assert isblocking(synq._reader)
            assert not isblocking(synq._writer)
        return inq, outq, synq
    def on_process_alive(self, pid):
        """Handler called when the WORKER_UP message is received
        from a child process, which marks the process as ready
        to receive work."""
        try:
            proc = next(w for w in self._pool if w.pid == pid)
        except StopIteration:
            # the process exited before its WORKER_UP message was
            # handled; the exit itself is dealt with elsewhere
            # (sentinel/maintain_pool path).
            logger.warning("process with pid=%s already exited :( - handling this elsewhere ...", pid)
            return
        assert proc.inqW_fd not in self._fileno_to_inq
        assert proc.inqW_fd not in self._all_inqueues
        self._waiting_to_start.discard(proc)
        # the process may now be written to.
        self._fileno_to_inq[proc.inqW_fd] = proc
        self._fileno_to_synq[proc.synqW_fd] = proc
        self._all_inqueues.add(proc.inqW_fd)
def on_job_process_down(self, job, pid_gone):
    """Handle a job whose assigned process has exited."""
    writer_proc = job._write_to
    if writer_proc and not writer_proc._is_alive():
        # the job was partially written to the dead process
        self.on_partial_read(job, writer_proc)
        return
    scheduled_proc = job._scheduled_for
    if scheduled_proc and not scheduled_proc._is_alive():
        # only scheduled for that process, nothing was sent yet:
        # put it back on the outbound_buffer.
        self._put_back(job)
def on_job_process_lost(self, job, pid, exitcode):
    """Handle a *started* job whose process exited by mysterious
    means (error exitcodes and signals)."""
    self.mark_as_worker_lost(job, exitcode)
def human_write_stats(self):
    """Return a human-readable summary of per-process write statistics."""
    if self.write_stats is None:
        return 'N/A'
    counts = list(self.write_stats.values())
    total = sum(counts)

    def percent(value, whole):
        # falls back to 0 for a zero value, avoiding 0/0
        return '{0:.2f}%'.format((float(value) / whole) * 100.0 if value else 0)

    mean = total / len(self.write_stats) if total else 0
    return {
        'total': total,
        'avg': percent(mean, total),
        'all': ', '.join(percent(count, total) for count in counts),
        'raw': ', '.join(map(str, counts)),
        'inqueues': {
            'total': len(self._all_inqueues),
            'active': len(self._active_writes),
        }
    }
- def _process_cleanup_queues(self, proc):
- """Handler called to clean up a processes queues after process
- exit."""
- if not proc.dead:
- try:
- self._queues[self._find_worker_queues(proc)] = None
- except (KeyError, ValueError):
- pass
@staticmethod
def _stop_task_handler(task_handler):
    """Called at shutdown to tell child processes that we are
    shutting down."""
    for worker in task_handler.pool:
        try:
            # make the write end blocking so the sentinel is not lost
            setblocking(worker.inq._writer, 1)
        except (OSError, IOError):
            pass
        else:
            try:
                worker.inq.put(None)
            except OSError as exc:
                # a closed fd just means the worker is already gone
                if get_errno(exc) != errno.EBADF:
                    raise
def create_result_handler(self):
    """Create the result handler, wiring in our outqueue fd map and
    the process-alive callback."""
    return super(AsynPool, self).create_result_handler(
        on_process_alive=self.on_process_alive,
        fileno_to_outq=self._fileno_to_outq,
    )
- def _process_register_queues(self, proc, queues):
- """Marks new ownership for ``queues`` so that the fileno indices are
- updated."""
- assert queues in self._queues
- b = len(self._queues)
- self._queues[queues] = proc
- assert b == len(self._queues)
- def _find_worker_queues(self, proc):
- """Find the queues owned by ``proc``."""
- try:
- return next(q for q, owner in items(self._queues)
- if owner == proc)
- except StopIteration:
- raise ValueError(proc)
- def _setup_queues(self):
- # this is only used by the original pool which uses a shared
- # queue for all processes.
- # these attributes makes no sense for us, but we will still
- # have to initialize them.
- self._inqueue = self._outqueue = \
- self._quick_put = self._quick_get = self._poll_result = None
def process_flush_queues(self, proc):
    """Flushes all queues, including the outbound buffer, so that
    all tasks that have not been started will be discarded.

    In Celery this is called whenever the transport connection is lost
    (consumer restart).
    """
    resq = proc.outq._reader
    on_state_change = self._result_handler.on_state_change
    fds = set([resq])
    # keep draining until the reader closes, the pool terminates,
    # or select/recv tells us there is nothing more to read
    while fds and not resq.closed and self._state != TERMINATE:
        readable, _, again = _select(fds, None, fds, timeout=0.01)
        if readable:
            try:
                task = resq.recv()
            except (OSError, IOError, EOFError) as exc:
                if get_errno(exc) == errno.EINTR:
                    # interrupted by signal: retry the read
                    continue
                elif get_errno(exc) == errno.EAGAIN:
                    # nothing buffered right now: stop flushing
                    break
                else:
                    debug('got %r while flushing process %r',
                          exc, proc, exc_info=1)
                # NOTE(review): UNAVAIL is defined elsewhere in this
                # file — presumably EAGAIN/EBADF-style errnos; this
                # re-logs the same message for other errors (confirm
                # the duplication is intentional)
                if get_errno(exc) not in UNAVAIL:
                    debug('got %r while flushing process %r',
                          exc, proc, exc_info=1)
                break
            else:
                if task is None:
                    # sentinel marks the end of the queue contents
                    debug('got sentinel while flushing process %r', proc)
                    break
                else:
                    on_state_change(task)
        else:
            # select reported nothing readable: the queue is drained
            break
def on_partial_read(self, job, proc):
    """Called when a job was only partially written to a child process
    and it exited.

    The process' queues are destroyed and replaced, since the message
    boundaries on the old sockets can no longer be trusted.
    """
    # worker terminated by signal:
    # we cannot reuse the sockets again, because we don't know if
    # the process wrote/read anything from them, and if so we cannot
    # restore the message boundaries.
    if not job._accepted:
        # job was not acked, so find another worker to send it to.
        self._put_back(job)
    writer = _get_job_writer(job)
    if writer:
        # the partial write generator must not be resumed
        self._active_writers.discard(writer)
        del(writer)
    if not proc.dead:
        proc.dead = True
        # Replace queues to avoid reuse
        before = len(self._queues)
        try:
            queues = self._find_worker_queues(proc)
            if self.destroy_queues(queues, proc):
                # destroyed: register a fresh, unowned queue triple
                self._queues[self.create_process_queues()] = None
        except ValueError:
            pass
        # Not in queue map, make sure sockets are closed.
        #self.destroy_queues((proc.inq, proc.outq, proc.synq))
        # replacing queues must keep the registry size constant
        assert len(self._queues) == before
def destroy_queues(self, queues, proc):
    """Destroy queues that can no longer be used so they can be
    replaced by new sockets.

    Returns 1 if the queues were removed from the registry, else 0.
    """
    assert not proc._is_alive()
    self._waiting_to_start.discard(proc)
    try:
        self._queues.pop(queues)
        removed = 1
    except KeyError:
        removed = 0
    try:
        self.on_inqueue_close(queues[0]._writer.fileno(), proc)
    except IOError:
        pass
    # close every still-open pipe end, ignoring errors
    for queue in queues:
        if not queue:
            continue
        for pipe_end in (queue._reader, queue._writer):
            if pipe_end.closed:
                continue
            try:
                pipe_end.close()
            except (IOError, OSError):
                pass
    return removed
- def _create_payload(self, type_, args,
- dumps=_pickle.dumps, pack=struct.pack,
- protocol=HIGHEST_PROTOCOL):
- body = dumps((type_, args), protocol=protocol)
- size = len(body)
- header = pack('>I', size)
- return header, body, size
- @classmethod
- def _set_result_sentinel(cls, _outqueue, _pool):
- # unused
- pass
- def _help_stuff_finish_args(self):
- # Pool._help_stuff_finished is a classmethod so we have to use this
- # trick to modify the arguments passed to it.
- return (self._pool, )
@classmethod
def _help_stuff_finish(cls, pool):
    """Drain worker inqueues until the task handler has finished."""
    debug(
        'removing tasks from inqueue until task handler finished',
    )
    owner_by_fd = {}
    watched = set()
    for worker in pool:
        try:
            fd = worker.inq._reader.fileno()
        except IOError:
            continue
        watched.add(fd)
        owner_by_fd[fd] = worker
    while watched:
        readable, _, again = _select(watched, timeout=0.5)
        if again:
            # select was interrupted: retry
            continue
        if not readable:
            break
        for fd in readable:
            # discard one pending message per readable inqueue
            owner_by_fd[fd].inq._reader.recv()
        sleep(0)
@property
def timers(self):
    """Periodic timers: run ``maintain_pool`` every five seconds."""
    interval = 5.0
    return {self.maintain_pool: interval}
|