# events.py 2.0 KB
  1. """Worker Event Dispatcher Bootstep.
  2. ``Events`` -> :class:`celery.events.EventDispatcher`.
  3. """
  4. from __future__ import absolute_import, unicode_literals
  5. from kombu.common import ignore_errors
  6. from celery import bootsteps
  7. from .connection import Connection
  8. __all__ = ('Events',)
  9. class Events(bootsteps.StartStopStep):
  10. """Service used for sending monitoring events."""
  11. requires = (Connection,)
  12. def __init__(self, c,
  13. task_events=True,
  14. without_heartbeat=False,
  15. without_gossip=False,
  16. **kwargs):
  17. self.groups = None if task_events else ['worker']
  18. self.send_events = (
  19. task_events or
  20. not without_gossip or
  21. not without_heartbeat
  22. )
  23. c.event_dispatcher = None
  24. super(Events, self).__init__(c, **kwargs)
  25. def start(self, c):
  26. # flush events sent while connection was down.
  27. prev = self._close(c)
  28. dis = c.event_dispatcher = c.app.events.Dispatcher(
  29. c.connection_for_write(),
  30. hostname=c.hostname,
  31. enabled=self.send_events,
  32. groups=self.groups,
  33. # we currently only buffer events when the event loop is enabled
  34. # XXX This excludes eventlet/gevent, which should actually buffer.
  35. buffer_group=['task'] if c.hub else None,
  36. on_send_buffered=c.on_send_event_buffered if c.hub else None,
  37. )
  38. if prev:
  39. dis.extend_buffer(prev)
  40. dis.flush()
  41. def stop(self, c):
  42. pass
  43. def _close(self, c):
  44. if c.event_dispatcher:
  45. dispatcher = c.event_dispatcher
  46. # remember changes from remote control commands:
  47. self.groups = dispatcher.groups
  48. # close custom connection
  49. if dispatcher.connection:
  50. ignore_errors(c, dispatcher.connection.close)
  51. ignore_errors(c, dispatcher.close)
  52. c.event_dispatcher = None
  53. return dispatcher
  54. def shutdown(self, c):
  55. self._close(c)