
Comments for celery.concurrency.processes

Ask Solem 11 years ago
parent commit d820cef8ef
1 changed file with 189 additions and 69 deletions

+ 189 - 69
celery/concurrency/processes.py

@@ -82,7 +82,13 @@ def gen_not_started(gen):
 
 
 def process_initializer(app, hostname):
-    """Pool child process initializer."""
+    """Pool child process initializer.
+
+    This will initialize a child pool process to ensure the correct
+    app instance is used and that things like logging work.
+
+    """
     platforms.signals.reset(*WORKER_SIGRESET)
     platforms.signals.ignore(*WORKER_SIGIGNORE)
     platforms.set_mp_process_title('celeryd', hostname=hostname)
@@ -103,6 +109,7 @@ def process_initializer(app, hostname):
         set_default_app(app)
         app.finalize()
         trace._tasks = app._tasks  # enables fast_trace_task optimization.
+    # rebuild execution handler for all tasks.
     from celery.app.trace import build_tracer
     for name, task in items(app.tasks):
         task.__trace__ = build_tracer(name, task, app.loader, hostname)
@@ -110,6 +117,24 @@ def process_initializer(app, hostname):
 
 
 def _select(readers=None, writers=None, err=None, timeout=0):
+    """Simple wrapper to :class:`~select.select`.
+
+    :param readers: Set of reader fds to test if readable.
+    :param writers: Set of writer fds to test if writable.
+    :param err: Set of fds to test for error condition.
+
+    All fd sets passed must be mutable as this function
+    will remove non-working fds from them, this also means
+    the caller must make sure there are still fds in the sets
+    before calling us again.
+
+    :returns: tuple of ``(readable, writable, again)``, where
+        ``readable`` is a set of fds that have data available for read,
+        ``writable`` is a set of fds that is ready to be written to
+        and ``again`` is a flag that if set means the caller must
+        throw away the result and call us again.
+
+    """
     readers = set() if readers is None else readers
     writers = set() if writers is None else writers
     err = set() if err is None else err
@@ -137,20 +162,28 @@ def _select(readers=None, writers=None, err=None, timeout=0):
 
 
 class Worker(_pool.Worker):
+    """Pool worker process."""
 
     def on_loop_start(self, pid):
+        # our version sends a WORKER_UP message when the process is ready
+        # to accept work; this tells the parent that the inqueue fd
+        # is writable.
         self.outq.put((WORKER_UP, (pid, )))
 
 
 class ResultHandler(_pool.ResultHandler):
+    """Handles messages from the pool processes."""
 
     def __init__(self, *args, **kwargs):
         self.fileno_to_outq = kwargs.pop('fileno_to_outq')
         self.on_process_alive = kwargs.pop('on_process_alive')
         super(ResultHandler, self).__init__(*args, **kwargs)
+        # add our custom message handler
         self.state_handlers[WORKER_UP] = self.on_process_alive
 
     def _process_result(self):
+        """Coroutine that reads messages from the pool processes
+        and calls the appropriate handler."""
         fileno_to_outq = self.fileno_to_outq
         on_state_change = self.on_state_change
 
@@ -159,6 +192,7 @@ class ResultHandler(_pool.ResultHandler):
             try:
                 proc = fileno_to_outq[fileno]
             except KeyError:
+                # process gone
                 continue
             reader = proc.outq._reader
 
@@ -194,20 +228,27 @@ class ResultHandler(_pool.ResultHandler):
                 self._it = None
 
     def on_stop_not_started(self):
+        """This method is always used to stop when the helper thread is not
+        started."""
         cache = self.cache
         check_timeouts = self.check_timeouts
         fileno_to_outq = self.fileno_to_outq
         on_state_change = self.on_state_change
         join_exited_workers = self.join_exited_workers
 
+        # flush the processes' outqueues until they have all terminated.
         outqueues = set(fileno_to_outq)
         while cache and outqueues and self._state != TERMINATE:
             if check_timeouts is not None:
+                # make sure tasks with a time limit will time out.
                 check_timeouts()
             for fd in outqueues:
                 try:
                     proc = fileno_to_outq[fd]
                 except KeyError:
+                    # process already found terminated
+                    # which means its outqueue has already been processed
+                    # by the worker lost handler.
                     outqueues.discard(fd)
                     break
 
@@ -232,34 +273,46 @@ class ResultHandler(_pool.ResultHandler):
 
 
 class AsynPool(_pool.Pool):
+    """Pool version that uses AIO instead of helper threads."""
     ResultHandler = ResultHandler
     Worker = Worker
 
     def __init__(self, processes=None, synack=False, *args, **kwargs):
         processes = self.cpu_count() if processes is None else processes
         self.synack = synack
+        # create queue-pairs for all our processes in advance.
         self._queues = dict((self.create_process_queues(), None)
                             for _ in range(processes))
+
+        # inqueue fileno -> process mapping
         self._fileno_to_inq = {}
+        # outqueue fileno -> process mapping
         self._fileno_to_outq = {}
+        # synqueue fileno -> process mapping
         self._fileno_to_synq = {}
+
+        # denormalized set of all inqueues.
         self._all_inqueues = set()
         super(AsynPool, self).__init__(processes, *args, **kwargs)
 
         for proc in self._pool:
+            # create initial mappings, these will be updated
+            # as processes are recycled, or found lost elsewhere.
             self._fileno_to_inq[proc.inqW_fd] = proc
             self._fileno_to_outq[proc.outqR_fd] = proc
             self._fileno_to_synq[proc.synqW_fd] = proc
 
-    def _finalize_args(self):
-        orig = super(AsynPool, self)._finalize_args()
-        return (self._fileno_to_inq, orig)
-
     def get_process_queues(self):
+        """Get queues for a new process.
+
+        Here we will find an unused slot, as there should always
+        be one available when we start a new process.
+        """
         return next(q for q, owner in items(self._queues)
                     if owner is None)
 
     def on_grow(self, n):
+        """Grow the pool by ``n`` proceses."""
         diff = max(self._processes - len(self._queues), 0)
         if diff:
             self._queues.update(
@@ -267,9 +320,12 @@ class AsynPool(_pool.Pool):
             )
 
     def on_shrink(self, n):
+        """Shrink the pool by ``n`` processes."""
         pass
 
     def create_process_queues(self):
+        """Creates new in, out (and optionally syn) queues,
+        returned as a tuple."""
         inq, outq, synq = _SimpleQueue(), _SimpleQueue(), None
         inq._writer.setblocking(0)
         if self.synack:
@@ -278,24 +334,38 @@ class AsynPool(_pool.Pool):
         return inq, outq, synq
 
     def on_process_alive(self, pid):
+        """Handler called when the WORKER_UP message is received
+        from a child process, which marks the process as ready
+        to receive work."""
         try:
             proc = next(w for w in self._pool if w.pid == pid)
         except StopIteration:
+            # process already exited :(  this will be handled elsewhere.
             return
         self._fileno_to_inq[proc.inqW_fd] = proc
         self._fileno_to_synq[proc.synqW_fd] = proc
         self._all_inqueues.add(proc.inqW_fd)
 
     def on_job_process_down(self, job, pid_gone):
+        """Handler called for each job when the process it was assigned to
+        exits."""
         if job._write_to:
+            # job was partially written
             self.on_partial_read(job, job._write_to)
         elif job._scheduled_for:
+            # job was only scheduled to be written to this process,
+            # but no data was sent so put it back on the outbound_buffer.
             self._put_back(job)
 
     def on_job_process_lost(self, job, pid, exitcode):
+        """Handler called for each *started* job when the process it
+        was assigned to exited by mysterious means (error exitcodes and
+        signals)"""
         self.mark_as_worker_lost(job, exitcode)
 
     def _process_cleanup_queues(self, proc):
+        """Handler called to clean up a processes queues after process
+        exit."""
         try:
             self._queues[self._find_worker_queues(proc)] = None
         except (KeyError, ValueError):
@@ -303,6 +373,7 @@ class AsynPool(_pool.Pool):
 
     @staticmethod
     def _stop_task_handler(task_handler):
+        """Called at shutdown to tell processes that we are shutting down."""
         for proc in task_handler.pool:
             proc.inq._writer.setblocking(1)
             try:
@@ -318,12 +389,15 @@ class AsynPool(_pool.Pool):
         )
 
     def _process_register_queues(self, proc, queues):
+        """Marks new ownership for ``queues`` so that the fileno indices are
+        updated."""
         assert queues in self._queues
         b = len(self._queues)
         self._queues[queues] = proc
         assert b == len(self._queues)
 
     def _find_worker_queues(self, proc):
+        """Find the queues owned by ``proc``."""
         try:
             return next(q for q, owner in items(self._queues)
                         if owner == proc)
@@ -331,10 +405,22 @@ class AsynPool(_pool.Pool):
             raise ValueError(proc)
 
     def _setup_queues(self):
+        # this is only used by the original pool which uses a shared
+        # queue for all processes.
+
+        # these attributes make no sense for us, but we will still
+        # have to initialize them.
         self._inqueue = self._outqueue = \
             self._quick_put = self._quick_get = self._poll_result = None
 
     def process_flush_queues(self, proc):
+        """Flushes all queues, including the outbound buffer, so that
+        all tasks that have not been started will be discarded.
+
+        In Celery this is called whenever the transport connection is lost
+        (consumer restart).
+
+        """
         resq = proc.outq._reader
         on_state_change = self._result_handler.on_state_change
         while not resq.closed and resq.poll(0) and self._state != TERMINATE:
@@ -351,13 +437,15 @@ class AsynPool(_pool.Pool):
                     debug('got sentinel while flushing process %r', proc)
 
     def on_partial_read(self, job, proc):
+        """Called when a job was only partially written to a child process
+        and it exited."""
         # worker terminated by signal:
         # we cannot reuse the sockets again, because we don't know if
         # the process wrote/read anything from them, and if so we cannot
         # restore the message boundaries.
         if proc.exitcode != EX_RECYCLE:
-            # job was not acked, so find another worker to send it to.
             if not job._accepted:
+                # job was not acked, so find another worker to send it to.
                 self._put_back(job)
             writer = getattr(job, '_writer')
             writer = writer and writer() or None
@@ -376,6 +464,8 @@ class AsynPool(_pool.Pool):
             assert len(self._queues) == before
 
     def destroy_queues(self, queues):
+        """Destroyes queues that can no longer be used and that will
+        be replaced by new sockets."""
         removed = 1
         try:
             self._queues.pop(queues)
@@ -405,14 +495,16 @@ class AsynPool(_pool.Pool):
 
     @classmethod
     def _set_result_sentinel(cls, _outqueue, _pool):
+        # unused
         pass
 
     def _help_stuff_finish_args(self):
+        # Pool._help_stuff_finish is a classmethod so we have to use this
+        # trick to modify the arguments passed to it.
         return (self._pool, )
 
     @classmethod
     def _help_stuff_finish(cls, pool):
-        # task_handler may be blocked trying to put items on inqueue
         debug(
             'removing tasks from inqueue until task handler finished',
         )
@@ -465,18 +557,26 @@ class TaskPool(BasePool):
                               initializer=process_initializer,
                               synack=False,
                               **self.options)
+
+        # Create proxy methods
         self.on_apply = P.apply_async
         self.on_soft_timeout = P._timeout_handler.on_soft_timeout
         self.on_hard_timeout = P._timeout_handler.on_hard_timeout
         self.maintain_pool = P.maintain_pool
-        self.terminate_job = self._pool.terminate_job
-        self.grow = self._pool.grow
-        self.shrink = self._pool.shrink
-        self.restart = self._pool.restart
+        self.terminate_job = P.terminate_job
+        self.grow = P.grow
+        self.shrink = P.shrink
+        self.restart = P.restart
         self.maybe_handle_result = P._result_handler.handle_event
-        self.outbound_buffer = deque()
         self.handle_result_event = P.handle_result_event
+
+        # Holds jobs waiting to be written to child processes.
+        self.outbound_buffer = deque()
+
+        # Set of fds being written to (busy)
         self._active_writes = set()
+
+        # Set of active co-routines currently writing jobs.
         self._active_writers = set()
 
     def did_start_ok(self):
@@ -525,35 +625,35 @@ class TaskPool(BasePool):
             'all': ', '.join(per(v, total) for v in vals)
         }
 
-    def _on_soft_timeout(self, job, soft, hard, hub, now=time):
-        if hard:
-            self._tref_for_id[job] = hub.timer.apply_at(
-                now() + (hard - soft),
-                self._on_hard_timeout, (job, ),
-            )
-        try:
-            result = self._pool._cache[job]
-        except KeyError:
-            pass  # job ready
-        else:
-            self.on_soft_timeout(result)
-        finally:
-            if not hard:
-                # remove tref
-                self._discard_tref(job)
+    def on_poll_init(self, w, hub):
+        """Initialize async pool using the eventloop hub."""
+        pool = self._pool
+        pool._active_writers = self._active_writers
 
-    def _on_hard_timeout(self, job):
-        try:
-            result = self._pool._cache[job]
-        except KeyError:
-            pass  # job ready
-        else:
-            self.on_hard_timeout(result)
-        finally:
-            # remove tref
-            self._discard_tref(job)
+        self._create_timelimit_handlers(hub)
+        self._create_process_handlers(hub)
+        self._create_write_handlers(hub)
+
+        # did_start_ok will verify that pool processes were able to start,
+        # but this will only work the first time we start, as
+        # maxtasksperchild will mess up metrics.
+        if not w.consumer.restart_count and not pool.did_start_ok():
+            raise WorkerLostError('Could not start worker processes')
+
+        # Maintain_pool is called whenever a process exits.
+        hub.add(pool.process_sentinels, self.maintain_pool, READ | ERR)
+        # Handle_result_event is called whenever one of the
+        # result queues is readable.
+        hub.add(pool._fileno_to_outq, self.handle_result_event, READ | ERR)
+
+        # Timers include calling maintain_pool at a regular interval
+        # to be certain processes are restarted.
+        for handler, interval in items(self.timers):
+            hub.timer.apply_interval(interval * 1000.0, handler)
 
     def _create_timelimit_handlers(self, hub, now=time):
+        """For async pool this sets up the handlers used
+        to implement time limits."""
         apply_after = hub.timer.apply_after
         trefs = self._tref_for_id = WeakValueDictionary()
 
@@ -583,7 +683,39 @@ class TaskPool(BasePool):
             _discard_tref(R._job)
         self._pool.on_timeout_cancel = on_timeout_cancel
 
+    def _on_soft_timeout(self, job, soft, hard, hub, now=time):
+        # only used by async pool.
+        if hard:
+            self._tref_for_id[job] = hub.timer.apply_at(
+                now() + (hard - soft),
+                self._on_hard_timeout, (job, ),
+            )
+        try:
+            result = self._pool._cache[job]
+        except KeyError:
+            pass  # job ready
+        else:
+            self.on_soft_timeout(result)
+        finally:
+            if not hard:
+                # remove tref
+                self._discard_tref(job)
+
+    def _on_hard_timeout(self, job):
+        # only used by async pool.
+        try:
+            result = self._pool._cache[job]
+        except KeyError:
+            pass  # job ready
+        else:
+            self.on_hard_timeout(result)
+        finally:
+            # remove tref
+            self._discard_tref(job)
+
     def _create_process_handlers(self, hub, READ=READ, ERR=ERR):
+        """For async pool this will create the handlers called
+        when a process is up/down, etc."""
         pool = self._pool
         hub_add, hub_remove = hub.add, hub.remove
         all_inqueues = self._pool._all_inqueues
@@ -595,9 +727,10 @@ class TaskPool(BasePool):
         process_flush_queues = self._pool.process_flush_queues
 
         def on_process_up(proc):
+            """Called when a WORKER_UP message is received from process."""
             # If we got the same fd as a previous process then we will also
             # receive jobs in the old buffer, so we need to reset the
-            # _write_to and _scheduled_for tracking values used to recover
+            # job._write_to and job._scheduled_for attributes used to recover
             # message boundaries when processes exit.
             infd = proc.inqW_fd
             for job in values(pool._cache):
@@ -606,11 +739,15 @@ class TaskPool(BasePool):
                 if job._scheduled_for and job._scheduled_for.inqW_fd == infd:
                     job._scheduled_for = proc
             fileno_to_outq[proc.outqR_fd] = proc
+            # maintain_pool is called whenever a process exits.
             hub_add(proc.sentinel, maintain_pool, READ | ERR)
+            # handle_result_event is called when the process's outqueue is
+            # readable.
             hub_add(proc.outqR_fd, handle_result_event, READ | ERR)
         self._pool.on_process_up = on_process_up
 
         def on_process_down(proc):
+            """Called when a worker process exits."""
             process_flush_queues(proc)
             fileno_to_outq.pop(proc.outqR_fd, None)
             fileno_to_inq.pop(proc.inqW_fd, None)
@@ -623,6 +760,8 @@ class TaskPool(BasePool):
     def _create_write_handlers(self, hub,
                                pack=struct.pack, dumps=_pickle.dumps,
                                protocol=HIGHEST_PROTOCOL):
+        """For async pool this creates the handlers used to write data to
+        child processes."""
         pool = self._pool
         fileno_to_inq = pool._fileno_to_inq
         fileno_to_synq = pool._fileno_to_synq
@@ -637,7 +776,8 @@ class TaskPool(BasePool):
         mark_write_gen_as_active = self._active_writers.add
         write_generator_done = self._active_writers.discard
         get_job = pool._cache.__getitem__
-        pool._put_back = put_message
+        # puts the job back on the outbound buffer
+        pool._put_back = outbound.appendleft
         precalc = {ACK: pool._create_payload(ACK, (0, )),
                    NACK: pool._create_payload(NACK, (0, ))}
 
@@ -652,8 +792,7 @@ class TaskPool(BasePool):
 
         def on_inqueue_close(fd):
             # Makes sure the fd is removed from tracking when
-            # the connection is closed, this is essential as
-            # fds may be reused.
+            # the connection is closed, this is essential as fds may be reused.
             active_writes.discard(fd)
             all_inqueues.discard(fd)
         self._pool.on_inqueue_close = on_inqueue_close
@@ -715,8 +854,7 @@ class TaskPool(BasePool):
         write_stats = self.write_stats = Counter()
 
         def on_not_recovering(proc):
-            # XXX Theoretically a possibility, but maybe terminate the
-            # process in this case to attempt to recover that way.
+            # XXX Theoretically a possibility, but not seen in practice yet.
             raise Exception(
                 'Process writable but cannot write. Contact support!')
 
@@ -733,6 +871,7 @@ class TaskPool(BasePool):
                 send = proc.send_job_offset
 
                 Hw = Bw = 0
+                # write header
                 while Hw < 4:
                     try:
                         Hw += send(header, Hw)
@@ -746,6 +885,8 @@ class TaskPool(BasePool):
                             raise StopIteration()
                         yield
                     errors = 0
+
+                # write body
                 while Bw < body_size:
                     try:
                         Bw += send(body, Bw)
@@ -766,6 +907,7 @@ class TaskPool(BasePool):
                 write_generator_done(job._writer())  # is a weakref
 
         def send_ack(response, pid, job, fd, WRITE=WRITE, ERR=ERR):
+            # Only used when synack is enabled.
             # Schedule writing ack response for when the fd is writeable.
             msg = Ack(job, fd, precalc[response])
             callback = promise(write_generator_done)
@@ -791,6 +933,7 @@ class TaskPool(BasePool):
                 send = proc.send_syn_offset
 
                 Hw = Bw = 0
+                # write header
                 while Hw < 4:
                     try:
                         Hw += send(header, Hw)
@@ -798,6 +941,8 @@ class TaskPool(BasePool):
                         if get_errno(exc) not in UNAVAIL:
                             raise
                         yield
+
+                # write body
                 while Bw < body_size:
                     try:
                         Bw += send(body, Bw)
@@ -812,31 +957,6 @@ class TaskPool(BasePool):
                 # message written, so this fd is now available
                 active_writes.discard(fd)
 
-    def on_poll_init(self, w, hub):
-        pool = self._pool
-        pool._active_writers = self._active_writers
-
-        self._create_timelimit_handlers(hub)
-        self._create_process_handlers(hub)
-        self._create_write_handlers(hub)
-
-        # did_start_ok will verify that pool processes were able to start,
-        # but this will only work the first time we start, as
-        # maxtasksperchild will mess up metrics.
-        if not w.consumer.restart_count and not pool.did_start_ok():
-            raise WorkerLostError('Could not start worker processes')
-
-        # Maintain_pool is called whenever a process exits.
-        hub.add(pool.process_sentinels, self.maintain_pool, READ | ERR)
-        # Handle_result_event is called whenever one of the
-        # result queues are readable.
-        hub.add(pool._fileno_to_outq, self.handle_result_event, READ | ERR)
-
-        # Timers include calling maintain_pool at a regular interval
-        # to be certain processes are restarted.
-        for handler, interval in items(self.timers):
-            hub.timer.apply_interval(interval * 1000.0, handler)
-
     def flush(self):
         if self._pool._state == TERMINATE:
             return