Ver Fonte

Fixes -Ofair problems (Closes #2286)

Ask Solem há 10 anos atrás
pai
commit
d25e097acf
1 ficheiros alterados com 9 adições e 5 exclusões
  1. 9 5
      celery/concurrency/asynpool.py

+ 9 - 5
celery/concurrency/asynpool.py

@@ -596,7 +596,8 @@ class AsynPool(_pool.Pool):
         fileno_to_synq = self._fileno_to_synq
         outbound = self.outbound_buffer
         pop_message = outbound.popleft
-        put_message = outbound.append
+        append_message = outbound.append
+        put_back_message = outbound.appendleft
         all_inqueues = self._all_inqueues
         active_writes = self._active_writes
         active_writers = self._active_writers
@@ -693,12 +694,15 @@ class AsynPool(_pool.Pool):
                 ready_fd = ready_fds[curindex[0] % total]
                 if ready_fd in active_writes:
                     # already writing to this fd
+                    curindex[0] += 1
                     continue
                 if is_fair_strategy and ready_fd in busy_workers:
                     # worker is already busy with another task
+                    curindex[0] += 1
                     continue
                 if ready_fd not in all_inqueues:
                     hub_remove(ready_fd)
+                    curindex[0] += 1
                     continue
                 try:
                     job = pop_message()
@@ -711,7 +715,6 @@ class AsynPool(_pool.Pool):
                     for inqfd in diff(active_writes):
                         hub_remove(inqfd)
                     break
-
                 else:
                     if not job._accepted:  # job not accepted by another worker
                         try:
@@ -722,9 +725,9 @@ class AsynPool(_pool.Pool):
                             # write was scheduled for this fd but the process
                             # has since exited and the message must be sent to
                             # another process.
-                            put_message(job)
+                            put_back_message(job)
+                            curindex[0] += 1
                             continue
-                        curindex[0] += 1
                         cor = _write_job(proc, ready_fd, job)
                         job._writer = ref(cor)
                         mark_write_gen_as_active(cor)
@@ -741,6 +744,7 @@ class AsynPool(_pool.Pool):
                                 raise
                         else:
                             add_writer(ready_fd, cor)
+                curindex[0] += 1
         hub.consolidate_callback = schedule_writes
 
         def send_job(tup):
@@ -752,7 +756,7 @@ class AsynPool(_pool.Pool):
             # index 1,0 is the job ID.
             job = get_job(tup[1][0])
             job._payload = buf_t(header), buf_t(body), body_size
-            put_message(job)
+            append_message(job)
         self._quick_put = send_job
 
         def on_not_recovering(proc, fd, job):