|  | @@ -12,7 +12,7 @@ from datetime import timedelta
 | 
	
		
			
				|  |  |  from weakref import WeakValueDictionary
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  from kombu import Connection, Consumer, Exchange, Producer, Queue
 | 
	
		
			
				|  |  | -from kombu.common import entry_to_queue
 | 
	
		
			
				|  |  | +from kombu.common import Broadcast
 | 
	
		
			
				|  |  |  from kombu.pools import ProducerPool
 | 
	
		
			
				|  |  |  from kombu.utils import cached_property, uuid
 | 
	
		
			
				|  |  |  from kombu.utils.encoding import safe_repr
 | 
	
	
		
			
				|  | @@ -102,7 +102,7 @@ class Queues(dict):
 | 
	
		
			
				|  |  |              options['routing_key'] = name
 | 
	
		
			
				|  |  |          if self.ha_policy is not None:
 | 
	
		
			
				|  |  |              self._set_ha_policy(options.setdefault('queue_arguments', {}))
 | 
	
		
			
				|  |  | -        q = self[name] = entry_to_queue(name, **options)
 | 
	
		
			
				|  |  | +        q = self[name] = Queue.from_dict(name, **options)
 | 
	
		
			
				|  |  |          return q
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      def _set_ha_policy(self, args):
 | 
	
	
		
			
				|  | @@ -202,7 +202,8 @@ class TaskProducer(Producer):
 | 
	
		
			
				|  |  |                  qname = queue.name
 | 
	
		
			
				|  |  |              exchange = exchange or queue.exchange.name
 | 
	
		
			
				|  |  |              routing_key = routing_key or queue.routing_key
 | 
	
		
			
				|  |  | -        declare = declare or ([queue] if queue else [])
 | 
	
		
			
				|  |  | +        if declare is None and queue and not isinstance(queue, Broadcast):
 | 
	
		
			
				|  |  | +            declare = [queue]
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          # merge default and custom policy
 | 
	
		
			
				|  |  |          retry = self.retry if retry is None else retry
 |