Przeglądaj źródła

Renames 'with app.default_connection' -> 'with app.connection_or_acquire'

Ask Solem 12 lat temu
rodzic
commit
6e53b21fa9

+ 12 - 7
celery/app/base.py

@@ -175,8 +175,10 @@ class Celery(object):
         self.conf.update(self.loader.cmdline_config_parser(argv, namespace))
 
     def send_task(self, name, args=None, kwargs=None, countdown=None,
-            eta=None, task_id=None, publisher=None, connection=None,
-            result_cls=None, expires=None, queues=None, **options):
+            eta=None, task_id=None, producer=None, connection=None,
+            result_cls=None, expires=None, queues=None, publisher=None,
+            **options):
+        producer = producer or publisher  # XXX compat
         if self.conf.CELERY_ALWAYS_EAGER:  # pragma: no cover
             warnings.warn(AlwaysEagerIgnored(
                 'CELERY_ALWAYS_EAGER has no effect on send_task'))
@@ -186,7 +188,7 @@ class Celery(object):
         options.setdefault('compression',
                            self.conf.CELERY_MESSAGE_COMPRESSION)
         options = router.route(options, name, args, kwargs)
-        with self.default_producer(publisher) as producer:
+        with self.producer_or_acquire(producer) as producer:
             return result_cls(producer.publish_task(name, args, kwargs,
                         task_id=task_id,
                         countdown=countdown, eta=eta,
@@ -213,7 +215,8 @@ class Celery(object):
     broker_connection = connection
 
     @contextmanager
-    def default_connection(self, connection=None, pool=True, *args, **kwargs):
+    def connection_or_acquire(self, connection=None, pool=True,
+            *args, **kwargs):
         if connection:
             yield connection
         else:
@@ -223,14 +226,16 @@ class Celery(object):
             else:
                 with self.connection() as connection:
                     yield connection
+    default_connection = connection_or_acquire  # XXX compat
 
     @contextmanager
-    def default_producer(self, producer=None):
+    def producer_or_acquire(self, producer=None):
         if producer:
             yield producer
         else:
             with self.amqp.producer_pool.acquire(block=True) as producer:
                 yield producer
+    default_producer = producer_or_acquire  # XXX compat
 
     def with_default_connection(self, fun):
         """With any function accepting a `connection`
@@ -242,13 +247,13 @@ class Celery(object):
 
         **Deprecated**
 
-        Use ``with app.default_connection(connection)`` instead.
+        Use ``with app.connection_or_acquire(connection)`` instead.
 
         """
         @wraps(fun)
         def _inner(*args, **kwargs):
             connection = kwargs.pop('connection', None)
-            with self.default_connection(connection) as c:
+            with self.connection_or_acquire(connection) as c:
                 return fun(*args, **dict(kwargs, connection=c))
         return _inner
 

+ 1 - 1
celery/app/builtins.py

@@ -134,7 +134,7 @@ def add_group_task(app):
             if self.request.is_eager or app.conf.CELERY_ALWAYS_EAGER:
                 return app.GroupResult(result.id,
                         [task.apply(group_id=group_id) for task in taskit])
-            with app.default_producer() as pub:
+            with app.producer_or_acquire() as pub:
                 [task.apply_async(group_id=group_id, publisher=pub,
                                   add_to_parent=False) for task in taskit]
             parent = get_current_worker_task()

+ 2 - 2
celery/app/control.py

@@ -102,7 +102,7 @@ class Control(object):
         :returns: the number of tasks discarded.
 
         """
-        with self.app.default_connection(connection) as conn:
+        with self.app.connection_or_acquire(connection) as conn:
             return self.app.amqp.TaskConsumer(conn).purge()
     discard_all = purge
 
@@ -252,7 +252,7 @@ class Control(object):
             received.
 
         """
-        with self.app.default_connection(connection) as conn:
+        with self.app.connection_or_acquire(connection) as conn:
             arguments = dict(arguments or {}, **extra_kwargs)
             return self.mailbox(conn)._broadcast(command, arguments,
                                                  destination, reply, timeout,

+ 1 - 1
celery/app/task.py

@@ -455,7 +455,7 @@ class Task(object):
 
         if connection:
             producer = app.amqp.TaskProducer(connection)
-        with app.default_producer(producer) as P:
+        with app.producer_or_acquire(producer) as P:
             evd = None
             if conf.CELERY_SEND_TASK_SENT_EVENT:
                 evd = app.events.Dispatcher(channel=P.channel,

+ 1 - 1
celery/contrib/migrate.py

@@ -177,7 +177,7 @@ def move(predicate, connection=None, exchange=None, routing_key=None,
     """
     app = app_or_default(app)
     queues = [_maybe_queue(app, queue) for queue in source or []] or None
-    with app.default_connection(connection, pool=False) as conn:
+    with app.connection_or_acquire(connection, pool=False) as conn:
         producer = app.amqp.TaskProducer(conn)
         state = State()
 

+ 1 - 1
celery/result.py

@@ -394,7 +394,7 @@ class ResultSet(ResultBase):
 
     def revoke(self, connection=None):
         """Revoke all tasks in the set."""
-        with self.app.default_connection(connection) as conn:
+        with self.app.connection_or_acquire(connection) as conn:
             for result in self.results:
                 result.revoke(connection=conn)
 

+ 1 - 1
celery/task/sets.py

@@ -46,7 +46,7 @@ class TaskSet(list):
         if app.conf.CELERY_ALWAYS_EAGER:
             return self.apply(taskset_id=taskset_id)
 
-        with app.default_connection(connection, connect_timeout) as conn:
+        with app.connection_or_acquire(connection, connect_timeout) as conn:
             setid = taskset_id or uuid()
             pub = publisher or self.Publisher(conn)
             results = self._async_results(setid, pub)

+ 8 - 1
docs/reference/celery.rst

@@ -208,7 +208,7 @@ Application
 
         :returns :class:`kombu.connection.Connection`:
 
-    .. method:: Celery.default_connection(connection=None)
+    .. method:: Celery.connection_or_acquire(connection=None)
 
         For use within a with-statement to get a connection from the pool
         if one is not already provided.
@@ -216,6 +216,13 @@ Application
         :keyword connection: If not provided, then a connection will be
                              acquired from the connection pool.
 
+    .. method:: Celery.producer_or_acquire(producer=None)
+
+        For use within a with-statement to get a producer from the pool
+        if one is not already provided
+
+        :keyword producer: If not provided, then a producer will be
+                           acquired from the producer pool.
 
     .. method:: Celery.mail_admins(subject, body, fail_silently=False)