Browse Source

Redis transport now also uses the evented interface.

The redis consumer already uses eventio, so just had too
hook it into our hub.

Depends on:

    - ask/kombu@ffa1dd35b02807734fadf80a6934bb7a64454eb6
    - ask/billiard@87f5817f5444d163faf09d966d94abb4c80e87f3
Ask Solem 13 years ago
parent
commit
3fe715d357
3 changed files with 25 additions and 11 deletions
  1. 1 1
      celery/worker/__init__.py
  2. 17 6
      celery/worker/consumer.py
  3. 7 4
      celery/worker/hub.py

+ 1 - 1
celery/worker/__init__.py

@@ -89,7 +89,7 @@ class Pool(abstract.StartStopComponent):
         if w.autoscale:
             w.max_concurrency, w.min_concurrency = w.autoscale
         w.use_eventloop = (detect_environment() == "default" and
-                           w.app.broker_connection().eventmap)
+                           w.app.broker_connection().is_evented)
 
     def create(self, w):
         forking_enable(w.no_execv or not w.force_execv)

+ 17 - 6
celery/worker/consumer.py

@@ -80,6 +80,8 @@ import logging
 import socket
 import threading
 
+from Queue import Empty
+
 from kombu.utils.encoding import safe_repr
 
 from celery.app import app_or_default
@@ -384,11 +386,16 @@ class Consumer(object):
 
     def _eventloop(self):
         """Consume messages forever (or until an exception is raised)."""
+        on_poll_start = self.connection.transport.on_poll_start
+
+        qos = self.qos
         with Hub() as hub:
-            hub.update(self.connection.eventmap,
-                       self.pool.eventmap)
+            update = hub.update
             fdmap = hub.fdmap
             poll = hub.poller.poll
+            update(self.connection.eventmap,
+                       self.pool.eventmap)
+            self.connection.transport.on_poll_init(hub.poller)
 
             while self._state != CLOSE and self.connection:
                 if state.should_stop:
@@ -397,11 +404,15 @@ class Consumer(object):
                     raise SystemTerminate()
                 if not fdmap:
                     return
-                if self.qos.prev != self.qos.value:     # pragma: no cover
-                    self.qos.update()
-                for fileno, event in poll(1.0) or ():
+                if qos.prev != qos.value:     # pragma: no cover
+                    qos.update()
+
+                update(on_poll_start())
+                for fileno, event in poll(100.0) or ():
                     try:
-                        fdmap[fileno]()
+                        fdmap[fileno](fileno, event)
+                    except Empty:
+                        pass
                     except socket.error:
                         if self._state != CLOSE:        # pragma: no cover
                             raise

+ 7 - 4
celery/worker/hub.py

@@ -21,11 +21,14 @@ class Hub(object):
     def __exit__(self, *exc_info):
         return self.close()
 
-    def add(self, f, callback, flags=None):
+    def add(self, fd, callback, flags=None):
         flags = self.eventflags if flags is None else flags
-        fd = f.fileno()
         self.poller.register(fd, flags)
-        self.fdmap[fd] = callback
+        try:
+            fileno = fd.fileno()
+        except AttributeError:
+            fileno = fd
+        self.fdmap[fileno] = callback
 
     def update(self, *maps):
         [self.add(*x) for row in maps for x in row.iteritems()]
@@ -41,7 +44,7 @@ class Hub(object):
             return sleep(0.1)
         for fileno, event in self.poller.poll(timeout) or ():
             try:
-                self.fdmap[fileno]()
+                self.fdmap[fileno](fileno, event)
             except socket.timeout:
                 pass
             except socket.error, exc: