Parcourir la source

Adds broker_read_url and broker_write_url settings

These enable you to separate the broker URLs used for consuming and producing
respectively.

In addition to the configuration options two new methods have been added to the
app:

* ``app.connection_for_read()``
* ``app.connection_for_write()``

these should now be used, instead of `app.connection()`, to specify the
intent of the required connection.
Ask Solem il y a 9 ans
Parent
commit
6038ff2aa3

+ 2 - 1
celery/app/amqp.py

@@ -585,7 +585,8 @@ class AMQP(object):
     @property
     def producer_pool(self):
         if self._producer_pool is None:
-            self._producer_pool = pools.producers[self.app.connection()]
+            self._producer_pool = pools.producers[
+                self.app.connection_for_write()]
             self._producer_pool.limit = self.app.pool.limit
         return self._producer_pool
     publisher_pool = producer_pool  # compat alias

+ 44 - 4
celery/app/base.py

@@ -272,7 +272,7 @@ class Celery(object):
         use the with statement instead::
 
             with Celery(set_as_current=False) as app:
-                with app.connection() as conn:
+                with app.connection_for_write() as conn:
                     pass
         """
         self._pool = None
@@ -655,6 +655,22 @@ class Celery(object):
                 parent.add_trail(result)
         return result
 
+    def connection_for_read(self, url=None, **kwargs):
+        """Establish connection used for consuming.
+
+        See :meth:`connection` for supported arguments.
+
+        """
+        return self._connection(url or self.conf.broker_read_url, **kwargs)
+
+    def connection_for_write(self, url=None, **kwargs):
+        """Establish connection used for producing.
+
+        See :meth:`connection` for supported arguments.
+
+        """
+        return self._connection(url or self.conf.broker_write_url, **kwargs)
+
     def connection(self, hostname=None, userid=None, password=None,
                    virtual_host=None, port=None, ssl=None,
                    connect_timeout=None, transport=None,
@@ -662,6 +678,10 @@ class Celery(object):
                    login_method=None, failover_strategy=None, **kwargs):
         """Establish a connection to the message broker.
 
+        Please use :meth:`connection_for_read` and
+        :meth:`connection_for_write` instead, to convey the intent
+        of use for this connection.
+
         :param url: Either the URL or the hostname of the broker to use.
 
         :keyword hostname: URL, Hostname/IP-address of the broker.
@@ -674,13 +694,33 @@ class Celery(object):
         :keyword ssl: Defaults to the :setting:`broker_use_ssl` setting.
         :keyword transport: defaults to the :setting:`broker_transport`
                  setting.
+        :keyword transport_options: Dictionary of transport specific options.
+        :keyword heartbeat: AMQP Heartbeat in seconds (pyamqp only).
+        :keyword login_method: Custom login method to use (amqp only).
+        :keyword failover_strategy: Custom failover strategy.
+        :keyword \*\*kwargs: Additional arguments to :class:`kombu.Connection`.
 
         :returns :class:`kombu.Connection`:
 
         """
+        return self.connection_for_write(
+            hostname or self.conf.broker_write_url,
+            userid=userid, password=password,
+            virtual_host=virtual_host, port=port, ssl=ssl,
+            connect_timeout=connect_timeout, transport=transport,
+            transport_options=transport_options, heartbeat=heartbeat,
+            login_method=login_method, failover_strategy=failover_strategy,
+            **kwargs
+        )
+
+    def _connection(self, url, userid=None, password=None,
+                   virtual_host=None, port=None, ssl=None,
+                   connect_timeout=None, transport=None,
+                   transport_options=None, heartbeat=None,
+                   login_method=None, failover_strategy=None, **kwargs):
         conf = self.conf
         return self.amqp.Connection(
-            hostname or conf.broker_url,
+            url,
             userid or conf.broker_user,
             password or conf.broker_password,
             virtual_host or conf.broker_vhost,
@@ -705,7 +745,7 @@ class Celery(object):
         """Helper for :meth:`connection_or_acquire`."""
         if pool:
             return self.pool.acquire(block=True)
-        return self.connection()
+        return self.connection_for_write()
 
     def connection_or_acquire(self, connection=None, pool=True, *_, **__):
         """For use within a with-statement to get a connection from the pool
@@ -1002,7 +1042,7 @@ class Celery(object):
             self._ensure_after_fork()
             limit = self.conf.broker_pool_limit
             pools.set_limit(limit)
-            self._pool = pools.connections[self.connection()]
+            self._pool = pools.connections[self.connection_for_write()]
         return self._pool
 
     @property

+ 2 - 0
celery/app/defaults.py

@@ -98,6 +98,8 @@ NAMESPACES = Namespace(
     ),
     broker=Namespace(
         url=Option(None, type='string'),
+        read_url=Option(None, type='string'),
+        write_url=Option(None, type='string'),
         transport=Option(type='string'),
         transport_options=Option({}, type='dict'),
         connection_timeout=Option(4, type='float'),

+ 20 - 2
celery/app/utils.py

@@ -86,10 +86,28 @@ class Settings(ConfigurationView):
 
     """
 
+    @property
+    def broker_read_url(self):
+        return (
+            os.environ.get('CELERY_BROKER_READ_URL') or
+            self.get('broker_read_url') or
+            self.broker_url
+        )
+
+    @property
+    def broker_write_url(self):
+        return (
+            os.environ.get('CELERY_BROKER_WRITE_URL') or
+            self.get('broker_write_url') or
+            self.broker_url
+        )
+
     @property
     def broker_url(self):
-        return (os.environ.get('CELERY_BROKER_URL') or
-                self.first('broker_url', 'broker_host'))
+        return (
+            os.environ.get('CELERY_BROKER_URL') or
+            self.first('broker_url', 'broker_host')
+        )
 
     @property
     def timezone(self):

+ 1 - 1
celery/beat.py

@@ -381,7 +381,7 @@ class Scheduler(object):
 
     @cached_property
     def connection(self):
-        return self.app.connection()
+        return self.app.connection_for_write()
 
     @cached_property
     def producer(self):

+ 1 - 1
celery/bin/celery.py

@@ -337,7 +337,7 @@ class _RemoteControl(Command):
             raise self.UsageError(
                 'Unknown {0.name} method {1}'.format(self, method))
 
-        if self.app.connection().transport.driver_type == 'sql':
+        if self.app.connection_for_write().transport.driver_type == 'sql':
             raise self.Error('Broadcast not supported by SQL broker transport')
 
         output_json = kwargs.get('json')

+ 2 - 1
celery/bin/graph.py

@@ -166,7 +166,8 @@ class graph(Command):
                 list(range(int(threads))), 'P', Tmax,
             )
 
-        broker = Broker(args.get('broker', self.app.connection().as_uri()))
+        broker = Broker(args.get(
+            'broker', self.app.connection_for_read().as_uri()))
         backend = Backend(backend) if backend else None
         graph = DependencyGraph(formatter=Formatter())
         graph.add_arc(broker)

+ 3 - 2
celery/events/__init__.py

@@ -140,7 +140,7 @@ class EventDispatcher(object):
         if not connection and channel:
             self.connection = channel.connection.client
         self.enabled = enabled
-        conninfo = self.connection or self.app.connection()
+        conninfo = self.connection or self.app.connection_for_write()
         self.exchange = get_exchange(conninfo)
         if conninfo.transport.driver_type in self.DISABLED_TRANSPORTS:
             self.enabled = False
@@ -307,7 +307,8 @@ class EventReceiver(ConsumerMixin):
         self.routing_key = routing_key
         self.node_id = node_id or uuid()
         self.queue_prefix = queue_prefix
-        self.exchange = get_exchange(self.connection or self.app.connection())
+        self.exchange = get_exchange(
+            self.connection or self.app.connection_for_write())
         self.queue = Queue(
             '.'.join([self.queue_prefix, self.node_id]),
             exchange=self.exchange,

+ 1 - 1
celery/events/cursesmon.py

@@ -508,7 +508,7 @@ def capture_events(app, state, display):  # pragma: no cover
 
     while 1:
         print('-> evtop: starting capture...', file=sys.stderr)
-        with app.connection() as conn:
+        with app.connection_for_read() as conn:
             try:
                 conn.ensure_connection(on_connection_error,
                                        app.conf.broker_connection_max_retries)

+ 1 - 1
celery/events/dumper.py

@@ -88,7 +88,7 @@ def evdump(app=None, out=sys.stdout):
     app = app_or_default(app)
     dumper = Dumper(out=out)
     dumper.say('-> evdump: starting capture...')
-    conn = app.connection().clone()
+    conn = app.connection_for_read().clone()
 
     def _error_handler(exc, interval):
         dumper.say(CONNECTION_ERROR % (

+ 1 - 1
celery/events/snapshot.py

@@ -102,7 +102,7 @@ def evcam(camera, freq=1.0, maxrate=None, loglevel=0,
     cam = instantiate(camera, state, app=app, freq=freq,
                       maxrate=maxrate, timer=timer)
     cam.install()
-    conn = app.connection()
+    conn = app.connection_for_read()
     recv = app.events.Receiver(conn, handlers={'*': state.event})
     try:
         try:

+ 3 - 3
celery/task/base.py

@@ -192,10 +192,10 @@ class Task(BaseTask):
                 ...
 
             # establish fresh connection
-            with celery.connection() as conn:
+            with celery.connection_for_write() as conn:
                 ...
         """
-        return self._get_app().connection()
+        return self._get_app().connection_for_write()
 
     def get_publisher(self, connection=None, exchange=None,
                       exchange_type=None, **options):
@@ -205,7 +205,7 @@ class Task(BaseTask):
 
         .. code-block:: python
 
-            with app.connection() as conn:
+            with app.connection_for_write() as conn:
                 with app.amqp.Producer(conn) as prod:
                     my_task.apply_async(producer=prod)
 

+ 1 - 1
celery/tests/bin/test_celeryevdump.py

@@ -56,7 +56,7 @@ class test_Dumper(AppCase):
                 raise KeyError()
             recv.capture.side_effect = se
 
-            Conn = app.connection.return_value = Mock(name='conn')
+            Conn = app.connection_for_read.return_value = Mock(name='conn')
             conn = Conn.clone.return_value = Mock(name='cloned_conn')
             conn.connection_errors = (KeyError,)
             conn.channel_errors = ()

+ 7 - 7
celery/tests/events/test_events.py

@@ -66,7 +66,7 @@ class test_EventDispatcher(AppCase):
 
     def test_send(self):
         producer = MockProducer()
-        producer.connection = self.app.connection()
+        producer.connection = self.app.connection_for_write()
         connection = Mock()
         connection.transport.driver_type = 'amqp'
         eventer = self.app.events.Dispatcher(connection, enabled=False,
@@ -98,7 +98,7 @@ class test_EventDispatcher(AppCase):
     def test_send_buffer_group(self):
         buf_received = [None]
         producer = MockProducer()
-        producer.connection = self.app.connection()
+        producer.connection = self.app.connection_for_write()
         connection = Mock()
         connection.transport.driver_type = 'amqp'
         eventer = self.app.events.Dispatcher(
@@ -134,7 +134,7 @@ class test_EventDispatcher(AppCase):
         eventer.flush(errors=False, groups=False)
 
     def test_enter_exit(self):
-        with self.app.connection() as conn:
+        with self.app.connection_for_write() as conn:
             d = self.app.events.Dispatcher(conn)
             d.close = Mock()
             with d as _d:
@@ -144,7 +144,7 @@ class test_EventDispatcher(AppCase):
     def test_enable_disable_callbacks(self):
         on_enable = Mock()
         on_disable = Mock()
-        with self.app.connection() as conn:
+        with self.app.connection_for_write() as conn:
             with self.app.events.Dispatcher(conn, enabled=False) as d:
                 d.on_enabled.add(on_enable)
                 d.on_disabled.add(on_disable)
@@ -154,7 +154,7 @@ class test_EventDispatcher(AppCase):
                 on_disable.assert_called_with()
 
     def test_enabled_disable(self):
-        connection = self.app.connection()
+        connection = self.app.connection_for_write()
         channel = connection.channel()
         try:
             dispatcher = self.app.events.Dispatcher(connection,
@@ -235,7 +235,7 @@ class test_EventReceiver(AppCase):
         self.assertTrue(got_event[0])
 
     def test_itercapture(self):
-        connection = self.app.connection()
+        connection = self.app.connection_for_write()
         try:
             r = self.app.events.Receiver(connection, node_id='celery.tests')
             it = r.itercapture(timeout=0.0001, wakeup=False)
@@ -284,7 +284,7 @@ class test_EventReceiver(AppCase):
         r.process.assert_has_calls([call(1), call(2), call(3)])
 
     def test_itercapture_limit(self):
-        connection = self.app.connection()
+        connection = self.app.connection_for_write()
         channel = connection.channel()
         try:
             events_received = [0]

+ 18 - 9
celery/tests/worker/test_consumer.py

@@ -196,8 +196,8 @@ class test_Consumer(AppCase):
             c.on_close()
 
     def test_connect_error_handler(self):
-        self.app.connection = _amqp_connection()
-        conn = self.app.connection.return_value
+        self.app._connection = _amqp_connection()
+        conn = self.app._connection.return_value
         c = self.get_consumer()
         self.assertTrue(c.connect())
         self.assertTrue(conn.ensure_connection.called)
@@ -275,7 +275,7 @@ class test_Mingle(AppCase):
 
     def test_start_no_replies(self):
         c = Mock()
-        c.app.connection = _amqp_connection()
+        c.app.connection_for_read = _amqp_connection()
         mingle = Mingle(c)
         I = c.app.control.inspect.return_value = Mock()
         I.hello.return_value = {}
@@ -284,7 +284,7 @@ class test_Mingle(AppCase):
     def test_start(self):
         try:
             c = Mock()
-            c.app.connection = _amqp_connection()
+            c.app.connection_for_read = _amqp_connection()
             mingle = Mingle(c)
             self.assertTrue(mingle.enabled)
 
@@ -332,14 +332,14 @@ class test_Gossip(AppCase):
 
     def test_init(self):
         c = self.Consumer()
-        c.app.connection = _amqp_connection()
+        c.app.connection_for_read = _amqp_connection()
         g = Gossip(c)
         self.assertTrue(g.enabled)
         self.assertIs(c.gossip, g)
 
     def test_election(self):
         c = self.Consumer()
-        c.app.connection = _amqp_connection()
+        c.app.connection_for_read = _amqp_connection()
         g = Gossip(c)
         g.start(c)
         g.election('id', 'topic', 'action')
@@ -350,7 +350,7 @@ class test_Gossip(AppCase):
 
     def test_call_task(self):
         c = self.Consumer()
-        c.app.connection = _amqp_connection()
+        c.app.connection_for_read = _amqp_connection()
         g = Gossip(c)
         g.start(c)
 
@@ -381,7 +381,7 @@ class test_Gossip(AppCase):
 
     def test_on_elect(self):
         c = self.Consumer()
-        c.app.connection = _amqp_connection()
+        c.app.connection_for_read = _amqp_connection()
         g = Gossip(c)
         g.start(c)
 
@@ -433,6 +433,7 @@ class test_Gossip(AppCase):
 
     def test_on_elect_ack_win(self):
         c = self.Consumer(hostname='foo@x.com')  # I will win
+        c.app.connection_for_read = _amqp_connection()
         g = Gossip(c)
         handler = g.election_handlers['topic'] = Mock()
         self.setup_election(g, c)
@@ -440,7 +441,7 @@ class test_Gossip(AppCase):
 
     def test_on_elect_ack_lose(self):
         c = self.Consumer(hostname='bar@x.com')  # I will lose
-        c.app.connection = _amqp_connection()
+        c.app.connection_for_read = _amqp_connection()
         g = Gossip(c)
         handler = g.election_handlers['topic'] = Mock()
         self.setup_election(g, c)
@@ -448,6 +449,7 @@ class test_Gossip(AppCase):
 
     def test_on_elect_ack_win_but_no_action(self):
         c = self.Consumer(hostname='foo@x.com')  # I will win
+        c.app.connection_for_read = _amqp_connection()
         g = Gossip(c)
         g.election_handlers = {}
         with patch('celery.worker.consumer.error') as error:
@@ -456,6 +458,7 @@ class test_Gossip(AppCase):
 
     def test_on_node_join(self):
         c = self.Consumer()
+        c.app.connection_for_read = _amqp_connection()
         g = Gossip(c)
         with patch('celery.worker.consumer.debug') as debug:
             g.on_node_join(c)
@@ -463,6 +466,7 @@ class test_Gossip(AppCase):
 
     def test_on_node_leave(self):
         c = self.Consumer()
+        c.app.connection_for_read = _amqp_connection()
         g = Gossip(c)
         with patch('celery.worker.consumer.debug') as debug:
             g.on_node_leave(c)
@@ -470,6 +474,7 @@ class test_Gossip(AppCase):
 
     def test_on_node_lost(self):
         c = self.Consumer()
+        c.app.connection_for_read = _amqp_connection()
         g = Gossip(c)
         with patch('celery.worker.consumer.info') as info:
             g.on_node_lost(c)
@@ -477,6 +482,7 @@ class test_Gossip(AppCase):
 
     def test_register_timer(self):
         c = self.Consumer()
+        c.app.connection_for_read = _amqp_connection()
         g = Gossip(c)
         g.register_timer()
         c.timer.call_repeatedly.assert_called_with(g.interval, g.periodic)
@@ -486,6 +492,7 @@ class test_Gossip(AppCase):
 
     def test_periodic(self):
         c = self.Consumer()
+        c.app.connection_for_read = _amqp_connection()
         g = Gossip(c)
         g.on_node_lost = Mock()
         state = g.state = Mock()
@@ -503,6 +510,7 @@ class test_Gossip(AppCase):
 
     def test_on_message__task(self):
         c = self.Consumer()
+        c.app.connection_for_read = _amqp_connection()
         g = Gossip(c)
         self.assertTrue(g.enabled)
         message = Mock(name='message')
@@ -511,6 +519,7 @@ class test_Gossip(AppCase):
 
     def test_on_message(self):
         c = self.Consumer()
+        c.app.connection_for_read = _amqp_connection()
         g = Gossip(c)
         self.assertTrue(g.enabled)
         prepare = Mock()

+ 2 - 2
celery/tests/worker/test_control.py

@@ -98,7 +98,7 @@ class test_Pidbox_green(AppCase):
 
     def test_loop(self):
         parent = Mock()
-        conn = parent.connect.return_value = self.app.connection()
+        conn = parent.connect.return_value = self.app.connection_for_read()
         drain = conn.drain_events = Mock()
         g = gPidbox(parent)
         parent.connection = Mock()
@@ -252,7 +252,7 @@ class test_ControlPanel(AppCase):
     def test_active_queues(self):
         import kombu
 
-        x = kombu.Consumer(self.app.connection(),
+        x = kombu.Consumer(self.app.connection_for_read(),
                            [kombu.Queue('foo', kombu.Exchange('foo'), 'foo'),
                             kombu.Queue('bar', kombu.Exchange('bar'), 'bar')],
                            auto_declare=False)

+ 3 - 3
celery/tests/worker/test_worker.py

@@ -360,7 +360,7 @@ class test_Consumer(AppCase):
 
     def test_loop_ignores_socket_timeout(self):
 
-        class Connection(self.app.connection().__class__):
+        class Connection(self.app.connection_for_read().__class__):
             obj = None
 
             def drain_events(self, **kwargs):
@@ -376,7 +376,7 @@ class test_Consumer(AppCase):
 
     def test_loop_when_socket_error(self):
 
-        class Connection(self.app.connection().__class__):
+        class Connection(self.app.connection_for_read().__class__):
             obj = None
 
             def drain_events(self, **kwargs):
@@ -398,7 +398,7 @@ class test_Consumer(AppCase):
 
     def test_loop(self):
 
-        class Connection(self.app.connection().__class__):
+        class Connection(self.app.connection_for_read().__class__):
             obj = None
 
             def drain_events(self, **kwargs):

+ 1 - 1
celery/worker/__init__.py

@@ -122,7 +122,7 @@ class WorkController(object):
         self.ready_callback = ready_callback or self.on_consumer_ready
 
         # this connection is not established, only used for params
-        self._conninfo = self.app.connection()
+        self._conninfo = self.app.connection_for_read()
         self.use_eventloop = (
             self.should_use_eventloop() if use_eventloop is None
             else use_eventloop

+ 4 - 4
celery/worker/consumer.py

@@ -178,7 +178,7 @@ class Consumer(object):
         self.pool = pool
         self.timer = timer
         self.strategies = self.Strategies()
-        self.conninfo = self.app.connection()
+        self.conninfo = self.app.connection_for_read()
         self.connection_errors = self.conninfo.connection_errors
         self.channel_errors = self.conninfo.channel_errors
         self._restart_state = restart_state(maxR=5, maxT=1)
@@ -376,7 +376,7 @@ class Consumer(object):
         :setting:`broker_connection_retry` setting is enabled
 
         """
-        conn = self.app.connection(heartbeat=self.amqheartbeat)
+        conn = self.app.connection_for_read(heartbeat=self.amqheartbeat)
 
         # Callback called for each retry while the connection
         # can't be established.
@@ -635,7 +635,7 @@ class Mingle(bootsteps.StartStopStep):
         self.enabled = not without_mingle and self.compatible_transport(c.app)
 
     def compatible_transport(self, app):
-        with app.connection() as conn:
+        with app.connection_for_read() as conn:
             return conn.transport.driver_type in self.compatible_transports
 
     def start(self, c):
@@ -776,7 +776,7 @@ class Gossip(bootsteps.ConsumerStep):
         }
 
     def compatible_transport(self, app):
-        with app.connection() as conn:
+        with app.connection_for_read() as conn:
             return conn.transport.driver_type in self.compatible_transports
 
     def election(self, id, topic, action=None):

+ 19 - 0
docs/configuration.rst

@@ -1513,6 +1513,25 @@ The brokers will then be used in the :setting:`broker_failover_strategy`.
 See :ref:`kombu:connection-urls` in the Kombu documentation for more
 information.
 
+.. setting:: broker_read_url
+
+.. setting:: broker_write_url
+
+broker_read_url / broker_write_url
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+These settings can be configured, instead of :setting:`broker_url` to specify
+different connection parameters for broker connections used for consuming and
+producing.
+
+Example::
+
+    broker_read_url = 'amqp://user:pass@broker.example.com:56721'
+    broker_write_url = 'amqp://user:pass@broker.example.com:56722'
+
+Both options can also be specified as a list for failover alternates, see
+:setting:`broker_url` for more information.
+
 .. setting:: broker_failover_strategy
 
 broker_failover_strategy