Browse Source

Prefork: Use i % total to cycle between ready_fds instead of shuffle

Ask Solem 10 years ago
parent
commit
e2f05fe02c
1 changed files with 14 additions and 3 deletions
  1. 14 3
      celery/concurrency/asynpool.py

+ 14 - 3
celery/concurrency/asynpool.py

@@ -668,14 +668,25 @@ class AsynPool(_pool.Pool):
                 pass
         self.on_inqueue_close = on_inqueue_close
 
-        def schedule_writes(ready_fds, shuffle=random.shuffle):
+        def schedule_writes(ready_fds, shuffle=random.shuffle, curindex=[0]):
             # Schedule write operation to ready file descriptor.
             # The file descriptor is writeable, but that does not
             # mean the process is currently reading from the socket.
             # The socket is buffered so writeable simply means that
             # the buffer can accept at least 1 byte of data.
-            shuffle(ready_fds)
-            for ready_fd in ready_fds:
+
+            # This means we have to cycle between the ready fds.
+            # the first version used shuffle, but using i % total
+            # is about 30% faster with many processes.  The latter
+            # also shows more fairness in write stats when used with
+            # many processes [XXX On OS X, this may vary depending
+            # on event loop implementation (i.e select vs epoll), so
+            # have to test further]
+            total = len(ready_fds)
+
+            for i in range(total):
+                ready_fd = ready_fds[curindex[0] % total]
+                curindex[0] += 1
                 if ready_fd in active_writes:
                     # already writing to this fd
                     continue