|  | @@ -315,6 +315,9 @@ class AMQP(object):
 | 
	
		
			
				|  |  |      #: compat alias to Connection
 | 
	
		
			
				|  |  |      BrokerConnection = Connection
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    producer_cls = TaskProducer
 | 
	
		
			
				|  |  | +    consumer_cls = TaskConsumer
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      #: Cached and prepared routing table.
 | 
	
		
			
				|  |  |      _rtable = None
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -352,7 +355,7 @@ class AMQP(object):
 | 
	
		
			
				|  |  |      def TaskConsumer(self):
 | 
	
		
			
				|  |  |          """Return consumer configured to consume from the queues
 | 
	
		
			
				|  |  |          we are configured for (``app.amqp.queues.consume_from``)."""
 | 
	
		
			
				|  |  | -        return self.app.subclass_with_self(TaskConsumer,
 | 
	
		
			
				|  |  | +        return self.app.subclass_with_self(self.consumer_cls,
 | 
	
		
			
				|  |  |                                             reverse='amqp.TaskConsumer')
 | 
	
		
			
				|  |  |      get_task_consumer = TaskConsumer  # XXX compat
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -365,7 +368,7 @@ class AMQP(object):
 | 
	
		
			
				|  |  |          """
 | 
	
		
			
				|  |  |          conf = self.app.conf
 | 
	
		
			
				|  |  |          return self.app.subclass_with_self(
 | 
	
		
			
				|  |  | -            TaskProducer,
 | 
	
		
			
				|  |  | +            self.producer_cls,
 | 
	
		
			
				|  |  |              reverse='amqp.TaskProducer',
 | 
	
		
			
				|  |  |              exchange=self.default_exchange,
 | 
	
		
			
				|  |  |              routing_key=conf.CELERY_DEFAULT_ROUTING_KEY,
 |