events.py 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. """Implementation for the app.events shortcuts."""
  2. from __future__ import absolute_import, unicode_literals
  3. from contextlib import contextmanager
  4. from kombu.utils.objects import cached_property
  5. class Events(object):
  6. """Implements app.events."""
  7. receiver_cls = 'celery.events.receiver:EventReceiver'
  8. dispatcher_cls = 'celery.events.dispatcher:EventDispatcher'
  9. state_cls = 'celery.events.state:State'
  10. def __init__(self, app=None):
  11. self.app = app
  12. @cached_property
  13. def Receiver(self):
  14. return self.app.subclass_with_self(
  15. self.receiver_cls, reverse='events.Receiver')
  16. @cached_property
  17. def Dispatcher(self):
  18. return self.app.subclass_with_self(
  19. self.dispatcher_cls, reverse='events.Dispatcher')
  20. @cached_property
  21. def State(self):
  22. return self.app.subclass_with_self(
  23. self.state_cls, reverse='events.State')
  24. @contextmanager
  25. def default_dispatcher(self, hostname=None, enabled=True,
  26. buffer_while_offline=False):
  27. with self.app.amqp.producer_pool.acquire(block=True) as prod:
  28. # pylint: disable=too-many-function-args
  29. # This is a property pylint...
  30. with self.Dispatcher(prod.connection, hostname, enabled,
  31. prod.channel, buffer_while_offline) as d:
  32. yield d