mingle.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. from __future__ import absolute_import, unicode_literals
  2. from operator import itemgetter
  3. from celery import bootsteps
  4. from celery.five import items, values
  5. from celery.utils.log import get_logger
  6. from celery.worker.state import revoked
  7. from .events import Events
  8. __all__ = ['Mingle']
  9. MINGLE_GET_FIELDS = itemgetter('clock', 'revoked')
  10. logger = get_logger(__name__)
  11. info = logger.info
  12. class Mingle(bootsteps.StartStopStep):
  13. label = 'Mingle'
  14. requires = (Events,)
  15. compatible_transports = {'amqp', 'redis'}
  16. def __init__(self, c, without_mingle=False, **kwargs):
  17. self.enabled = not without_mingle and self.compatible_transport(c.app)
  18. def compatible_transport(self, app):
  19. with app.connection_for_read() as conn:
  20. return conn.transport.driver_type in self.compatible_transports
  21. def start(self, c):
  22. info('mingle: searching for neighbors')
  23. I = c.app.control.inspect(timeout=1.0, connection=c.connection)
  24. replies = I.hello(c.hostname, revoked._data) or {}
  25. replies.pop(c.hostname, None)
  26. if replies:
  27. info('mingle: sync with %s nodes',
  28. len([reply for reply, value in items(replies) if value]))
  29. for reply in values(replies):
  30. if reply:
  31. try:
  32. other_clock, other_revoked = MINGLE_GET_FIELDS(reply)
  33. except KeyError: # reply from pre-3.1 worker
  34. pass
  35. else:
  36. c.app.clock.adjust(other_clock)
  37. revoked.update(other_revoked)
  38. info('mingle: sync complete')
  39. else:
  40. info('mingle: all alone')