Browse Source

Eventloop: Buffer when messages come close together

Ask Solem 13 years ago
parent
commit
16dcb2a192
1 changed files with 60 additions and 19 deletions
  1. 60 19
      celery/worker/consumer.py

+ 60 - 19
celery/worker/consumer.py

@@ -172,7 +172,8 @@ class Component(StartStopComponent):
                 pool=w.pool,
                 timer=w.timer,
                 app=w.app,
-                controller=w)
+                controller=w,
+                use_eventloop=w.use_eventloop)
         return c
 
 
@@ -303,7 +304,7 @@ class Consumer(object):
     def __init__(self, ready_queue,
             init_callback=noop, send_events=False, hostname=None,
             initial_prefetch_count=2, pool=None, app=None,
-            timer=None, controller=None, **kwargs):
+            timer=None, controller=None, use_eventloop=False, **kwargs):
         self.app = app_or_default(app)
         self.connection = None
         self.task_consumer = None
@@ -318,6 +319,7 @@ class Consumer(object):
         self.heart = None
         self.pool = pool
         self.timer = timer or timer2.default_timer
+        self.use_eventloop = use_eventloop
         pidbox_state = AttributeDict(app=self.app,
                                      hostname=self.hostname,
                                      listener=self,     # pre 2.2
@@ -332,7 +334,7 @@ class Consumer(object):
         self._does_info = logger.isEnabledFor(logging.INFO)
         self.strategies = {}
         if self.use_eventloop:
-            self.hub = Hub(self.priority_timer)
+            self.hub = Hub(self.timer)
 
     def update_strategies(self):
         S = self.strategies
@@ -360,11 +362,10 @@ class Consumer(object):
 
     def consume_messages(self, sleep=sleep, min=min, Empty=Empty):
         """Consume messages forever (or until an exception is raised)."""
-        self.task_consumer.consume()
-        debug("Ready to accept tasks!")
 
         with self.hub as hub:
             qos = self.qos
+            concurrency = self.pool.num_processes
             update_qos = qos.update
             update_fds = hub.update
             fdmap = hub.fdmap
@@ -373,8 +374,34 @@ class Consumer(object):
             scheduled = hub.timer._queue
             transport = self.connection.transport
             on_poll_start = transport.on_poll_start
-
-            self.task_consumer.callbacks.append(fire_timers)
+            strategies = self.strategies
+            buffer = []
+
+            def flush_buffer():
+                for name, body, message in buffer:
+                    try:
+                        strategies[name](message, body, message.ack_log_error)
+                    except KeyError, exc:
+                        self.handle_unknown_task(body, message, exc)
+                    except InvalidTaskError, exc:
+                        self.handle_invalid_task(body, message, exc)
+                buffer[:] = []
+
+            def on_task_received(body, message):
+                try:
+                    name = body["task"]
+                except (KeyError, TypeError):
+                    return self.handle_unknown_message(body, message)
+                #try:
+                #    strategies[name](message, body, message.ack_log_error)
+                #except KeyError, exc:
+                #    self.handle_unknown_task(body, message, exc)
+                #except InvalidTaskError, exc:
+                #    self.handle_invalid_task(body, message, exc)
+                buffer.append((name, body, message))
+                if len(buffer) >= concurrency * 10:
+                    flush_buffer()
+                #fire_timers()
 
             update_fds(self.connection.eventmap, self.pool.eventmap)
             for handler, interval in self.pool.timers.iteritems():
@@ -389,6 +416,10 @@ class Consumer(object):
             self.pool.on_process_down = on_process_down
 
             transport.on_poll_init(hub.poller)
+            self.task_consumer.callbacks = [on_task_received]
+            self.task_consumer.consume()
+
+            debug("Ready to accept tasks!")
 
             while self._state != CLOSE and self.connection:
                 # shutdown if signal handlers told us to.
@@ -406,6 +437,7 @@ class Consumer(object):
 
                 update_fds(on_poll_start())
                 if fdmap:
+                    #for timeout in (time_to_sleep, 0.1, 0.1):
                     for fileno, event in poll(time_to_sleep) or ():
                         try:
                             fdmap[fileno](fileno, event)
@@ -414,8 +446,10 @@ class Consumer(object):
                         except socket.error:
                             if self._state != CLOSE:        # pragma: no cover
                                 raise
+                    if buffer:
+                        flush_buffer()
                 else:
-                    sleep(min(time_to_sleep, 1))
+                    sleep(min(time_to_sleep, 0.1))
 
     def on_task(self, task):
         """Handle received task.
@@ -424,7 +458,6 @@ class Consumer(object):
         otherwise we move it the ready queue for immediate processing.
 
         """
-
         if task.revoked():
             return
 
@@ -477,6 +510,18 @@ class Consumer(object):
                                      safe_repr(message.content_encoding),
                                      safe_repr(message.delivery_info))
 
+    def handle_unknown_message(self, body, message):
+        warn(UNKNOWN_FORMAT, self._message_report(body, message))
+        message.reject_log_error(logger, self.connection_errors)
+
+    def handle_unknown_task(self, body, message, exc):
+        error(UNKNOWN_TASK_ERROR, exc, safe_repr(body), exc_info=True)
+        message.reject_log_error(logger, self.connection_errors)
+
+    def handle_invalid_task(self, body, message, exc):
+        error(INVALID_TASK_ERROR, str(exc), safe_repr(body), exc_info=True)
+        message.reject_log_error(logger, self.connection_errors)
+
     def receive_message(self, body, message):
         """Handles incoming messages.
 
@@ -487,18 +532,14 @@ class Consumer(object):
         try:
             name = body["task"]
         except (KeyError, TypeError):
-            warn(UNKNOWN_FORMAT, self._message_report(body, message))
-            message.reject_log_error(logger, self.connection_errors)
-            return
+            return self.handle_unknown_message(body, message)
 
         try:
             self.strategies[name](message, body, message.ack_log_error)
         except KeyError, exc:
-            error(UNKNOWN_TASK_ERROR, exc, safe_repr(body), exc_info=True)
-            message.reject_log_error(logger, self.connection_errors)
+            self.handle_unknown_task(body, message, exc)
         except InvalidTaskError, exc:
-            error(INVALID_TASK_ERROR, str(exc), safe_repr(body), exc_info=True)
-            message.reject_log_error(logger, self.connection_errors)
+            self.handle_invalid_task(body, message, exc)
 
     def maybe_conn_error(self, fun):
         """Applies function but ignores any connection or channel
@@ -646,9 +687,6 @@ class Consumer(object):
         self.qos = QoS(self.task_consumer, self.initial_prefetch_count)
         self.qos.update()
 
-        # receive_message handles incoming messages.
-        self.task_consumer.register_callback(self.receive_message)
-
         # Setup the process mailbox.
         self.reset_pidbox_node()
 
@@ -747,7 +785,10 @@ class Consumer(object):
 class BlockingConsumer(Consumer):
 
     def consume_messages(self):
+        # receive_message handles incoming messages.
+        self.task_consumer.register_callback(self.receive_message)
         self.task_consumer.consume()
+
         debug("Ready to accept tasks!")
 
         while self._state != CLOSE and self.connection: