
Stresstests passing except for soft_timelimits+maxtasksperchild

Ask Solem, 12 years ago
parent commit 457ee283c4
1 file changed, 220 insertions(+), 135 deletions(-)

+ 220 - 135
celery/concurrency/processes.py

@@ -17,7 +17,7 @@ import select
 import socket
 import struct
 
-from collections import deque
+from collections import Counter, deque, namedtuple
 from pickle import HIGHEST_PROTOCOL
 from time import sleep, time
 
@@ -65,6 +65,8 @@ WORKER_UP = 15
 logger = get_logger(__name__)
 warning, debug = logger.warning, logger.debug
 
+Ack = namedtuple('Ack', ('id', 'fd', 'payload'))
+
 
 def process_initializer(app, hostname):
     """Pool child process initializer."""
@@ -383,6 +385,14 @@ class AsynPool(_pool.Pool):
                             pass
         return removed
 
+    def _create_payload(self, type_, args,
+                        dumps=_pickle.dumps, pack=struct.pack,
+                        protocol=HIGHEST_PROTOCOL):
+        body = dumps((type_, args), protocol=protocol)
+        size = len(body)
+        header = pack('>I', size)
+        return header, body, size
+
     @classmethod
     def _set_result_sentinel(cls, _outqueue, _pool):
         pass
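
For context, _create_payload frames every pool message the same way: a 4-byte big-endian
length header (struct format '>I') followed by the pickled (type_, args) body. A minimal
sketch of producing and decoding such a frame; the frame()/unframe() helpers are
illustrative only and not part of this patch:

    import pickle
    import struct

    def frame(type_, args, protocol=pickle.HIGHEST_PROTOCOL):
        # Same layout as _create_payload: '>I' length header + pickled body.
        body = pickle.dumps((type_, args), protocol=protocol)
        return struct.pack('>I', len(body)) + body

    def unframe(buf):
        # Consume one length-prefixed message from the start of ``buf``.
        (size, ) = struct.unpack('>I', buf[:4])
        type_, args = pickle.loads(buf[4:4 + size])
        return (type_, args), buf[4 + size:]

    msg, rest = unframe(frame(7, (0, )))
    assert msg == (7, (0, )) and rest == b''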
@@ -422,6 +432,7 @@ class TaskPool(BasePool):
     BlockingPool = _pool.Pool
 
     uses_semaphore = True
+    write_stats = None
 
     def on_start(self):
         """Run the task pool.
@@ -479,52 +490,36 @@ class TaskPool(BasePool):
             self._pool.close()
 
     def _get_info(self):
-        return {'max-concurrency': self.limit,
-                'processes': [p.pid for p in self._pool._pool],
-                'max-tasks-per-child': self._pool._maxtasksperchild,
-                'put-guarded-by-semaphore': self.putlocks,
-                'timeouts': (self._pool.soft_timeout, self._pool.timeout)}
-
-    def on_poll_init(self, w, hub,
-                     now=time, protocol=HIGHEST_PROTOCOL, pack=struct.pack,
-                     dumps=_pickle.dumps):
-        pool = self._pool
+        return {
+            'max-concurrency': self.limit,
+            'processes': [p.pid for p in self._pool._pool],
+            'max-tasks-per-child': self._pool._maxtasksperchild or 'N/A',
+            'put-guarded-by-semaphore': self.putlocks,
+            'timeouts': (self._pool.soft_timeout or 0,
+                         self._pool.timeout or 0),
+            'writes': self.human_write_stats(),
+        }
+
+    def human_write_stats(self):
+        if self.write_stats is None:
+            return 'N/A'
+        vals = list(values(self.write_stats))
+        total = sum(vals)
+
+        def per(v, total):
+            return '{0:.2f}%'.format((float(v) / total) * 100.0 if v else 0)
+
+        return {
+            'total': total,
+            'avg': per(total / len(self.write_stats), total),
+            'all': ', '.join(per(v, total) for v in vals)
+        }
+
+    def _create_timelimit_handlers(self, hub, now=time):
         apply_after = hub.timer.apply_after
         apply_at = hub.timer.apply_at
-        maintain_pool = self.maintain_pool
         on_soft_timeout = self.on_soft_timeout
         on_hard_timeout = self.on_hard_timeout
-        fileno_to_inq = pool._fileno_to_inq
-        fileno_to_outq = pool._fileno_to_outq
-        fileno_to_synq = pool._fileno_to_synq
-        outbound = self.outbound_buffer
-        pop_message = outbound.popleft
-        put_message = outbound.append
-        all_inqueues = pool._all_inqueues
-        active_writes = self._active_writes
-        diff = all_inqueues.difference
-        hub_add, hub_remove = hub.add, hub.remove
-        mark_write_fd_as_active = active_writes.add
-        mark_write_gen_as_active = self._active_writers.add
-        write_generator_gone = self._active_writers.discard
-        get_job = pool._cache.__getitem__
-        pool._put_back = put_message
-
-        # did_start_ok will verify that pool processes were able to start,
-        # but this will only work the first time we start, as
-        # maxtasksperchild will mess up metrics.
-        if not w.consumer.restart_count and not pool.did_start_ok():
-            raise WorkerLostError('Could not start worker processes')
-
-        hub_add(pool.process_sentinels, self.maintain_pool, READ | ERR)
-        hub_add(fileno_to_outq, self.handle_result_event, READ | ERR)
-        for handler, interval in items(self.timers):
-            hub.timer.apply_interval(interval * 1000.0, handler)
-
-        # need to handle pool results before every task
-        # since multiple tasks can be received in a single poll()
-        # XXX do we need this now?!?
-        # hub.on_task.append(pool.maybe_handle_result)
 
         def on_timeout_set(R, soft, hard):
 
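
The 'writes' entry added to _get_info() above is produced by human_write_stats() from a
per-process Counter (populated later in this diff by _write_job, keyed by proc.index).
A toy reproduction of the same arithmetic, assuming three worker processes that have been
sent 30, 50 and 20 messages:

    from collections import Counter

    write_stats = Counter({0: 30, 1: 50, 2: 20})   # proc.index -> writes
    total = sum(write_stats.values())              # 100

    def per(v, total):
        return '{0:.2f}%'.format((float(v) / total) * 100.0 if v else 0)

    info = {
        'total': total,
        'avg': per(total / len(write_stats), total),  # share of an even split
        'all': ', '.join(per(v, total) for v in write_stats.values()),
    }
    # info['all'] == '30.00%, 50.00%, 20.00%' (order follows the Counter)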
@@ -548,14 +543,24 @@ class TaskPool(BasePool):
                 pass
         self._pool.on_timeout_cancel = on_timeout_cancel
 
+    def _create_process_handlers(self, hub, READ=READ, ERR=ERR):
+        hub_add, hub_remove = hub.add, hub.remove
+        all_inqueues = self._pool._all_inqueues
+        fileno_to_inq = self._pool._fileno_to_inq
+        fileno_to_outq = self._pool._fileno_to_outq
+        fileno_to_synq = self._pool._fileno_to_synq
+        maintain_pool = self._pool.maintain_pool
+        handle_result_event = self._pool.handle_result_event
+        process_flush_queues = self._pool.process_flush_queues
+
         def on_process_up(proc):
             fileno_to_outq[proc.outqR_fd] = proc
             hub_add(proc.sentinel, maintain_pool, READ | ERR)
-            hub_add(proc.outqR_fd, pool.handle_result_event, READ | ERR)
+            hub_add(proc.outqR_fd, handle_result_event, READ | ERR)
         self._pool.on_process_up = on_process_up
 
         def on_process_down(proc):
-            pool.process_flush_queues(proc)
+            process_flush_queues(proc)
             fileno_to_outq.pop(proc.outqR_fd, None)
             fileno_to_inq.pop(proc.inqW_fd, None)
             fileno_to_synq.pop(proc.synqW_fd, None)
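
on_process_up above registers two descriptors with the hub for every child process: the
process sentinel, so maintain_pool runs as soon as the child exits, and the result-queue
fd, so handle_result_event runs when results arrive. The sentinel idea itself is standard
process machinery; a standalone sketch using Python 3's multiprocessing.Process.sentinel
(the pool uses its own process objects, so this is an analogy rather than the exact code
path):

    import select
    from multiprocessing import Process
    from time import sleep

    if __name__ == '__main__':
        child = Process(target=sleep, args=(1, ))
        child.start()

        # Process.sentinel becomes readable once the child exits, so an
        # event loop can poll it instead of blocking in join() or waitpid().
        readable, _, _ = select.select([child.sentinel], [], [], 5.0)
        if readable:
            print('child exited with code', child.exitcode)
        child.join()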
@@ -564,74 +569,194 @@ class TaskPool(BasePool):
             hub_remove(proc.outqR_fd)
         self._pool.on_process_down = on_process_down
 
-        class Ack(object):
-            __slots__ = ('id', 'fd', '_payload')
+    def _create_write_handlers(self, hub,
+                               pack=struct.pack, dumps=_pickle.dumps,
+                               protocol=HIGHEST_PROTOCOL):
+        pool = self._pool
+        fileno_to_inq = pool._fileno_to_inq
+        fileno_to_synq = pool._fileno_to_synq
+        outbound = self.outbound_buffer
+        pop_message = outbound.popleft
+        put_message = outbound.append
+        all_inqueues = pool._all_inqueues
+        active_writes = self._active_writes
+        diff = all_inqueues.difference
+        hub_add, hub_remove = hub.add, hub.remove
+        mark_write_fd_as_active = active_writes.add
+        mark_write_gen_as_active = self._active_writers.add
+        write_generator_done = self._active_writers.discard
+        get_job = pool._cache.__getitem__
+        pool._put_back = put_message
+        precalc = {ACK: pool._create_payload(ACK, (0, )),
+                   NACK: pool._create_payload(NACK, (0, ))}
 
-            def __init__(self, id, fd, payload):
-                self.id = id
-                self.fd = fd
-                self._payload = payload
+        def on_poll_start(hub):
+            # called for every eventloop iteration, and if there
+            # are messages pending this will schedule writing one message
+            # by registering the 'schedule_writes' function for all currently
+            # inactive inqueues (not already being written to)
+            if outbound:
+                hub_add(diff(active_writes), schedule_writes, WRITE | ERR)
+        self.on_poll_start = on_poll_start
 
-            def __eq__(self, other):
-                return self.i == other.i
+        def on_inqueue_close(fd):
+            # Makes sure the fd is removed from tracking when
+            # the connection is closed, this is essential as
+            # fds may be reused.
+            active_writes.discard(fd)
+            all_inqueues.discard(fd)
+        self._pool.on_inqueue_close = on_inqueue_close
 
-            def __hash__(self):
-                return self.i
+        def schedule_writes(ready_fd, events):
+            # Schedule write operation to ready file descriptor.
+            # The file descriptor is writeable, but that does not
+            # mean the process is currently reading from the socket.
+            # The socket is buffered so writeable simply means that
+            # the buffer can accept at least 1 byte of data.
+            if ready_fd in active_writes:
+                # already writing to this fd
+                return
+            try:
+                job = pop_message()
+            except IndexError:
+                # no more messages, remove all inactive fds from the hub.
+                # this is important since the fds are always writeable
+                # as long as there's 1 byte left in the buffer, and so
+                # this may create a spinloop where the eventloop always wakes
+                # up.
+                for inqfd in diff(active_writes):
+                    hub_remove(inqfd)
+            else:
+                if not job._accepted:  # job not accepted by another worker
+                    callback = promise(write_generator_done)
+                    try:
+                        # keep track of what process the write operation
+                        # was scheduled for.
+                        job._scheduled_for = fileno_to_inq[ready_fd]
+                    except KeyError:
+                        # process gone since scheduled, put it back
+                        return put_message(job)
+                    cor = _write_job(ready_fd, job, callback=callback)
+                    mark_write_gen_as_active(cor)
+                    mark_write_fd_as_active(ready_fd)
+                    callback.args = (cor, )  # tricky as we need to pass ref
 
-        def _write_ack(fd, ack, callback=None):
-            header, body, body_size = ack._payload
+                    # Try to write immediately, in case there's an error.
+                    try:
+                        next(cor)
+                        hub_add((ready_fd, ), cor, WRITE | ERR)
+                    except StopIteration:
+                        pass
+
+        def send_job(tup):
+            # Schedule writing job request for when one of the process
+            # inqueues are writable.
+            body = dumps(tup, protocol=protocol)
+            body_size = len(body)
+            header = pack('>I', body_size)
+            # index 1,0 is the job ID.
+            job = get_job(tup[1][0])
+            job._payload = header, buffer(body), body_size
+            put_message(job)
+        self._pool._quick_put = send_job
+
+        write_stats = self.write_stats = Counter()
+
+        def on_not_recovering(proc):
+            # XXX Theoretically a possibility, but maybe terminate the
+            # process in this case to attempt to recover that way.
+            print('<<<<<<<<<< PROCESS IS NOT RECOVERING :(')
+            raise Exception('Process %r is not recovering, contact support.' % (proc, ))
+
+        def _write_job(fd, job, callback=None):
+            # writes job to the worker process.
+            # Operation must complete if more than one byte of data
+            # was written.  If the broker connection is lost
+            # and no data was written the operation shall be cancelled.
+            header, body, body_size = job._payload
+            errors = 0
             try:
                 try:
-                    proc = fileno_to_synq[fd]
+                    proc = fileno_to_inq[fd]
                 except KeyError:
+                    # write was scheduled for this fd but the process has since
+                    # exited, the message must be sent to another process.
+                    put_message(job)
                     raise StopIteration()
-                send_offset = proc.synq._writer.send_offset
+                # job result keeps track of what process the job is sent to.
+                job._write_to = proc
+                send = proc.send_job_offset
 
                 Hw = Bw = 0
                 while Hw < 4:
                     try:
-                        Hw += send_offset(header, Hw)
+                        Hw += send(header, Hw)
                     except Exception as exc:
                         if get_errno(exc) not in UNAVAIL:
                             raise
+                        # suspend until more data
+                        errors += 1
+                        if errors > 100:
+                            on_not_recovering(proc)
+                            raise StopIteration()
                         yield
+                    errors = 0
                 while Bw < body_size:
                     try:
-                        Bw += send_offset(body, Bw)
+                        Bw += send(body, Bw)
                     except Exception as exc:
                         if get_errno(exc) not in UNAVAIL:
                             raise
                         # suspend until more data
+                        errors += 1
+                        if errors > 100:
+                            on_not_recovering(proc)
+                            raise StopIteration()
                         yield
+                    errors = 0
             finally:
                 if callback:
                     callback()
+                write_stats[proc.index] += 1
+                # message written, so this fd is now available
                 active_writes.discard(fd)
 
-        def _write_job(fd, job, callback=None):
-            header, body, body_size = job._payload
+        def send_ack(response, pid, job, fd, WRITE=WRITE, ERR=ERR):
+            # Schedule writing ack response for when the fd is writeable.
+            msg = Ack(job, fd, precalc[response])
+            callback = promise(write_generator_done)
+            cor = _write_ack(fd, msg, callback=callback)
+            mark_write_gen_as_active(cor)
+            mark_write_fd_as_active(fd)
+            callback.args = (cor, )
+            hub_add((fd, ), cor, WRITE | ERR)
+        self._pool.send_ack = send_ack
+
+        def _write_ack(fd, ack, callback=None):
+            # writes ack back to the worker if synack enabled.
+            # this operation *MUST* complete, otherwise
+            # the worker process will hang waiting for the ack.
+            header, body, body_size = ack[2]
             try:
                 try:
-                    proc = fileno_to_inq[fd]
+                    proc = fileno_to_synq[fd]
                 except KeyError:
-                    put_message(job)
+                    # process died, we can safely discard the ack at this
+                    # point.
                     raise StopIteration()
-                # job result keeps track of what process the job is sent to.
-                job._write_to = proc
-                send_offset = proc.inq._writer.send_offset
+                send = proc.send_syn_offset
 
                 Hw = Bw = 0
                 while Hw < 4:
                     try:
-                        Hw += send_offset(header, Hw)
+                        Hw += send(header, Hw)
                     except Exception as exc:
                         if get_errno(exc) not in UNAVAIL:
                             raise
-                        # suspend until more data
                         yield
                 while Bw < body_size:
                     try:
-                        Bw += send_offset(body, Bw)
+                        Bw += send(body, Bw)
                     except Exception as exc:
                         if get_errno(exc) not in UNAVAIL:
                             raise
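
The central pattern in _write_job and _write_ack is a generator used as a write coroutine:
the header and body are pushed with offset-based partial sends, and whenever the pipe would
block (errno in UNAVAIL, presumably EAGAIN/EINTR) the generator yields so the event loop can
resume it once the fd is writable again. A simplified, self-contained sketch of that pattern
on a non-blocking socket pair; write_coroutine and the driving loop below are made up for
illustration:

    import errno
    import select
    import socket
    import struct

    def write_coroutine(sock, body):
        # Send a 4-byte length header followed by ``body`` on a non-blocking
        # socket, yielding every time the write would block so that an event
        # loop can resume the generator when the fd is writable again.
        data = struct.pack('>I', len(body)) + body
        sent = 0
        while sent < len(data):
            try:
                sent += sock.send(data[sent:])
            except socket.error as exc:
                if exc.errno not in (errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR):
                    raise
                yield  # suspend until the fd is writable

    a, b = socket.socketpair()
    a.setblocking(False)
    writer = write_coroutine(a, b'x' * 1024)
    for _ in writer:                    # a real hub would register the fd
        select.select([], [a], [])      # for WRITE events instead of this
    assert struct.unpack('>I', b.recv(4))[0] == 1024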
@@ -640,75 +765,35 @@ class TaskPool(BasePool):
             finally:
                 if callback:
                     callback()
+                # message written, so this fd is now available
                 active_writes.discard(fd)
 
-        def on_inqueue_close(fd):
-            active_writes.discard(fd)
-            all_inqueues.discard(fd)
-        self._pool.on_inqueue_close = on_inqueue_close
-
-        def schedule_writes(ready_fd, events):
-            if ready_fd in active_writes:
-                return
-            try:
-                job = pop_message()
-            except IndexError:
-                for inqfd in diff(active_writes):
-                    hub_remove(inqfd)
-            else:
-                if not job._accepted:
-                    callback = promise(write_generator_gone)
-                    try:
-                        job._scheduled_for = fileno_to_inq[ready_fd]
-                    except KeyError:
-                        # process gone since scheduled, put back
-                        return put_message(job)
-                    cor = _write_job(ready_fd, job, callback=callback)
-                    mark_write_gen_as_active(cor)
-                    mark_write_fd_as_active(ready_fd)
-                    callback.args = (cor, )  # tricky as we need to pass ref
-                    hub_add((ready_fd, ), cor, WRITE | ERR)
-
-        def _create_payload(type_, args):
-            body = dumps((type_, args), protocol=protocol)
-            size = len(body)
-            header = pack('>I', size)
-            return header, body, size
-
-        MESSAGES = {ACK: _create_payload(ACK, (0, )),
-                    NACK: _create_payload(NACK, (0, ))}
+    def on_poll_init(self, w, hub):
+        pool = self._pool
 
-        def send_ack(response, pid, job, fd):
-            msg = Ack(job, fd, MESSAGES[response])
-            callback = promise(write_generator_gone)
-            cor = _write_ack(fd, msg, callback=callback)
-            mark_write_gen_as_active(cor)
-            mark_write_fd_as_active(fd)
-            callback.args = (cor, )
-            hub_add((fd, ), cor, WRITE | ERR)
-        self._pool.send_ack = send_ack
+        self._create_timelimit_handlers(hub)
+        self._create_process_handlers(hub)
+        self._create_write_handlers(hub)
 
-        def on_poll_start(hub):
-            if outbound:
-                hub_add(diff(active_writes), schedule_writes, WRITE | ERR)
-        self.on_poll_start = on_poll_start
+        # did_start_ok will verify that pool processes were able to start,
+        # but this will only work the first time we start, as
+        # maxtasksperchild will mess up metrics.
+        if not w.consumer.restart_count and not pool.did_start_ok():
+            raise WorkerLostError('Could not start worker processes')
 
-        def quick_put(tup):
-            body = dumps(tup, protocol=protocol)
-            body_size = len(body)
-            header = pack('>I', body_size)
-            # index 1,0 is the job ID.
-            job = get_job(tup[1][0])
-            job._payload = header, buffer(body), body_size
-            put_message(job)
-        self._pool._quick_put = quick_put
+        # Maintain_pool is called whenever a process exits.
+        hub.add(pool.process_sentinels, self.maintain_pool, READ | ERR)
+        # Handle_result_event is called whenever one of the
+        # result queues are readable.
+        hub.add(pool._fileno_to_outq, self.handle_result_event, READ | ERR)
 
-    def handle_timeouts(self):
-        if self._pool._timeout_handler:
-            self._pool._timeout_handler.handle_event()
+        # Timers include calling maintain_pool at a regular interval
+        # to be certain processes are restarted.
+        for handler, interval in items(self.timers):
+            hub.timer.apply_interval(interval * 1000.0, handler)
 
     def flush(self):
-        # cancel all tasks that have not been accepted to that NACK is sent.
+        # cancel all tasks that have not been accepted so that NACK is sent.
         for job in values(self._pool._cache):
             if not job._accepted:
                 job._cancel()
@@ -718,9 +803,9 @@ class TaskPool(BasePool):
         if self.outbound_buffer:
             self.outbound_buffer.clear()
         try:
-            # but we must continue to write the payloads we already started
-            # otherwise we will not recover the message boundaries.
-            # the messages will be NACK'ed later.
+            # ...but we must continue writing the payloads we already started
+            # to keep message boundaries.
+            # The messages may be NACK'ed later if synack is enabled.
             if self._pool._state == RUN:
                 # flush outgoing buffers
                 intervals = fxrange(0.01, 0.1, 0.01, repeatlast=True)
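
The flush path presumably uses these fxrange intervals as back-off sleeps while the writes
that were already started run to completion. Assuming fxrange(0.01, 0.1, 0.01,
repeatlast=True) yields floats from 0.01 up to 0.1 in 0.01 steps and then repeats the last
value indefinitely, the waiting loop amounts to something like this sketch, where
still_writing() is a stand-in for whatever condition the pool actually polls:

    from itertools import chain, repeat
    from time import sleep

    def backoff_intervals(start=0.01, stop=0.1, step=0.01):
        # Increasing sleep intervals capped at ``stop`` and then repeated
        # forever, mimicking fxrange(start, stop, step, repeatlast=True).
        n = int(round((stop - start) / step)) + 1
        steps = [start + i * step for i in range(n)]
        return chain(steps, repeat(steps[-1]))

    def wait_for_writes(still_writing, timeout=10.0):
        # e.g. wait_for_writes(lambda: bool(active_writers))
        intervals, waited = backoff_intervals(), 0.0
        while still_writing() and waited < timeout:
            interval = next(intervals)
            sleep(interval)
            waited += interval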