Ask Solem 12 vuotta sitten
vanhempi
commit
b9fac36ce7
3 muutettua tiedostoa jossa 15 lisäystä ja 44 poistoa
  1. 1 2
      celery/concurrency/processes.py
  2. 0 38
      celery/worker/hub.py
  3. 14 4
      celery/worker/loops.py

+ 1 - 2
celery/concurrency/processes.py

@@ -435,7 +435,6 @@ class TaskPool(BasePool):
         fileno_to_outq = pool._fileno_to_outq
         all_inqueues = pool._all_inqueues
         active_writes = self._active_writes
-        add_coro = hub.add_coro
         diff = all_inqueues.difference
         hub_add, hub_remove = hub.add, hub.remove
         mark_write_fd_as_active = active_writes.add
@@ -543,7 +542,7 @@ class TaskPool(BasePool):
                     mark_write_gen_as_active(cor)
                     mark_write_fd_as_active(ready_fd)
                     callback.args = (cor, )  # tricky as we need to pass ref
-                    add_coro((ready_fd, ), cor, WRITE)
+                    hub_add((ready_fd, ), cor, WRITE)
 
         def on_poll_start(hub):
             if outbound:

+ 0 - 38
celery/worker/hub.py

@@ -160,33 +160,6 @@ class Hub(object):
         self.on_init = []
         self.on_close = []
         self.on_task = []
-        self.coros = {}
-
-        self.trampoline = self._trampoline()
-
-    @coroutine
-    def _trampoline(self):
-        coros = self.coros
-        add = self.add_coro
-        remove_self = self.remove
-        pop = self.coros.pop
-
-        while 1:
-            fd, events = (yield)
-            remove_self(fd)
-            try:
-                gen = coros[fd]
-            except KeyError:
-                pass
-            else:
-                try:
-                    next(gen)
-                    add(fd, gen, WRITE)
-                except StopIteration:
-                    pop(fd, None)
-                except Exception:
-                    pop(fd, None)
-                    raise
 
     def start(self):
         """Called by Hub bootstep at worker startup."""
@@ -232,15 +205,6 @@ class Hub(object):
         self._unregister(fd)
         self._discard(fd)
 
-    def add_coro(self, fds, coro, flags):
-        for fd in (fileno(f) for f in maybe_list(fds, None)):
-            self._add(fd, self.trampoline, flags)
-            self.coros[fd] = coro
-
-    def remove_coro(self, fds):
-        for fd in maybe_list(fds, None):
-            self.coros.pop(fileno(fd), None)
-
     def add_reader(self, fds, callback):
         return self.add(fds, callback, READ | ERR)
 
@@ -299,8 +263,6 @@ class Hub(object):
 
     def _callback_for(self, fd, flag, *default):
         try:
-            if fd in self.coros:
-                return self.coros[fd]
             if flag & READ:
                 return self.readers[fileno(fd)]
             if flag & WRITE:

+ 14 - 4
celery/worker/loops.py

@@ -34,6 +34,8 @@ def asynloop(obj, connection, consumer, strategies, ns, hub, qos,
         readers, writers = hub.readers, hub.writers
         poll = hub.poller.poll
         fire_timers = hub.fire_timers
+        hub_add = hub.add
+        hub_remove = hub.remove
         scheduled = hub.timer._queue
         hbtick = connection.heartbeat_check
         conn_poll_start = connection.transport.on_poll_start
@@ -98,18 +100,26 @@ def asynloop(obj, connection, consumer, strategies, ns, hub, qos,
                     if not events:
                         conn_poll_empty()
                     for fileno, event in events or ():
+                        cb = flags = None
                         try:
                             if event & READ:
-                                cb = readers[fileno]
+                                cb, flags = readers[fileno], READ | ERR
                             elif event & WRITE:
-                                cb = writers[fileno]
+                                cb, flags = writers[fileno], WRITE
                             elif event & ERR:
-                                cb = (readers.get(fileno) or
+                                cb, flags = (readers.get(fileno) or
                                       writers.get(fileno))
                                 if cb is None:
                                     continue
                             if isinstance(cb, generator):
-                                cb.send((fileno, event))
+                                try:
+                                    next(cb)
+                                    hub_add(fileno, cb, WRITE)
+                                except StopIteration:
+                                    hub_remove(fileno)
+                                except Exception:
+                                    hub_remove(fileno)
+                                    raise
                             else:
                                 cb(fileno, event)
                         except (KeyError, Empty):