
Multiprocessing: Randomize writes so that a single process does not get all the tasks just because there is still space in its pipe buffer

Ask Solem 11 years ago
parent
commit e61e0a2627
4 changed files with 70 additions and 41 deletions
  1. celery/concurrency/processes.py  +53 -37
  2. celery/worker/hub.py  +7 -3
  3. celery/worker/loops.py  +9 -0
  4. funtests/stress/stress/app.py  +1 -1
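The commit message above summarizes the core change: when several worker pipes poll as writable, the pool now shuffles them and hands out at most one pending message per pipe per event-loop tick, instead of feeding every message to the first pipe whose buffer happens to have room. A minimal, self-contained sketch of that policy, with purely illustrative names rather than Celery's actual API:

    import random
    from collections import deque

    # Toy model of the new scheduling policy (hypothetical names).
    pending_messages = deque(['task-1', 'task-2', 'task-3'])
    active_writes = set()   # fds that already have a write in progress

    def schedule_writes(ready_fds):
        # Shuffle so the same fd does not receive every task just because
        # its pipe buffer still has free space.
        random.shuffle(ready_fds)
        for fd in ready_fds:
            if fd in active_writes:
                continue    # already writing to this process
            try:
                message = pending_messages.popleft()
            except IndexError:
                break       # no more messages to schedule
            active_writes.add(fd)
            print('would write %r to fd %d' % (message, fd))

    schedule_writes([7, 8, 9])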

celery/concurrency/processes.py  +53 -37

@@ -19,6 +19,7 @@ from __future__ import absolute_import
 
 import errno
 import os
+import random
 import select
 import socket
 import struct
@@ -786,8 +787,17 @@ class TaskPool(BasePool):
             # are messages pending this will schedule writing one message
             # by registering the 'schedule_writes' function for all currently
             # inactive inqueues (not already being written to)
+
+            # consolidate means the eventloop will merge them
+            # and call the callback once with the list of writable fds as
+            # argument.  Using this means we minimize the risk of having
+            # the same fd receive every task if the pipe read buffer is not
+            # full.
             if outbound:
-                hub_add(diff(active_writes), schedule_writes, WRITE | ERR)
+                hub_add(
+                    diff(active_writes), None, WRITE | ERR,
+                    consolidate=True,
+                )
         self.on_poll_start = on_poll_start
 
         def on_inqueue_close(fd):
@@ -797,47 +807,53 @@ class TaskPool(BasePool):
             all_inqueues.discard(fd)
         self._pool.on_inqueue_close = on_inqueue_close
 
-        def schedule_writes(ready_fd, events):
+        def schedule_writes(ready_fds, shuffle=random.shuffle):
             # Schedule write operation to ready file descriptor.
             # The file descriptor is writeable, but that does not
             # mean the process is currently reading from the socket.
             # The socket is buffered so writeable simply means that
             # the buffer can accept at least 1 byte of data.
-            if ready_fd in active_writes:
-                # already writing to this fd
-                return
-            try:
-                job = pop_message()
-            except IndexError:
-                # no more messages, remove all inactive fds from the hub.
-                # this is important since the fds are always writeable
-                # as long as there's 1 byte left in the buffer, and so
-                # this may create a spinloop where the eventloop always wakes
-                # up.
-                for inqfd in diff(active_writes):
-                    hub_remove(inqfd)
-            else:
-                if not job._accepted:  # job not accepted by another worker
-                    try:
-                        # keep track of what process the write operation
-                        # was scheduled for.
-                        proc = job._scheduled_for = fileno_to_inq[ready_fd]
-                    except KeyError:
-                        # write was scheduled for this fd but the process
-                        # has since exited and the message must be sent to
-                        # another process.
-                        return put_message(job)
-                    cor = _write_job(proc, ready_fd, job)
-                    job._writer = ref(cor)
-                    mark_write_gen_as_active(cor)
-                    mark_write_fd_as_active(ready_fd)
-
-                    # Try to write immediately, in case there's an error.
-                    try:
-                        next(cor)
-                        hub_add((ready_fd, ), cor, WRITE | ERR)
-                    except StopIteration:
-                        pass
+            shuffle(ready_fds)
+            for ready_fd in ready_fds:
+                if ready_fd in active_writes:
+                    # already writing to this fd
+                    continue
+                try:
+                    job = pop_message()
+                except IndexError:
+                    # no more messages, remove all inactive fds from the hub.
+                    # this is important since the fds are always writeable
+                    # as long as there's 1 byte left in the buffer, and so
+                    # this may create a spinloop where the eventloop
+                    # always wakes up.
+                    for inqfd in diff(active_writes):
+                        hub_remove(inqfd)
+                    break
+
+                else:
+                    if not job._accepted:  # job not accepted by another worker
+                        try:
+                            # keep track of what process the write operation
+                            # was scheduled for.
+                            proc = job._scheduled_for = fileno_to_inq[ready_fd]
+                        except KeyError:
+                            # write was scheduled for this fd but the process
+                            # has since exited and the message must be sent to
+                            # another process.
+                            put_message(job)
+                            continue
+                        cor = _write_job(proc, ready_fd, job)
+                        job._writer = ref(cor)
+                        mark_write_gen_as_active(cor)
+                        mark_write_fd_as_active(ready_fd)
+
+                        # Try to write immediately, in case there's an error.
+                        try:
+                            next(cor)
+                            hub_add((ready_fd, ), cor, WRITE | ERR)
+                        except StopIteration:
+                            pass
+        hub.consolidate_callback = schedule_writes
 
         def send_job(tup):
             # Schedule writing job request for when one of the process
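A note on the pattern used in the hunk above: each write is a generator (the `_write_job` coroutine) that is advanced once immediately with `next()`; if it finishes right away (`StopIteration`) the message fit into the pipe buffer, otherwise the generator is registered with the hub and resumed when the fd polls writable again. A rough, stand-alone illustration of that pattern, using a hypothetical `_write_job` rather than the real one:

    import errno
    import os

    def _write_job(fd, payload):
        # Illustrative generator: write as much as the pipe accepts,
        # yielding whenever the OS buffer is full so an event loop can
        # resume it once the fd becomes writable again.
        written = 0
        while written < len(payload):
            try:
                written += os.write(fd, payload[written:])
            except OSError as exc:
                if exc.errno != errno.EAGAIN:
                    raise
                yield

    r, w = os.pipe()
    os.set_blocking(w, False)
    cor = _write_job(w, b'x' * 16)
    try:
        next(cor)      # try to write immediately, in case there's an error
    except StopIteration:
        pass           # the whole payload fit into the buffer right away
    else:
        pass           # otherwise the hub would resume `cor` when w is writable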

celery/worker/hub.py  +7 -3

@@ -149,6 +149,7 @@ class Hub(object):
         self.on_init = []
         self.on_close = []
         self.on_task = []
+        self.consolidate = set()
 
     def start(self):
         """Called by Hub bootstep at worker startup."""
@@ -178,18 +179,20 @@ class Hub(object):
                     logger.error('Error in timer: %r', exc, exc_info=1)
         return min(max(delay or 0, min_delay), max_delay)
 
-    def _add(self, fd, cb, flags):
+    def _add(self, fd, cb, flags, consolidate=False):
         #if flags & WRITE:
         #    ex = self.writers.get(fd)
         #    if ex and ex.__name__ == '_write_job':
         #        assert not ex.gi_frame or ex.gi_frame == -1
         self.poller.register(fd, flags)
         (self.readers if flags & READ else self.writers)[fileno(fd)] = cb
+        if consolidate:
+            self.consolidate.add(fd)
 
-    def add(self, fds, callback, flags):
+    def add(self, fds, callback, flags, consolidate=False):
         for fd in maybe_list(fds, None):
             try:
-                self._add(fd, callback, flags)
+                self._add(fd, callback, flags, consolidate)
             except ValueError:
                 self._discard(fd)
 
@@ -220,6 +223,7 @@ class Hub(object):
         fd = fileno(fd)
         self.readers.pop(fd, None)
         self.writers.pop(fd, None)
+        self.consolidate.discard(fd)
 
     def close(self, *args):
         [self._unregister(fd) for fd in self.readers]
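The Hub side of the change is pure bookkeeping: file descriptors registered with `consolidate=True` are remembered in a set, and removing an fd also drops it from that set, so the event loop can tell which writable fds should be batched rather than dispatched individually. A minimal stand-in for that bookkeeping, using an illustrative `MiniHub` rather than the real `Hub` class:

    class MiniHub:
        # Minimal model of the consolidate bookkeeping (illustrative only).
        def __init__(self):
            self.writers = {}
            self.consolidate = set()
            self.consolidate_callback = None

        def add_writer(self, fd, callback, consolidate=False):
            self.writers[fd] = callback
            if consolidate:
                self.consolidate.add(fd)

        def remove(self, fd):
            self.writers.pop(fd, None)
            self.consolidate.discard(fd)

    def on_writable(fds):
        print('writable inqueues:', fds)

    hub = MiniHub()
    hub.consolidate_callback = on_writable
    hub.add_writer(7, None, consolidate=True)   # no per-fd callback; batched
    hub.add_writer(8, None, consolidate=True)
    hub.consolidate_callback(sorted(hub.consolidate))   # one call for all fds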

celery/worker/loops.py  +9 -0

@@ -49,6 +49,8 @@ def asynloop(obj, connection, consumer, blueprint, hub, qos,
     keep_draining = connection.transport.nb_keep_draining
     errors = connection.connection_errors
     hub_add, hub_remove = hub.add, hub.remove
+    consolidate = hub.consolidate
+    consolidate_callback = hub.consolidate_callback
 
     on_task_received = obj.create_task_handler(on_task_callbacks)
 
@@ -83,6 +85,7 @@ def asynloop(obj, connection, consumer, blueprint, hub, qos,
             update_readers(conn_poll_start())
             pool_poll_start(hub)
             if readers or writers:
+                to_consolidate = []
                 connection.more_to_read = True
                 while connection.more_to_read:
                     try:
@@ -94,6 +97,10 @@ def asynloop(obj, connection, consumer, blueprint, hub, qos,
                     if not events:
                         conn_poll_empty()
                     for fileno, event in events or ():
+                        if fileno in consolidate and \
+                                writers.get(fileno) is None:
+                            to_consolidate.append(fileno)
+                            continue
                         cb = None
                         try:
                             if event & READ:
@@ -124,6 +131,8 @@ def asynloop(obj, connection, consumer, blueprint, hub, qos,
                         except socket.error:
                             if blueprint.state != CLOSE:  # pragma: no cover
                                 raise
+                    if to_consolidate:
+                        consolidate_callback(to_consolidate)
                     if keep_draining:
                         drain_nowait()
                         poll_timeout = 0
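In the event loop itself, readiness events for consolidated fds are not dispatched one at a time; they are collected into a single list per poll iteration and handed to `consolidate_callback` in one call, which is what lets `schedule_writes` shuffle them and spread the pending messages across every writable inqueue. A rough sketch of that dispatch step, with illustrative names rather than the real `asynloop`:

    def dispatch(events, writers, consolidate, consolidate_callback):
        # Hypothetical helper modelling one poll iteration.
        to_consolidate = []
        for fd, event in events:
            if fd in consolidate and writers.get(fd) is None:
                # Registered with consolidate=True and no per-fd callback:
                # defer so all such fds are handled together below.
                to_consolidate.append(fd)
                continue
            callback = writers.get(fd)
            if callback is not None:
                callback(fd, event)
        if to_consolidate:
            # One call with every writable, consolidated fd from this round.
            consolidate_callback(to_consolidate)

    dispatch(
        events=[(7, 'W'), (8, 'W')],
        writers={7: None, 8: None},
        consolidate={7, 8},
        consolidate_callback=lambda fds: print('schedule writes for', fds),
    )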

funtests/stress/stress/app.py  +1 -1

@@ -59,7 +59,7 @@ def kill(sig=signal.SIGKILL):
 
 
 @app.task
-def sleeping(i):
+def sleeping(i, **_):
     sleep(i)