123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- import time
- import socket
- import threading
- from celery.messaging import EventPublisher, EventConsumer
- def create_event(type, fields):
- std = {"type": type,
- "timestamp": fields.get("timestamp") or time.time()}
- return dict(fields, **std)
- def Event(type, **fields):
- """Create an event.
- An event is a dictionary, the only required field is the type.
- """
- return create_event(type, fields)
- class EventDispatcher(object):
- """Send events as messages.
- :param connection: Carrot connection.
- :keyword hostname: Hostname to identify ourselves as,
- by default uses the hostname returned by :func:`socket.gethostname`.
- :keyword enabled: Set to ``False`` to not actually publish any events,
- making :meth:`send` a noop operation.
- You need to :meth:`close` this after use.
- """
- def __init__(self, connection, hostname=None, enabled=True,
- publisher=None):
- self.connection = connection
- self.publisher = publisher or EventPublisher(self.connection)
- self.hostname = hostname or socket.gethostname()
- self.enabled = enabled
- self._lock = threading.Lock()
- def send(self, type, **fields):
- """Send event.
- :param type: Kind of event.
- :keyword \*\*fields: Event arguments.
- """
- if not self.enabled:
- return
- self._lock.acquire()
- try:
- self.publisher.send(Event(type, hostname=self.hostname, **fields))
- finally:
- self._lock.release()
- def close(self):
- """Close the event dispatcher."""
- self._lock.locked() and self._lock.release()
- self.publisher and self.publisher.close()
- class EventReceiver(object):
- """Capture events.
- :param connection: Carrot connection.
- :keyword handlers: Event handlers.
- :attr:`handlers`` is a dict of event types and their handlers,
- the special handler ``"*`"`` captures all events that doesn't have a
- handler.
- """
- handlers = {}
- def __init__(self, connection, handlers=None):
- self.connection = connection
- if handlers is not None:
- self.handlers = handlers
- def process(self, type, event):
- """Process the received event by dispatching it to the appropriate
- handler."""
- handler = self.handlers.get(type) or self.handlers.get("*")
- handler and handler(event)
- def capture(self, limit=None):
- """Open up a consumer capturing events.
- This has to run in the main process, and it will never
- stop unless forced via :exc:`KeyboardInterrupt` or :exc:`SystemExit`.
- """
- consumer = EventConsumer(self.connection)
- consumer.register_callback(self._receive)
- it = consumer.iterconsume(limit=limit)
- while True:
- it.next()
- def _receive(self, message_data, message):
- type = message_data.pop("type").lower()
- self.process(type, create_event(type, message_data))
|