Parcourir la source

Renames app.broker_connection -> app.connection

Ask Solem il y a 12 ans
Parent
commit
9372cc73c7

+ 1 - 1
celery/__compat__.py

@@ -72,7 +72,7 @@ COMPAT_MODULES = {
         'messaging': {
             'TaskPublisher': 'amqp.TaskPublisher',
             'TaskConsumer': 'amqp.TaskConsumer',
-            'establish_connection': 'broker_connection',
+            'establish_connection': 'connection',
             'with_connection': 'with_default_connection',
             'get_consumer_set': 'amqp.TaskConsumer',
         },

+ 5 - 2
celery/app/amqp.py

@@ -11,7 +11,7 @@ from __future__ import absolute_import
 from datetime import timedelta
 from weakref import WeakValueDictionary
 
-from kombu import BrokerConnection, Consumer, Exchange, Producer, Queue
+from kombu import Connection, Consumer, Exchange, Producer, Queue
 from kombu.common import entry_to_queue
 from kombu.pools import ProducerPool
 
@@ -240,9 +240,12 @@ class TaskConsumer(Consumer):
 
 
 class AMQP(object):
-    BrokerConnection = BrokerConnection
+    Connection = Connection
     Consumer = Consumer
 
+    #: compat alias to Connection
+    BrokerConnection = Connection
+
     #: Cached and prepared routing table.
     _rtable = None
 

+ 5 - 4
celery/app/base.py

@@ -188,12 +188,12 @@ class Celery(object):
                                                   countdown=countdown, eta=eta,
                                                   expires=expires, **options))
 
-    def broker_connection(self, hostname=None, userid=None,
+    def connection(self, hostname=None, userid=None,
             password=None, virtual_host=None, port=None, ssl=None,
             insist=None, connect_timeout=None, transport=None,
             transport_options=None, **kwargs):
         conf = self.conf
-        return self.amqp.BrokerConnection(
+        return self.amqp.Connection(
                     hostname or conf.BROKER_HOST,
                     userid or conf.BROKER_USER,
                     password or conf.BROKER_PASSWORD,
@@ -206,6 +206,7 @@ class Celery(object):
                                 'BROKER_CONNECTION_TIMEOUT', connect_timeout),
                     transport_options=dict(conf.BROKER_TRANSPORT_OPTIONS,
                                            **transport_options or {}))
+    broker_connection = connection
 
     @contextmanager
     def default_connection(self, connection=None, *args, **kwargs):
@@ -385,8 +386,8 @@ class Celery(object):
     def pool(self):
         if self._pool is None:
             register_after_fork(self, self._after_fork)
-            self._pool = self.broker_connection().Pool(
-                            limit=self.conf.BROKER_POOL_LIMIT)
+            limit = self.conf.BROKER_POOL_LIMIT
+            self._pool = self.connection().Pool(limit=limit)
         return self._pool
 
     @property

+ 1 - 1
celery/app/utils.py

@@ -131,7 +131,7 @@ def bugreport(app):
     import kombu
 
     try:
-        trans = app.broker_connection().transport
+        trans = app.connection().transport
         driver_v = '%s:%s' % (trans.driver_name, trans.driver_version())
     except Exception:
         driver_v = ''

+ 1 - 1
celery/apps/beat.py

@@ -109,7 +109,7 @@ class Beat(configurated):
     def startup_info(self, beat):
         scheduler = beat.get_scheduler(lazy=True)
         return STARTUP_INFO_FMT % {
-            'conninfo': self.app.broker_connection().as_uri(),
+            'conninfo': self.app.connection().as_uri(),
             'logfile': self.logfile or '[stderr]',
             'loglevel': LOG_LEVELS[self.loglevel],
             'loader': qualname(self.app.loader),

+ 1 - 1
celery/apps/worker.py

@@ -222,7 +222,7 @@ class Worker(configurated):
             'app': appr,
             'hostname': self.hostname,
             'version': __version__,
-            'conninfo': self.app.broker_connection().as_uri(),
+            'conninfo': self.app.connection().as_uri(),
             'concurrency': concurrency,
             'events': events,
             'queues': app.amqp.queues.format(indent=0, indent_first=False),

+ 1 - 1
celery/beat.py

@@ -299,7 +299,7 @@ class Scheduler(object):
 
     @cached_property
     def connection(self):
-        return self.app.broker_connection()
+        return self.app.connection()
 
     @cached_property
     def publisher(self):

+ 1 - 1
celery/bin/camqadm.py

@@ -342,7 +342,7 @@ class AMQPAdmin(object):
     def connect(self, conn=None):
         if conn:
             conn.close()
-        conn = self.app.broker_connection()
+        conn = self.app.connection()
         self.note('-> connecting to %s.' % conn.as_uri())
         conn.connect()
         self.note('-> connected.')

+ 4 - 4
celery/bin/celery.py

@@ -308,7 +308,7 @@ class list_(Command):
         if what not in topics:
             raise Error('unknown topic %r (choose one of: %s)' % (
                             what, available))
-        with self.app.broker_connection() as conn:
+        with self.app.connection() as conn:
             self.app.amqp.TaskConsumer(conn).declare()
             topics[what](conn.manager)
 list_ = command(list_, 'list')
@@ -655,11 +655,11 @@ class migrate(Command):
     def run(self, *args, **kwargs):
         if len(args) != 2:
             return self.show_help('migrate')
-        from kombu import BrokerConnection
+        from kombu import Connection
         from celery.contrib.migrate import migrate_tasks
 
-        migrate_tasks(BrokerConnection(args[0]),
-                      BrokerConnection(args[1]),
+        migrate_tasks(Connection(args[0]),
+                      Connection(args[1]),
                       callback=self.on_migrate_task)
 migrate = command(migrate)
 

+ 1 - 1
celery/contrib/migrate.py

@@ -13,7 +13,7 @@ import socket
 
 from functools import partial
 
-from kombu.common import eventloop
+from kombu import eventloop
 from kombu.exceptions import StdChannelError
 from kombu.utils.encoding import ensure_bytes
 

+ 1 - 3
celery/events/__init__.py

@@ -19,9 +19,7 @@ from collections import deque
 from contextlib import contextmanager
 from copy import copy
 
-from kombu.common import eventloop
-from kombu.entity import Exchange, Queue
-from kombu.messaging import Consumer, Producer
+from kombu import eventloop, Exchange, Queue, Consumer, Producer
 from kombu.utils import cached_property
 
 from celery.app import app_or_default

+ 1 - 1
celery/events/cursesmon.py

@@ -480,7 +480,7 @@ def capture_events(app, state, display):  # pragma: no cover
 
     while 1:
         sys.stderr.write('-> evtop: starting capture...\n')
-        with app.broker_connection() as conn:
+        with app.connection() as conn:
             try:
                 conn.ensure_connection(on_connection_error,
                                        app.conf.BROKER_CONNECTION_MAX_RETRIES)

+ 1 - 1
celery/events/dumper.py

@@ -76,7 +76,7 @@ def evdump(app=None, out=sys.stdout):
     app = app_or_default(app)
     dumper = Dumper(out=out)
     dumper.say('-> evdump: starting capture...')
-    conn = app.broker_connection()
+    conn = app.connection()
     recv = app.events.Receiver(conn, handlers={'*': dumper.on_event})
     try:
         recv.capture()

+ 1 - 1
celery/events/snapshot.py

@@ -101,7 +101,7 @@ def evcam(camera, freq=1.0, maxrate=None, loglevel=0,
     cam = instantiate(camera, state, app=app, freq=freq,
                       maxrate=maxrate, timer=timer)
     cam.install()
-    conn = app.broker_connection()
+    conn = app.connection()
     recv = app.events.Receiver(conn, handlers={'*': state.event})
     try:
         try:

+ 4 - 4
celery/task/base.py

@@ -75,7 +75,7 @@ class Task(BaseTask):
     def establish_connection(self, connect_timeout=None):
         """Deprecated method used to get a broker connection.
 
-        Should be replaced with :meth:`@Celery.broker_connection`
+        Should be replaced with :meth:`@Celery.connection`
         instead, or by acquiring connections from the connection pool:
 
         .. code-block:: python
@@ -85,10 +85,10 @@ class Task(BaseTask):
                 ...
 
             # establish fresh connection
-            with celery.broker_connection() as conn:
+            with celery.connection() as conn:
                 ...
         """
-        return self._get_app().broker_connection(
+        return self._get_app().connection(
                 connect_timeout=connect_timeout)
 
     def get_publisher(self, connection=None, exchange=None,
@@ -99,7 +99,7 @@ class Task(BaseTask):
 
         .. code-block:: python
 
-            with celery.broker_connection() as conn:
+            with celery.connection() as conn:
                 with celery.amqp.TaskProducer(conn) as prod:
                     my_task.apply_async(producer=prod)
 

+ 2 - 3
celery/tests/app/test_amqp.py

@@ -11,15 +11,14 @@ from celery.tests.utils import AppCase
 class test_TaskProducer(AppCase):
 
     def test__exit__(self):
-
-        publisher = self.app.amqp.TaskProducer(self.app.broker_connection())
+        publisher = self.app.amqp.TaskProducer(self.app.connection())
         publisher.release = Mock()
         with publisher:
             pass
         publisher.release.assert_called_with()
 
     def test_declare(self):
-        publisher = self.app.amqp.TaskProducer(self.app.broker_connection())
+        publisher = self.app.amqp.TaskProducer(self.app.connection())
         publisher.exchange.name = 'foo'
         publisher.declare()
         publisher.exchange.name = None

+ 5 - 5
celery/tests/app/test_app.py

@@ -217,7 +217,7 @@ class test_App(Case):
         def aacaX():
             pass
 
-        connection = app.broker_connection('asd://')
+        connection = app.connection('asd://')
         with self.assertRaises(KeyError):
             aacaX.apply_async(connection=connection)
 
@@ -375,13 +375,13 @@ class test_App(Case):
                                        'userid': 'guest',
                                        'password': 'guest',
                                        'virtual_host': '/'},
-                            self.app.broker_connection('amqplib://').info())
+                            self.app.connection('amqplib://').info())
         self.app.conf.BROKER_PORT = 1978
         self.app.conf.BROKER_VHOST = 'foo'
         self.assertDictContainsSubset({'port': 1978,
                                        'virtual_host': 'foo'},
-                    self.app.broker_connection('amqplib://:1978/foo').info())
-        conn = self.app.broker_connection('amqplib:////value')
+                    self.app.connection('amqplib://:1978/foo').info())
+        conn = self.app.connection('amqplib:////value')
         self.assertDictContainsSubset({'virtual_host': '/value'},
                                       conn.info())
 
@@ -422,7 +422,7 @@ class test_App(Case):
             def send(self, type, **fields):
                 self.sent.append((type, fields))
 
-        conn = self.app.broker_connection()
+        conn = self.app.connection()
         chan = conn.channel()
         try:
             for e in ('foo_exchange', 'moo_exchange', 'bar_exchange'):

+ 4 - 4
celery/tests/contrib/test_migrate.py

@@ -1,7 +1,7 @@
 from __future__ import absolute_import
 from __future__ import with_statement
 
-from kombu import BrokerConnection, Producer, Queue, Exchange
+from kombu import Connection, Producer, Queue, Exchange
 from kombu.exceptions import StdChannelError
 from mock import patch
 
@@ -53,8 +53,8 @@ class test_migrate_task(Case):
 class test_migrate_tasks(AppCase):
 
     def test_migrate(self, name='testcelery'):
-        x = BrokerConnection('memory://foo')
-        y = BrokerConnection('memory://foo')
+        x = Connection('memory://foo')
+        y = Connection('memory://foo')
         # use separate state
         x.default_channel.queues = {}
         y.default_channel.queues = {}
@@ -93,7 +93,7 @@ class test_migrate_tasks(AppCase):
             qd.side_effect = effect
             migrate_tasks(x, y)
 
-        x = BrokerConnection('memory://')
+        x = Connection('memory://')
         x.default_channel.queues = {}
         y.default_channel.queues = {}
         callback = Mock()

+ 5 - 5
celery/tests/events/test_events.py

@@ -70,7 +70,7 @@ class test_EventDispatcher(AppCase):
         eventer.flush()
 
     def test_enter_exit(self):
-        with self.app.broker_connection() as conn:
+        with self.app.connection() as conn:
             d = self.app.events.Dispatcher(conn)
             d.close = Mock()
             with d as _d:
@@ -80,7 +80,7 @@ class test_EventDispatcher(AppCase):
     def test_enable_disable_callbacks(self):
         on_enable = Mock()
         on_disable = Mock()
-        with self.app.broker_connection() as conn:
+        with self.app.connection() as conn:
             with self.app.events.Dispatcher(conn, enabled=False) as d:
                 d.on_enabled.add(on_enable)
                 d.on_disabled.add(on_disable)
@@ -90,7 +90,7 @@ class test_EventDispatcher(AppCase):
                 on_disable.assert_called_with()
 
     def test_enabled_disable(self):
-        connection = self.app.broker_connection()
+        connection = self.app.connection()
         channel = connection.channel()
         try:
             dispatcher = self.app.events.Dispatcher(connection,
@@ -160,7 +160,7 @@ class test_EventReceiver(AppCase):
             events.EventReceiver.handlers = {}
 
     def test_itercapture(self):
-        connection = self.app.broker_connection()
+        connection = self.app.connection()
         try:
             r = self.app.events.Receiver(connection, node_id='celery.tests')
             it = r.itercapture(timeout=0.0001, wakeup=False)
@@ -177,7 +177,7 @@ class test_EventReceiver(AppCase):
             connection.close()
 
     def test_itercapture_limit(self):
-        connection = self.app.broker_connection()
+        connection = self.app.connection()
         channel = connection.channel()
         try:
             events_received = [0]

+ 2 - 2
celery/tests/tasks/test_tasks.py

@@ -345,7 +345,7 @@ class test_tasks(Case):
     def test_send_task_sent_event(self):
         T1 = self.createTask('c.unittest.t.t1')
         app = T1.app
-        conn = app.broker_connection()
+        conn = app.connection()
         chan = conn.channel()
         app.conf.CELERY_SEND_TASK_SENT_EVENT = True
         dispatcher = [None]
@@ -366,7 +366,7 @@ class test_tasks(Case):
         self.assertTrue(dispatcher[0])
 
     def test_get_publisher(self):
-        connection = app_or_default().broker_connection()
+        connection = app_or_default().connection()
         p = increment_counter.get_publisher(connection, auto_declare=False,
                                             exchange='foo')
         self.assertEqual(p.exchange.name, 'foo')

+ 1 - 1
celery/tests/worker/test_control.py

@@ -125,7 +125,7 @@ class test_ControlPanel(Case):
     def test_active_queues(self):
         import kombu
 
-        x = kombu.Consumer(current_app.broker_connection(),
+        x = kombu.Consumer(current_app.connection(),
                            [kombu.Queue('foo', kombu.Exchange('foo'), 'foo'),
                             kombu.Queue('bar', kombu.Exchange('bar'), 'bar')],
                            auto_declare=False)

+ 10 - 10
celery/tests/worker/test_worker.py

@@ -8,9 +8,9 @@ from datetime import datetime, timedelta
 from Queue import Empty
 
 from billiard.exceptions import WorkerLostError
+from kombu import Connection
 from kombu.exceptions import StdChannelError
 from kombu.transport.base import Message
-from kombu.connection import BrokerConnection
 from mock import Mock, patch
 from nose import SkipTest
 
@@ -221,7 +221,7 @@ class test_Consumer(Case):
         self.assertEqual(info['prefetch_count'], 10)
         self.assertFalse(info['broker'])
 
-        l.connection = current_app.broker_connection()
+        l.connection = current_app.connection()
         info = l.info
         self.assertTrue(info['broker'])
 
@@ -234,7 +234,7 @@ class test_Consumer(Case):
         l = MyKombuConsumer(self.ready_queue, timer=self.timer)
 
         l.reset_connection()
-        self.assertIsInstance(l.connection, BrokerConnection)
+        self.assertIsInstance(l.connection, Connection)
 
         l._state = RUN
         l.event_dispatcher = None
@@ -247,7 +247,7 @@ class test_Consumer(Case):
         self.assertIsNone(l.task_consumer)
 
         l.reset_connection()
-        self.assertIsInstance(l.connection, BrokerConnection)
+        self.assertIsInstance(l.connection, Connection)
         l.stop_consumers()
 
         l.stop()
@@ -379,7 +379,7 @@ class test_Consumer(Case):
 
     def test_consume_messages_ignores_socket_timeout(self):
 
-        class Connection(current_app.broker_connection().__class__):
+        class Connection(current_app.connection().__class__):
             obj = None
 
             def drain_events(self, **kwargs):
@@ -395,7 +395,7 @@ class test_Consumer(Case):
 
     def test_consume_messages_when_socket_error(self):
 
-        class Connection(current_app.broker_connection().__class__):
+        class Connection(current_app.connection().__class__):
             obj = None
 
             def drain_events(self, **kwargs):
@@ -417,7 +417,7 @@ class test_Consumer(Case):
 
     def test_consume_messages(self):
 
-        class Connection(current_app.broker_connection().__class__):
+        class Connection(current_app.connection().__class__):
             obj = None
 
             def drain_events(self, **kwargs):
@@ -647,7 +647,7 @@ class test_Consumer(Case):
         self.assertIsNone(l.connection)
         self.assertTrue(connections[0].closed)
 
-    @patch('kombu.connection.BrokerConnection._establish_connection')
+    @patch('kombu.connection.Connection._establish_connection')
     @patch('kombu.utils.sleep')
     def test_open_connection_errback(self, sleep, connect):
         l = MyKombuConsumer(self.ready_queue, timer=self.timer)
@@ -691,7 +691,7 @@ class test_Consumer(Case):
         l.task_consumer = Mock()
         l.broadcast_consumer = Mock()
         l.qos = _QoS()
-        l.connection = BrokerConnection()
+        l.connection = Connection()
         l.iterations = 0
 
         def raises_KeyError(limit=None):
@@ -714,7 +714,7 @@ class test_Consumer(Case):
         l.qos = _QoS()
         l.task_consumer = Mock()
         l.broadcast_consumer = Mock()
-        l.connection = BrokerConnection()
+        l.connection = Connection()
         l.consume_messages = Mock(side_effect=socket.error('foo'))
         with self.assertRaises(socket.error):
             l.start()

+ 1 - 1
celery/worker/__init__.py

@@ -321,7 +321,7 @@ class WorkController(configurated):
         self.pidfile = pidfile
         self.pidlock = None
         self.use_eventloop = (detect_environment() == 'default' and
-                              self.app.broker_connection().is_evented and
+                              self.app.connection().is_evented and
                               not self.app.IS_WINDOWS)
 
         # Update celery_include to have all known task modules, so that we

+ 2 - 2
celery/worker/consumer.py

@@ -331,7 +331,7 @@ class Consumer(object):
         self.pidbox_node = self.app.control.mailbox.Node(self.hostname,
                                                          state=pidbox_state,
                                                          handlers=Panel.data)
-        conninfo = self.app.broker_connection()
+        conninfo = self.app.connection()
         self.connection_errors = conninfo.connection_errors
         self.channel_errors = conninfo.channel_errors
 
@@ -739,7 +739,7 @@ class Consumer(object):
 
         # remember that the connection is lazy, it won't establish
         # until it's needed.
-        conn = self.app.broker_connection()
+        conn = self.app.connection()
         if not self.app.conf.BROKER_CONNECTION_RETRY:
             # retry disabled, just call connect directly.
             conn.connect()

+ 1 - 1
celery/worker/job.py

@@ -417,7 +417,7 @@ class Request(object):
                     self.name, self.id,
                     ' eta:[%s]' % (self.eta, ) if self.eta else '',
                     ' expires:[%s]' % (self.expires, ) if self.expires else '')
-    shortinfo == __str__
+    shortinfo = __str__
 
     def __repr__(self):
         return '<%s %s: %s>' % (type(self).__name__, self.id,

+ 2 - 2
docs/reference/celery.app.amqp.rst

@@ -10,10 +10,10 @@
 
     .. autoclass:: AMQP
 
-        .. attribute:: BrokerConnection
+        .. attribute:: Connection
 
             Broker connection class used.  Default is
-            :class:`kombu.connection.BrokerConnection`.
+            :class:`kombu.connection.Connection`.
 
         .. attribute:: Consumer
 

+ 2 - 2
docs/reference/celery.rst

@@ -189,7 +189,7 @@ Application
         Celerybeat scheduler application.
         See :class:`~@Beat`.
 
-    .. method:: Celery.broker_connection(url=default, [ssl, [transport_options={}]])
+    .. method:: Celery.connection(url=default, [ssl, [transport_options={}]])
 
         Establish a connection to the message broker.
 
@@ -206,7 +206,7 @@ Application
         :keyword transport: defaults to the :setting:`BROKER_TRANSPORT`
                  setting.
 
-        :returns :class:`kombu.connection.BrokerConnection`:
+        :returns :class:`kombu.connection.Connection`:
 
     .. method:: Celery.default_connection(connection=None)
 

+ 3 - 1
docs/whatsnew-2.6.rst

@@ -621,7 +621,9 @@ In Other News
 
     .. code-block:: python
 
-        i = celery.control.inspect(connection=BrokerConnection('redis://'))
+        from kombu import Connection
+
+        i = celery.control.inspect(connection=Connection('redis://'))
         i.active_queues()
 
 * Module :mod:`celery.app.task` is now a module instead of a package.

+ 1 - 1
examples/eventlet/bulk_task_producer.py

@@ -48,7 +48,7 @@ class ProducerPool(object):
                                 for _ in xrange(self.size)]
 
     def _producer(self):
-        connection = current_app.broker_connection()
+        connection = current_app.connection()
         publisher = current_app.amqp.TaskProducer(connection)
         inqueue = self.inqueue