Ask Solem 11 年 前
コミット
f06fe0adad
1 ファイル変更10 行追加9 行削除
  1. 10 9
      celery/concurrency/asynpool.py

+ 10 - 9
celery/concurrency/asynpool.py

@@ -667,7 +667,7 @@ class AsynPool(_pool.Pool):
                 pass
                 pass
         self.on_inqueue_close = on_inqueue_close
         self.on_inqueue_close = on_inqueue_close
 
 
-        def schedule_writes(ready_fds, curindex=[0]):
+        def schedule_writes(ready_fds, total_write_count=[0]):
             # Schedule write operation to ready file descriptor.
             # Schedule write operation to ready file descriptor.
             # The file descriptor is writeable, but that does not
             # The file descriptor is writeable, but that does not
             # mean the process is currently reading from the socket.
             # mean the process is currently reading from the socket.
@@ -675,17 +675,18 @@ class AsynPool(_pool.Pool):
             # the buffer can accept at least 1 byte of data.
             # the buffer can accept at least 1 byte of data.
 
 
             # This means we have to cycle between the ready fds.
             # This means we have to cycle between the ready fds.
-            # the first version used shuffle, but using i % total
+            # the first version used shuffle, but this version
-            # is about 30% faster with many processes.  The latter
+            # using `total_writes % ready_fds` is about 30% faster
-            # also shows more fairness in write stats when used with
+            # with many processes, and also leans more towards fairness
-            # many processes [XXX On OS X, this may vary depending
+            # in write stats when used with many processes
+            # [XXX On OS X, this may vary depending
             # on event loop implementation (i.e select vs epoll), so
             # on event loop implementation (i.e select vs epoll), so
             # have to test further]
             # have to test further]
-            total = len(ready_fds)
+            num_ready = len(ready_fds)
 
 
-            for i in range(total):
+            for i in range(num_ready):
-                ready_fd = ready_fds[curindex[0] % total]
+                ready_fd = ready_fds[total_write_count[0] % num_ready]
-                curindex[0] += 1
+                total_write_count[0] += 1
                 if ready_fd in active_writes:
                 if ready_fd in active_writes:
                     # already writing to this fd
                     # already writing to this fd
                     continue
                     continue