浏览代码

rename EventDispatcher.copy_buffer -> extend_buffer

Ask Solem 12 年之前
父节点
当前提交
1f35d6f0fb
共有 2 个文件被更改,包括 9 次插入9 次删除
  1. 8 8
      celery/events/__init__.py
  2. 1 1
      celery/worker/consumer.py

+ 8 - 8
celery/events/__init__.py

@@ -41,16 +41,16 @@ def get_exchange(conn):
     return ex
 
 
-def Event(type, _fields=None, **fields):
+def Event(type, _fields=None, __dict__=dict, __now__=time.time, **fields):
     """Create an event.
 
     An event is a dictionary, the only required field is ``type``.
     A ``timestamp`` field will be set to the current time if not provided.
 
     """
-    event = dict(_fields or {}, type=type, **fields)
+    event = __dict__(_fields or {}, type=type, **fields)
     if 'timestamp' not in event:
-        event['timestamp'] = time.time()
+        event['timestamp'] = __now__()
     return event
 
 
@@ -225,9 +225,9 @@ class EventDispatcher(object):
                 return
             self.send(type, **fields)
 
-    def copy_buffer(self, other):
+    def extend_buffer(self, other):
         """Copies the outbound buffer of another instance."""
-        self._outbound_buffer = other._outbound_buffer
+        self._outbound_buffer.extend(other._outbound_buffer)
 
     def close(self):
         """Close the event dispatcher."""
@@ -347,7 +347,7 @@ class Events(object):
     @contextmanager
     def default_dispatcher(self, hostname=None, enabled=True,
                            buffer_while_offline=False):
-        with self.app.amqp.producer_pool.acquire(block=True) as pub:
-            with self.Dispatcher(pub.connection, hostname, enabled,
-                                 pub.channel, buffer_while_offline) as d:
+        with self.app.amqp.producer_pool.acquire(block=True) as prod:
+            with self.Dispatcher(prod.connection, hostname, enabled,
+                                 prod.channel, buffer_while_offline) as d:
                 yield d

+ 1 - 1
celery/worker/consumer.py

@@ -448,7 +448,7 @@ class Events(bootsteps.StartStopStep):
             enabled=self.send_events, groups=self.groups,
         )
         if prev:
-            dis.copy_buffer(prev)
+            dis.extend_buffer(prev)
             dis.flush()
 
     def stop(self, c):