|  | @@ -537,8 +537,42 @@ class Heart(bootsteps.StartStopStep):
 | 
	
		
			
				|  |  |      shutdown = stop
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -class Tasks(bootsteps.StartStopStep):
 | 
	
		
			
				|  |  | +class Mingle(bootsteps.StartStopStep):
 | 
	
		
			
				|  |  | +    label = 'Mingle'
 | 
	
		
			
				|  |  |      requires = (Events, )
 | 
	
		
			
				|  |  | +    compatible_transports = set(['amqp', 'redis'])
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def __init__(self, c, without_mingle=False, **kwargs):
 | 
	
		
			
				|  |  | +        self.enabled = not without_mingle and self.compatible_transport(c.app)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def compatible_transport(self, app):
 | 
	
		
			
				|  |  | +        with app.connection() as conn:
 | 
	
		
			
				|  |  | +            return conn.transport.driver_type in self.compatible_transports
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    def start(self, c):
 | 
	
		
			
				|  |  | +        info('mingle: searching for neighbors')
 | 
	
		
			
				|  |  | +        I = c.app.control.inspect(timeout=1.0, connection=c.connection)
 | 
	
		
			
				|  |  | +        replies = I.hello(c.hostname, revoked._data) or {}
 | 
	
		
			
				|  |  | +        replies.pop(c.hostname, None)
 | 
	
		
			
				|  |  | +        if replies:
 | 
	
		
			
				|  |  | +            info('mingle: sync with %s nodes',
 | 
	
		
			
				|  |  | +                 len([reply for reply, value in items(replies) if value]))
 | 
	
		
			
				|  |  | +            for reply in values(replies):
 | 
	
		
			
				|  |  | +                if reply:
 | 
	
		
			
				|  |  | +                    try:
 | 
	
		
			
				|  |  | +                        other_clock, other_revoked = MINGLE_GET_FIELDS(reply)
 | 
	
		
			
				|  |  | +                    except KeyError:  # reply from pre-3.1 worker
 | 
	
		
			
				|  |  | +                        pass
 | 
	
		
			
				|  |  | +                    else:
 | 
	
		
			
				|  |  | +                        c.app.clock.adjust(other_clock)
 | 
	
		
			
				|  |  | +                        revoked.update(other_revoked)
 | 
	
		
			
				|  |  | +            info('mingle: sync complete')
 | 
	
		
			
				|  |  | +        else:
 | 
	
		
			
				|  |  | +            info('mingle: all alone')
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +class Tasks(bootsteps.StartStopStep):
 | 
	
		
			
				|  |  | +    requires = (Mingle, )
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def __init__(self, c, **kwargs):
 | 
	
		
			
				|  |  |          c.task_consumer = c.qos = None
 | 
	
	
		
			
				|  | @@ -579,42 +613,8 @@ class Agent(bootsteps.StartStopStep):
 | 
	
		
			
				|  |  |          return agent
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -class Mingle(bootsteps.StartStopStep):
 | 
	
		
			
				|  |  | -    label = 'Mingle'
 | 
	
		
			
				|  |  | -    requires = (Events, )
 | 
	
		
			
				|  |  | -    compatible_transports = set(['amqp', 'redis'])
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -    def __init__(self, c, without_mingle=False, **kwargs):
 | 
	
		
			
				|  |  | -        self.enabled = not without_mingle and self.compatible_transport(c.app)
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -    def compatible_transport(self, app):
 | 
	
		
			
				|  |  | -        with app.connection() as conn:
 | 
	
		
			
				|  |  | -            return conn.transport.driver_type in self.compatible_transports
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -    def start(self, c):
 | 
	
		
			
				|  |  | -        info('mingle: searching for neighbors')
 | 
	
		
			
				|  |  | -        I = c.app.control.inspect(timeout=1.0, connection=c.connection)
 | 
	
		
			
				|  |  | -        replies = I.hello(c.hostname, revoked._data) or {}
 | 
	
		
			
				|  |  | -        replies.pop(c.hostname, None)
 | 
	
		
			
				|  |  | -        if replies:
 | 
	
		
			
				|  |  | -            info('mingle: sync with %s nodes',
 | 
	
		
			
				|  |  | -                 len([reply for reply, value in items(replies) if value]))
 | 
	
		
			
				|  |  | -            for reply in values(replies):
 | 
	
		
			
				|  |  | -                if reply:
 | 
	
		
			
				|  |  | -                    try:
 | 
	
		
			
				|  |  | -                        other_clock, other_revoked = MINGLE_GET_FIELDS(reply)
 | 
	
		
			
				|  |  | -                    except KeyError:  # reply from pre-3.1 worker
 | 
	
		
			
				|  |  | -                        pass
 | 
	
		
			
				|  |  | -                    else:
 | 
	
		
			
				|  |  | -                        c.app.clock.adjust(other_clock)
 | 
	
		
			
				|  |  | -                        revoked.update(other_revoked)
 | 
	
		
			
				|  |  | -            info('mingle: sync complete')
 | 
	
		
			
				|  |  | -        else:
 | 
	
		
			
				|  |  | -            info('mingle: all alone')
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  |  class Control(bootsteps.StartStopStep):
 | 
	
		
			
				|  |  | -    requires = (Mingle, )
 | 
	
		
			
				|  |  | +    requires = (Tasks, )
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def __init__(self, c, **kwargs):
 | 
	
		
			
				|  |  |          self.is_green = c.pool is not None and c.pool.is_green
 |