events.py 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. """Implementation for the app.events shortcuts."""
  2. from contextlib import contextmanager
  3. from celery.events import EventDispatcher, EventReceiver
  4. from celery.events.state import State
  5. from celery.types import AppT
  6. from kombu.utils.objects import cached_property
  7. class Events:
  8. """Implements app.events."""
  9. receiver_cls = 'celery.events.receiver:EventReceiver'
  10. dispatcher_cls = 'celery.events.dispatcher:EventDispatcher'
  11. state_cls = 'celery.events.state:State'
  12. def __init__(self, app: AppT = None):
  13. self.app = app
  14. @cached_property
  15. def Receiver(self) -> EventReceiver:
  16. return self.app.subclass_with_self(
  17. self.receiver_cls, reverse='events.Receiver')
  18. @cached_property
  19. def Dispatcher(self) -> EventDispatcher:
  20. return self.app.subclass_with_self(
  21. self.dispatcher_cls, reverse='events.Dispatcher')
  22. @cached_property
  23. def State(self) -> State:
  24. return self.app.subclass_with_self(
  25. self.state_cls, reverse='events.State')
  26. @contextmanager
  27. def default_dispatcher(
  28. self,
  29. hostname: str = None,
  30. enabled: bool = True,
  31. buffer_while_offline: bool = False) -> EventDispatcher:
  32. with self.app.amqp.producer_pool.acquire(block=True) as prod:
  33. # pylint: disable=too-many-function-args
  34. # This is a property pylint...
  35. with self.Dispatcher(prod.connection, hostname, enabled,
  36. prod.channel, buffer_while_offline) as d:
  37. yield d