
Events are now buffered up in the worker and sent as a list. No timers are used, so at most $concurrency events will be buffered at a time.
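
For context, a minimal sketch of how the new dispatcher keywords added below (buffer_group, buffer_limit, on_send_buffered) might be used directly; the broker URL, event fields, and the on_buffered callback are illustrative placeholders, not part of this change:

    from celery import Celery

    app = Celery('proj', broker='amqp://')

    def on_buffered():
        # In the worker this schedules a flush on the next event loop
        # iteration; here it is just a stand-in callback.
        pass

    with app.connection() as connection:
        dispatcher = app.events.Dispatcher(
            connection,
            buffer_group=['task'],         # batch events of these groups
            buffer_limit=24,               # flush once this many are buffered
            on_send_buffered=on_buffered,  # called each time an event is buffered
        )
        dispatcher.send('task-started', uuid='d9aae2a6')
        dispatcher.flush(groups=True)      # publish anything still buffered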

Ask Solem, 11 years ago
commit 03399b4d7c
3 changed files with 70 additions and 29 deletions
  1. celery/events/__init__.py (+60 -28)
  2. celery/utils/functional.py (+0 -1)
  3. celery/worker/consumer.py (+10 -0)

celery/events/__init__.py (+60 -28)

@@ -14,7 +14,7 @@ import os
 import time
 import threading
 
-from collections import deque
+from collections import defaultdict, deque
 from contextlib import contextmanager
 from copy import copy
 from operator import itemgetter
@@ -25,6 +25,7 @@ from kombu.mixins import ConsumerMixin
 from kombu.utils import cached_property
 
 from celery.app import app_or_default
+from celery.five import items
 from celery.utils import anon_nodename, uuid
 from celery.utils.functional import dictfilter
 from celery.utils.timeutils import adjust_timestamp, utcoffset, maybe_s_to_ms
@@ -115,12 +116,17 @@ class EventDispatcher(object):
 
     def __init__(self, connection=None, hostname=None, enabled=True,
                  channel=None, buffer_while_offline=True, app=None,
-                 serializer=None, groups=None, delivery_mode=1):
+                 serializer=None, groups=None, delivery_mode=1,
+                 buffer_group=None, buffer_limit=24, on_send_buffered=None):
         self.app = app_or_default(app or self.app)
         self.connection = connection
         self.channel = channel
         self.hostname = hostname or anon_nodename()
         self.buffer_while_offline = buffer_while_offline
+        self.buffer_group = buffer_group or frozenset()
+        self.buffer_limit = buffer_limit
+        self.on_send_buffered = on_send_buffered
+        self._group_buffer = defaultdict(list)
         self.mutex = threading.Lock()
         self.producer = None
         self._outbound_buffer = deque()
@@ -164,9 +170,8 @@ class EventDispatcher(object):
             for callback in self.on_disabled:
                 callback()
 
-    def publish(self, type, fields, producer, retry=False,
-                retry_policy=None, blind=False, utcoffset=utcoffset,
-                Event=Event):
+    def publish(self, type, fields, producer,
+                blind=False, Event=Event, **kwargs):
         """Publish event using a custom :class:`~kombu.Producer`
         instance.
 
@@ -184,15 +189,20 @@ class EventDispatcher(object):
         :keyword utcoffset: Function returning the current utcoffset in hours.
 
         """
-
+        clock = None if blind else self.clock.forward()
+        event = Event(type, hostname=self.hostname, utcoffset=utcoffset(),
+                      pid=self.pid, clock=clock, **fields)
         with self.mutex:
-            clock = None if blind else self.clock.forward()
-            event = Event(type, hostname=self.hostname, utcoffset=utcoffset(),
-                          pid=self.pid, clock=clock, **fields)
-            exchange = self.exchange
+            return self._publish(event, producer,
+                                 routing_key=type.replace('-', '.'), **kwargs)
+
+    def _publish(self, event, producer, routing_key, retry=False,
+                 retry_policy=None, utcoffset=utcoffset):
+        exchange = self.exchange
+        try:
             producer.publish(
                 event,
-                routing_key=type.replace('-', '.'),
+                routing_key=routing_key,
                 exchange=exchange.name,
                 retry=retry,
                 retry_policy=retry_policy,
@@ -201,8 +211,12 @@ class EventDispatcher(object):
                 headers=self.headers,
                 delivery_mode=self.delivery_mode,
             )
+        except Exception as exc:
+            if not self.buffer_while_offline:
+                raise
+            self._outbound_buffer.append((event, routing_key, exc))
 
-    def send(self, type, blind=False, **fields):
+    def send(self, type, blind=False, utcoffset=utcoffset, **fields):
         """Send event.
 
         :param type: Event type name, with group separated by dash (`-`).
@@ -218,24 +232,38 @@ class EventDispatcher(object):
 
         """
         if self.enabled:
-            groups = self.groups
-            if groups and group_from(type) not in groups:
+            groups, group = self.groups, group_from(type)
+            if groups and group not in groups:
                 return
-            try:
-                self.publish(type, fields, self.producer, blind)
-            except Exception as exc:
-                if not self.buffer_while_offline:
-                    raise
-                self._outbound_buffer.append((type, fields, exc))
+            if group in self.buffer_group:
+                clock = self.clock.forward()
+                event = Event(type, hostname=self.hostname,
+                              utcoffset=utcoffset(),
+                              pid=self.pid, clock=clock, **fields)
+                buf = self._group_buffer[group]
+                buf.append(event)
+                if len(buf) >= self.buffer_limit:
+                    self.flush()
+                elif self.on_send_buffered:
+                    self.on_send_buffered()
+            else:
+                return self.publish(type, fields, self.producer, blind)
 
-    def flush(self):
+    def flush(self, errors=True, groups=True):
         """Flushes the outbound buffer."""
-        while self._outbound_buffer:
+        if errors:
+            buf = list(self._outbound_buffer)
             try:
-                type, fields, _ = self._outbound_buffer.popleft()
-            except IndexError:
-                return
-            self.send(type, **fields)
+                with self.mutex:
+                    for event, routing_key, _ in buf:
+                        self._publish(event, self.producer, routing_key)
+            finally:
+                self._outbound_buffer.clear()
+        if groups:
+            with self.mutex:
+                for group, events in items(self._group_buffer):
+                    self._publish(events, self.producer, '%s.multi' % group)
+                    events[:] = []  # list.clear
 
     def extend_buffer(self, other):
         """Copies the outbound buffer of another instance."""
@@ -357,8 +385,12 @@ class EventReceiver(ConsumerMixin):
         body['local_received'] = now()
         return type, body
 
-    def _receive(self, body, message):
-        self.process(*self.event_from_message(body))
+    def _receive(self, body, message, list=list, isinstance=isinstance):
+        if isinstance(body, list):  # 3.2: List of events
+            process, from_message = self.process, self.event_from_message
+            [process(*from_message(event)) for event in body]
+        else:
+            self.process(*self.event_from_message(body))
 
     @property
     def connection(self):

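On the receiving side the change is transparent to handlers: a body that arrives as a list (the new batched '<group>.multi' messages) is unpacked by _receive and each event is dispatched individually, so ordinary per-event receivers keep working unchanged. A minimal receiver sketch using the standard capture API; the handler names and broker URL are placeholders:

    from celery import Celery

    app = Celery('proj', broker='amqp://')

    def on_task_event(event):
        # Called once per event, whether it was published on its own or as
        # part of a batched list from a buffering worker.
        print(event['type'], event.get('uuid'))

    with app.connection() as connection:
        receiver = app.events.Receiver(connection, handlers={
            'task-started': on_task_event,
            'task-succeeded': on_task_event,
            '*': lambda event: None,  # ignore everything else
        })
        receiver.capture(limit=None, timeout=None, wakeup=True)
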
celery/utils/functional.py (+0 -1)

@@ -239,7 +239,6 @@ def chunks(it, n):
         [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10]]
 
     """
-    # XXX This function is not used anymore, at least not by Celery itself.
     for first in it:
         yield [first] + list(islice(it, n - 1))
 

celery/worker/consumer.py (+10 -0)

@@ -371,6 +371,14 @@ class Consumer(object):
             conn.transport.register_with_event_loop(conn.connection, self.hub)
         return conn
 
+    def _flush_events(self):
+        if self.event_dispatcher:
+            self.event_dispatcher.flush()
+
+    def on_send_event_buffered(self):
+        if self.hub:
+            self.hub._ready.add(self._flush_events)
+
     def add_task_queue(self, queue, exchange=None, exchange_type=None,
                        routing_key=None, **options):
         cset = self.task_consumer
@@ -516,6 +524,8 @@ class Events(bootsteps.StartStopStep):
         dis = c.event_dispatcher = c.app.events.Dispatcher(
             c.connect(), hostname=c.hostname,
             enabled=self.send_events, groups=self.groups,
+            buffer_group=['task'] if c.hub else None,
+            on_send_buffered=c.on_send_event_buffered if c.hub else None,
         )
         if prev:
             dis.extend_buffer(prev)
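
Together with the dispatcher changes, the consumer hooks above let the worker avoid timers by piggybacking on the event loop: every time an event is buffered, on_send_event_buffered registers _flush_events in hub._ready, so the buffer is drained on the next loop iteration. A self-contained toy of that pattern; the Hub and Dispatcher classes here are simplified stand-ins, not the real kombu/celery ones:

    class Hub:
        """Stand-in for the async hub: callbacks in _ready run on the next turn."""
        def __init__(self):
            self._ready = set()

        def run_once(self):
            ready, self._ready = self._ready, set()
            for callback in ready:
                callback()


    class Dispatcher:
        """Stand-in dispatcher that batches events until flushed."""
        def __init__(self, on_send_buffered, buffer_limit=24):
            self.buffer = []
            self.buffer_limit = buffer_limit
            self.on_send_buffered = on_send_buffered

        def send(self, event):
            self.buffer.append(event)
            if len(self.buffer) >= self.buffer_limit:
                self.flush()              # hard limit: publish immediately
            else:
                self.on_send_buffered()   # otherwise ask the loop to flush soon

        def flush(self):
            if self.buffer:
                print('publishing %d events as one message' % len(self.buffer))
                self.buffer = []


    hub = Hub()
    dispatcher = Dispatcher(on_send_buffered=lambda: hub._ready.add(dispatcher.flush))

    for i in range(5):
        dispatcher.send({'type': 'task-started', 'uuid': i})

    hub.run_once()  # one loop turn later, all five events go out as a single message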