|  | @@ -175,8 +175,10 @@ class Celery(object):
 | 
											
												
													
														|  |          self.conf.update(self.loader.cmdline_config_parser(argv, namespace))
 |  |          self.conf.update(self.loader.cmdline_config_parser(argv, namespace))
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |      def send_task(self, name, args=None, kwargs=None, countdown=None,
 |  |      def send_task(self, name, args=None, kwargs=None, countdown=None,
 | 
											
												
													
														|  | -            eta=None, task_id=None, publisher=None, connection=None,
 |  | 
 | 
											
												
													
														|  | -            result_cls=None, expires=None, queues=None, **options):
 |  | 
 | 
											
												
													
														|  | 
 |  | +            eta=None, task_id=None, producer=None, connection=None,
 | 
											
												
													
														|  | 
 |  | +            result_cls=None, expires=None, queues=None, publisher=None,
 | 
											
												
													
														|  | 
 |  | +            **options):
 | 
											
												
													
														|  | 
 |  | +        producer = producer or publisher  # XXX compat
 | 
											
												
													
														|  |          if self.conf.CELERY_ALWAYS_EAGER:  # pragma: no cover
 |  |          if self.conf.CELERY_ALWAYS_EAGER:  # pragma: no cover
 | 
											
												
													
														|  |              warnings.warn(AlwaysEagerIgnored(
 |  |              warnings.warn(AlwaysEagerIgnored(
 | 
											
												
													
														|  |                  'CELERY_ALWAYS_EAGER has no effect on send_task'))
 |  |                  'CELERY_ALWAYS_EAGER has no effect on send_task'))
 | 
											
										
											
												
													
														|  | @@ -186,7 +188,7 @@ class Celery(object):
 | 
											
												
													
														|  |          options.setdefault('compression',
 |  |          options.setdefault('compression',
 | 
											
												
													
														|  |                             self.conf.CELERY_MESSAGE_COMPRESSION)
 |  |                             self.conf.CELERY_MESSAGE_COMPRESSION)
 | 
											
												
													
														|  |          options = router.route(options, name, args, kwargs)
 |  |          options = router.route(options, name, args, kwargs)
 | 
											
												
													
														|  | -        with self.default_producer(publisher) as producer:
 |  | 
 | 
											
												
													
														|  | 
 |  | +        with self.producer_or_acquire(producer) as producer:
 | 
											
												
													
														|  |              return result_cls(producer.publish_task(name, args, kwargs,
 |  |              return result_cls(producer.publish_task(name, args, kwargs,
 | 
											
												
													
														|  |                          task_id=task_id,
 |  |                          task_id=task_id,
 | 
											
												
													
														|  |                          countdown=countdown, eta=eta,
 |  |                          countdown=countdown, eta=eta,
 | 
											
										
											
												
													
														|  | @@ -211,7 +213,8 @@ class Celery(object):
 | 
											
												
													
														|  |      broker_connection = connection
 |  |      broker_connection = connection
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |      @contextmanager
 |  |      @contextmanager
 | 
											
												
													
														|  | -    def default_connection(self, connection=None, pool=True, *args, **kwargs):
 |  | 
 | 
											
												
													
														|  | 
 |  | +    def connection_or_acquire(self, connection=None, pool=True,
 | 
											
												
													
														|  | 
 |  | +            *args, **kwargs):
 | 
											
												
													
														|  |          if connection:
 |  |          if connection:
 | 
											
												
													
														|  |              yield connection
 |  |              yield connection
 | 
											
												
													
														|  |          else:
 |  |          else:
 | 
											
										
											
												
													
														|  | @@ -221,14 +224,16 @@ class Celery(object):
 | 
											
												
													
														|  |              else:
 |  |              else:
 | 
											
												
													
														|  |                  with self.connection() as connection:
 |  |                  with self.connection() as connection:
 | 
											
												
													
														|  |                      yield connection
 |  |                      yield connection
 | 
											
												
													
														|  | 
 |  | +    default_connection = connection_or_acquire  # XXX compat
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |      @contextmanager
 |  |      @contextmanager
 | 
											
												
													
														|  | -    def default_producer(self, producer=None):
 |  | 
 | 
											
												
													
														|  | 
 |  | +    def producer_or_acquire(self, producer=None):
 | 
											
												
													
														|  |          if producer:
 |  |          if producer:
 | 
											
												
													
														|  |              yield producer
 |  |              yield producer
 | 
											
												
													
														|  |          else:
 |  |          else:
 | 
											
												
													
														|  |              with self.amqp.producer_pool.acquire(block=True) as producer:
 |  |              with self.amqp.producer_pool.acquire(block=True) as producer:
 | 
											
												
													
														|  |                  yield producer
 |  |                  yield producer
 | 
											
												
													
														|  | 
 |  | +    default_producer = producer_or_acquire  # XXX compat
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |      def with_default_connection(self, fun):
 |  |      def with_default_connection(self, fun):
 | 
											
												
													
														|  |          """With any function accepting a `connection`
 |  |          """With any function accepting a `connection`
 | 
											
										
											
												
													
														|  | @@ -240,13 +245,13 @@ class Celery(object):
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |          **Deprecated**
 |  |          **Deprecated**
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | -        Use ``with app.default_connection(connection)`` instead.
 |  | 
 | 
											
												
													
														|  | 
 |  | +        Use ``with app.connection_or_acquire(connection)`` instead.
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  |          """
 |  |          """
 | 
											
												
													
														|  |          @wraps(fun)
 |  |          @wraps(fun)
 | 
											
												
													
														|  |          def _inner(*args, **kwargs):
 |  |          def _inner(*args, **kwargs):
 | 
											
												
													
														|  |              connection = kwargs.pop('connection', None)
 |  |              connection = kwargs.pop('connection', None)
 | 
											
												
													
														|  | -            with self.default_connection(connection) as c:
 |  | 
 | 
											
												
													
														|  | 
 |  | +            with self.connection_or_acquire(connection) as c:
 | 
											
												
													
														|  |                  return fun(*args, connection=c, **kwargs)
 |  |                  return fun(*args, connection=c, **kwargs)
 | 
											
												
													
														|  |          return _inner
 |  |          return _inner
 | 
											
												
													
														|  |  
 |  |  
 |