events.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. import time
  2. import socket
  3. import threading
  4. from celery.messaging import EventPublisher, EventConsumer
  5. def create_event(type, fields):
  6. std = {"type": type,
  7. "timestamp": fields.get("timestamp") or time.time()}
  8. return dict(fields, **std)
  9. def Event(type, **fields):
  10. """Create an event.
  11. An event is a dictionary, the only required field is the type.
  12. """
  13. return create_event(type, fields)
  14. class EventDispatcher(object):
  15. """Send events as messages.
  16. :param connection: Carrot connection.
  17. :keyword hostname: Hostname to identify ourselves as,
  18. by default uses the hostname returned by :func:`socket.gethostname`.
  19. :keyword enabled: Set to ``False`` to not actually publish any events,
  20. making :meth:`send` a noop operation.
  21. You need to :meth:`close` this after use.
  22. """
  23. def __init__(self, connection, hostname=None, enabled=True,
  24. publisher=None):
  25. self.connection = connection
  26. self.publisher = publisher or EventPublisher(self.connection)
  27. self.hostname = hostname or socket.gethostname()
  28. self.enabled = enabled
  29. self._lock = threading.Lock()
  30. def send(self, type, **fields):
  31. """Send event.
  32. :param type: Kind of event.
  33. :keyword \*\*fields: Event arguments.
  34. """
  35. if not self.enabled:
  36. return
  37. self._lock.acquire()
  38. try:
  39. self.publisher.send(Event(type, hostname=self.hostname, **fields))
  40. finally:
  41. self._lock.release()
  42. def close(self):
  43. """Close the event dispatcher."""
  44. self._lock.locked() and self._lock.release()
  45. self.publisher and self.publisher.close()
  46. class EventReceiver(object):
  47. """Capture events.
  48. :param connection: Carrot connection.
  49. :keyword handlers: Event handlers.
  50. :attr:`handlers`` is a dict of event types and their handlers,
  51. the special handler ``"*`"`` captures all events that doesn't have a
  52. handler.
  53. """
  54. handlers = {}
  55. def __init__(self, connection, handlers=None):
  56. self.connection = connection
  57. if handlers is not None:
  58. self.handlers = handlers
  59. def process(self, type, event):
  60. """Process the received event by dispatching it to the appropriate
  61. handler."""
  62. handler = self.handlers.get(type) or self.handlers.get("*")
  63. handler and handler(event)
  64. def capture(self, limit=None):
  65. """Open up a consumer capturing events.
  66. This has to run in the main process, and it will never
  67. stop unless forced via :exc:`KeyboardInterrupt` or :exc:`SystemExit`.
  68. """
  69. consumer = EventConsumer(self.connection)
  70. consumer.register_callback(self._receive)
  71. it = consumer.iterconsume(limit=limit)
  72. while True:
  73. it.next()
  74. def _receive(self, message_data, message):
  75. type = message_data.pop("type").lower()
  76. self.process(type, create_event(type, message_data))