Ver código fonte

Fixes bug with on_partial_read, also disablees synack by default

Ask Solem 12 anos atrás
pai
commit
d18b525e63
1 arquivos alterados com 12 adições e 9 exclusões
  1. 12 9
      celery/concurrency/processes.py

+ 12 - 9
celery/concurrency/processes.py

@@ -25,7 +25,7 @@ from billiard import forking_enable
 from billiard import pool as _pool
 from billiard.exceptions import WorkerLostError
 from billiard.pool import (
-    RUN, CLOSE, TERMINATE, ACK, NACK, WorkersJoined, CoroStop,
+    RUN, CLOSE, TERMINATE, ACK, NACK, EX_RECYCLE, WorkersJoined, CoroStop,
 )
 from billiard.queues import _SimpleQueue
 from kombu.serialization import pickle as _pickle
@@ -234,8 +234,9 @@ class AsynPool(_pool.Pool):
     ResultHandler = ResultHandler
     Worker = Worker
 
-    def __init__(self, processes=None, *args, **kwargs):
+    def __init__(self, processes=None, synack=False, *args, **kwargs):
         processes = self.cpu_count() if processes is None else processes
+        self.synack = synack
         self._queues = dict((self.create_process_queues(), None)
                             for _ in range(processes))
         self._fileno_to_inq = {}
@@ -258,9 +259,11 @@ class AsynPool(_pool.Pool):
                     if owner is None)
 
     def create_process_queues(self):
-        inq, outq, synq = _SimpleQueue(), _SimpleQueue(), _SimpleQueue()
+        inq, outq, synq = _SimpleQueue(), _SimpleQueue(), None
         inq._writer.setblocking(0)
-        synq._writer.setblocking(0)
+        if self.synack:
+            synq = _SimpleQueue()
+            synq._writer.setblocking(0)
         return inq, outq, synq
 
     def on_worker_alive(self, pid):
@@ -273,7 +276,7 @@ class AsynPool(_pool.Pool):
         self._all_inqueues.add(proc.inqW_fd)
 
     def on_job_process_down(self, job):
-        if not job._accepted and job._write_to:
+        if job._write_to:
             self.on_partial_read(job, job._write_to)
 
     def on_job_process_lost(self, job, pid, exitcode):
@@ -321,7 +324,7 @@ class AsynPool(_pool.Pool):
         # we cannot reuse the sockets again, because we don't know if
         # the process wrote/read anything frmo them, and if so we cannot
         # restore the message boundaries.
-        if proc.exitcode < 0:
+        if proc.exitcode != EX_RECYCLE:
             # job was not acked, so find another worker to send it to.
             if not job._accepted:
                 self._put_back(job)
@@ -386,7 +389,7 @@ class TaskPool(BasePool):
                 else self.Pool)
         P = self._pool = Pool(processes=self.limit,
                               initializer=process_initializer,
-                              synack=True,
+                              synack=False,
                               **self.options)
         self.on_apply = P.apply_async
         self.on_soft_timeout = P._timeout_handler.on_soft_timeout
@@ -611,8 +614,8 @@ class TaskPool(BasePool):
         MESSAGES = {ACK: _create_payload(ACK, (0, )),
                     NACK: _create_payload(NACK, (0, ))}
 
-        def send_ack(response, pid, i, fd):
-            msg = Ack(i, fd, MESSAGES[response])
+        def send_ack(response, pid, job, fd):
+            msg = Ack(job, fd, MESSAGES[response])
             callback = promise(write_generator_gone)
             cor = _write_ack(fd, msg, callback=callback)
             mark_write_gen_as_active(cor)