__init__.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. import time
  2. import socket
  3. import threading
  4. from celery.messaging import EventPublisher, EventConsumer
  5. def create_event(type, fields):
  6. std = {"type": type,
  7. "timestamp": fields.get("timestamp") or time.time()}
  8. return dict(fields, **std)
  9. def Event(type, **fields):
  10. """Create an event.
  11. An event is a dictionary, the only required field is the type.
  12. """
  13. return create_event(type, fields)
  14. class EventDispatcher(object):
  15. """Send events as messages.
  16. :param connection: Carrot connection.
  17. :keyword hostname: Hostname to identify ourselves as,
  18. by default uses the hostname returned by :func:`socket.gethostname`.
  19. :keyword enabled: Set to ``False`` to not actually publish any events,
  20. making :meth:`send` a noop operation.
  21. You need to :meth:`close` this after use.
  22. """
  23. def __init__(self, connection, hostname=None, enabled=True):
  24. self.connection = connection
  25. self.hostname = hostname or socket.gethostname()
  26. self.enabled = enabled
  27. self._lock = threading.Lock()
  28. self.publisher = None
  29. if self.enabled:
  30. self.enable()
  31. def enable(self):
  32. self.enabled = True
  33. self.publisher = EventPublisher(self.connection)
  34. def disable(self):
  35. self.enabled = False
  36. if self.publisher is not None:
  37. self.publisher.close()
  38. self.publisher = None
  39. def send(self, type, **fields):
  40. """Send event.
  41. :param type: Kind of event.
  42. :keyword \*\*fields: Event arguments.
  43. """
  44. if not self.enabled:
  45. return
  46. self._lock.acquire()
  47. try:
  48. self.publisher.send(Event(type, hostname=self.hostname, **fields))
  49. finally:
  50. self._lock.release()
  51. def close(self):
  52. """Close the event dispatcher."""
  53. self._lock.locked() and self._lock.release()
  54. self.publisher and self.publisher.close()
  55. class EventReceiver(object):
  56. """Capture events.
  57. :param connection: Carrot connection.
  58. :keyword handlers: Event handlers.
  59. :attr:`handlers`` is a dict of event types and their handlers,
  60. the special handler ``"*`"`` captures all events that doesn't have a
  61. handler.
  62. """
  63. handlers = {}
  64. def __init__(self, connection, handlers=None):
  65. self.connection = connection
  66. if handlers is not None:
  67. self.handlers = handlers
  68. def process(self, type, event):
  69. """Process the received event by dispatching it to the appropriate
  70. handler."""
  71. handler = self.handlers.get(type) or self.handlers.get("*")
  72. handler and handler(event)
  73. def consumer(self):
  74. consumer = EventConsumer(self.connection)
  75. consumer.register_callback(self._receive)
  76. return consumer
  77. def capture(self, limit=None):
  78. """Open up a consumer capturing events.
  79. This has to run in the main process, and it will never
  80. stop unless forced via :exc:`KeyboardInterrupt` or :exc:`SystemExit`.
  81. """
  82. consumer = self.consumer()
  83. it = consumer.iterconsume(limit=limit)
  84. while True:
  85. it.next()
  86. def _receive(self, message_data, message):
  87. type = message_data.pop("type").lower()
  88. self.process(type, create_event(type, message_data))