|  | @@ -15,7 +15,8 @@ import pickle
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  def apply_async(task, args=None, kwargs=None, routing_key=None,
 | 
	
		
			
				|  |  | -        immediate=None, mandatory=None, connect_timeout=None, priority=None):
 | 
	
		
			
				|  |  | +        immediate=None, mandatory=None, connection=None,
 | 
	
		
			
				|  |  | +        connect_timeout=None, priority=None):
 | 
	
		
			
				|  |  |      """Run a task asynchronously by the celery daemon(s).
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      :param task: The task to run (a callable object, or a :class:`Task`
 | 
	
	
		
			
				|  | @@ -35,6 +36,9 @@ def apply_async(task, args=None, kwargs=None, routing_key=None,
 | 
	
		
			
				|  |  |      :keyword mandatory: Mandatory routing. Raises an exception if there's
 | 
	
		
			
				|  |  |          no running workers able to take on this task.
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +    :keyword connection: Re-use existing AMQP connection.
 | 
	
		
			
				|  |  | +        The ``connect_timeout`` argument is not respected if this is set.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      :keyword connect_timeout: The timeout in seconds, before we give up
 | 
	
		
			
				|  |  |          on establishing a connection to the AMQP server.
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -52,11 +56,16 @@ def apply_async(task, args=None, kwargs=None, routing_key=None,
 | 
	
		
			
				|  |  |      for option_name, option_value in message_opts.items():
 | 
	
		
			
				|  |  |          message_opts[option_name] = getattr(task, option_name, option_value)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    conn = DjangoAMQPConnection(connect_timeout=connect_timeout)
 | 
	
		
			
				|  |  | -    publisher = TaskPublisher(connection=conn)
 | 
	
		
			
				|  |  | +    need_to_close_connection = False
 | 
	
		
			
				|  |  | +    if not connection:
 | 
	
		
			
				|  |  | +        connection = DjangoAMQPConnection(connect_timeout=connect_timeout)
 | 
	
		
			
				|  |  | +        need_to_close_connection = True
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    publisher = TaskPublisher(connection=connection)
 | 
	
		
			
				|  |  |      task_id = publisher.delay_task(task.name, args, kwargs, **message_opts)
 | 
	
		
			
				|  |  |      publisher.close()
 | 
	
		
			
				|  |  | -    conn.close()
 | 
	
		
			
				|  |  | +    if need_to_close_connection:
 | 
	
		
			
				|  |  | +        connection.close()
 | 
	
		
			
				|  |  |      return AsyncResult(task_id)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 |