|  | @@ -1,7 +1,6 @@
 | 
	
		
			
				|  |  |  import time
 | 
	
		
			
				|  |  |  import socket
 | 
	
		
			
				|  |  |  import threading
 | 
	
		
			
				|  |  | -from UserDict import UserDict
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  from celery.messaging import EventPublisher, EventConsumer
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -11,11 +10,17 @@ def Event(type, **fields):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  class EventDispatcher(object):
 | 
	
		
			
				|  |  | -    """
 | 
	
		
			
				|  |  | +    """Send events as messages.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    :param connection: Carrot connection.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    :keyword hostname: Hostname to identify ourselves as,
 | 
	
		
			
				|  |  | +        by default uses the hostname returned by :func:`socket.gethostname`.
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    dispatcher.send("event-name", arg1=1, arg2=2, arg3=3)
 | 
	
		
			
				|  |  | +    You need to :meth:`close` this after use.
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      """
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      def __init__(self, connection, hostname=None):
 | 
	
		
			
				|  |  |          self.connection = connection
 | 
	
		
			
				|  |  |          self.publisher = EventPublisher(self.connection)
 | 
	
	
		
			
				|  | @@ -23,6 +28,12 @@ class EventDispatcher(object):
 | 
	
		
			
				|  |  |          self._lock = threading.Lock()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def send(self, type, **fields):
 | 
	
		
			
				|  |  | +        """Send event.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        :param type: Kind of event.
 | 
	
		
			
				|  |  | +        :keyword \*\*fields: Event arguments.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        """
 | 
	
		
			
				|  |  |          self._lock.acquire()
 | 
	
		
			
				|  |  |          try:
 | 
	
		
			
				|  |  |              fields["timestamp"] = time.time()
 | 
	
	
		
			
				|  | @@ -32,11 +43,22 @@ class EventDispatcher(object):
 | 
	
		
			
				|  |  |              self._lock.release()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def close(self):
 | 
	
		
			
				|  |  | +        """Close the event dispatcher."""
 | 
	
		
			
				|  |  |          self._lock.locked() and self._lock.release()
 | 
	
		
			
				|  |  |          self.publisher and self.publisher.close()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  class EventReceiver(object):
 | 
	
		
			
				|  |  | +    """Capture events.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    :param connection: Carrot connection.
 | 
	
		
			
				|  |  | +    :keyword handlers: Event handlers.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    :attr:`handlers`` is a dict of event types and their handlers,
 | 
	
		
			
				|  |  | +    the special handler ``"*`"`` captures all events that doesn't have a
 | 
	
		
			
				|  |  | +    handler.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    """
 | 
	
		
			
				|  |  |      handlers = {}
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def __init__(self, connection, handlers=None):
 | 
	
	
		
			
				|  | @@ -45,17 +67,21 @@ class EventReceiver(object):
 | 
	
		
			
				|  |  |              self.handlers = handlers
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def process(self, type, event):
 | 
	
		
			
				|  |  | +        """Process the received event by dispatching it to the appropriate
 | 
	
		
			
				|  |  | +        handler."""
 | 
	
		
			
				|  |  |          print("Received event: %s" % event)
 | 
	
		
			
				|  |  |          handler = self.handlers.get(type) or self.handlers.get("*")
 | 
	
		
			
				|  |  |          handler and handler(event)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    def _receive(self, message_data, message):
 | 
	
		
			
				|  |  | -        type = message_data.pop("type").lower()
 | 
	
		
			
				|  |  | -        self.process(type, Event(type, **message_data))
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -    def consume(self, limit=None):
 | 
	
		
			
				|  |  | +    def capture(self, limit=None):
 | 
	
		
			
				|  |  | +        """Open up a consumer capturing events. This has to be running
 | 
	
		
			
				|  |  | +        in the main process, and it will never stop unless forced"""
 | 
	
		
			
				|  |  |          consumer = EventConsumer(self.connection)
 | 
	
		
			
				|  |  |          consumer.register_callback(self._receive)
 | 
	
		
			
				|  |  |          it = consumer.iterconsume(limit=limit)
 | 
	
		
			
				|  |  |          while True:
 | 
	
		
			
				|  |  |              it.next()
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def _receive(self, message_data, message):
 | 
	
		
			
				|  |  | +        type = message_data.pop("type").lower()
 | 
	
		
			
				|  |  | +        self.process(type, Event(type, **message_data))
 |