Browse Source

schedule_jobs should get the proc from index

Ask Solem 12 years ago
parent
commit
e471b1fcde
1 changed files with 9 additions and 11 deletions
  1. 9 11
      celery/concurrency/processes.py

+ 9 - 11
celery/concurrency/processes.py

@@ -611,7 +611,6 @@ class TaskPool(BasePool):
             # are messages pending this will schedule writing one message
             # by registering the 'schedule_writes' function for all currently
             # inactive inqueues (not already being written to)
-            print('NUMJOBS: %r' % (len(self._pool._cache), ))
             if outbound:
                 hub_add(diff(active_writes), schedule_writes, WRITE | ERR)
         self.on_poll_start = on_poll_start
@@ -631,7 +630,7 @@ class TaskPool(BasePool):
             # The socket is buffered so writeable simply means that
             # the buffer can accept at least 1 byte of data.
             if ready_fd in active_writes:
-                # alredy writing to this fd
+                # already writing to this fd
                 return
             try:
                 job = pop_message()
@@ -652,7 +651,13 @@ class TaskPool(BasePool):
                     except KeyError:
                         # process gone since scheduled, put it back
                         return put_message(job)
-                    cor = _write_job(ready_fd, job)
+                    try:
+                        proc = fileno_to_inq[ready_fd]
+                    except KeyError:
+                        # write was scheduled for this fd but the process has since
+                        # exited and the message must be sent to another process.
+                        return put_message(job)
+                    cor = _write_job(proc, ready_fd, job)
                     job._writer = ref(cor)
                     mark_write_gen_as_active(cor)
                     mark_write_fd_as_active(ready_fd)
@@ -684,7 +689,7 @@ class TaskPool(BasePool):
             raise Exception(
                 'Process writable but cannot write. Contact support!')
 
-        def _write_job(fd, job):
+        def _write_job(proc, fd, job):
             # writes job to the worker process.
             # Operation must complete if more than one byte of data
             # was written.  If the broker connection is lost
@@ -692,13 +697,6 @@ class TaskPool(BasePool):
             header, body, body_size = job._payload
             errors = 0
             try:
-                try:
-                    proc = fileno_to_inq[fd]
-                except KeyError:
-                    # write was scheduled for this fd but the process has since
-                    # exited, the message must be sent to another process.
-                    put_message(job)
-                    raise StopIteration()
                 # job result keeps track of what process the job is sent to.
                 job._write_to = proc
                 send = proc.send_job_offset