mingle.py 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. from __future__ import absolute_import, unicode_literals
  2. from operator import itemgetter
  3. from celery import bootsteps
  4. from celery.five import items
  5. from celery.utils.log import get_logger
  6. from .events import Events
  7. __all__ = ['Mingle']
  8. MINGLE_GET_FIELDS = itemgetter('clock', 'revoked')
  9. logger = get_logger(__name__)
  10. debug, info, exception = logger.debug, logger.info, logger.exception
  11. class Mingle(bootsteps.StartStopStep):
  12. label = 'Mingle'
  13. requires = (Events,)
  14. compatible_transports = {'amqp', 'redis'}
  15. def __init__(self, c, without_mingle=False, **kwargs):
  16. self.enabled = not without_mingle and self.compatible_transport(c.app)
  17. def compatible_transport(self, app):
  18. with app.connection_for_read() as conn:
  19. return conn.transport.driver_type in self.compatible_transports
  20. def start(self, c):
  21. info('mingle: searching for neighbors')
  22. I = c.app.control.inspect(timeout=1.0, connection=c.connection)
  23. our_revoked = c.controller.state.revoked
  24. replies = I.hello(c.hostname, our_revoked._data) or {}
  25. replies.pop(c.hostname, None) # delete my own response
  26. if replies:
  27. info('mingle: sync with %s nodes',
  28. len([reply for reply, value in items(replies) if value]))
  29. [self.on_node_reply(c, nodename, reply)
  30. for nodename, reply in items(replies) if reply]
  31. info('mingle: sync complete')
  32. else:
  33. info('mingle: all alone')
  34. def on_node_reply(self, c, nodename, reply):
  35. debug('mingle: processing reply from %s', nodename)
  36. try:
  37. self.sync_with_node(c, **reply)
  38. except MemoryError:
  39. raise
  40. except Exception as exc:
  41. exception('mingle: sync with %s failed: %r', nodename, exc)
  42. def sync_with_node(self, c, clock=None, revoked=None, **kwargs):
  43. self.on_clock_event(c, clock)
  44. self.on_revoked_received(c, revoked)
  45. def on_clock_event(self, c, clock):
  46. c.app.clock.adjust(clock) if clock else c.app.clock.forward()
  47. def on_revoked_received(self, c, revoked):
  48. if revoked:
  49. c.controller.state.revoked.update(revoked)