Ask Solem 12 роки тому
батько
коміт
ab0550818d
2 змінених файлів з 4 додано та 11 видалено
  1. 2 4
      celery/bootsteps.py
  2. 2 7
      celery/concurrency/processes.py

+ 2 - 4
celery/bootsteps.py

@@ -208,9 +208,7 @@ class Blueprint(object):
         return self.steps[name]
         return self.steps[name]
 
 
     def _find_last(self):
     def _find_last(self):
-        for C in values(self.steps):
-            if C.last:
-                return C
+        return next((C for C in values(self.steps) if C.last), None)
 
 
     def _firstpass(self, steps):
     def _firstpass(self, steps):
         stream = deque(step.requires for step in values(steps))
         stream = deque(step.requires for step in values(steps))
@@ -312,7 +310,7 @@ class Step(object):
         pass
         pass
 
 
     def include_if(self, parent):
     def include_if(self, parent):
-        """An optional predicate that decided whether this
+        """An optional predicate that decides whether this
         step should be created."""
         step should be created."""
         return self.enabled
         return self.enabled
 
 

+ 2 - 7
celery/concurrency/processes.py

@@ -162,7 +162,7 @@ class ResultHandler(_pool.ResultHandler):
                 else:
                 else:
                     ready, task = False, None
                     ready, task = False, None
             except (IOError, EOFError) as exc:
             except (IOError, EOFError) as exc:
-                debug('result handler got %r -- exiting' % (exc, ))
+                debug('result handler got %r -- exiting', exc)
                 raise CoroStop()
                 raise CoroStop()
 
 
             if self._state:
             if self._state:
@@ -646,12 +646,7 @@ class TaskPool(BasePool):
                     try:
                     try:
                         # keep track of what process the write operation
                         # keep track of what process the write operation
                         # was scheduled for.
                         # was scheduled for.
-                        job._scheduled_for = fileno_to_inq[ready_fd]
-                    except KeyError:
-                        # process gone since scheduled, put it back
-                        return put_message(job)
-                    try:
-                        proc = fileno_to_inq[ready_fd]
+                        proc = job._scheduled_for = fileno_to_inq[ready_fd]
                     except KeyError:
                     except KeyError:
                         # write was scheduled for this fd but the process
                         # write was scheduled for this fd but the process
                         # has since exited and the message must be sent to
                         # has since exited and the message must be sent to