Browse Source

Provide BROKER_FAILOVER_STRATEGY option in Celery.

This option allows for either selecting pre-existing kombu.connection
failover strategies, or passing in your own method reference.

Conflicts:
	celery/app/base.py
Matt Wise 11 năm trước cách đây
mục cha
commit
f7b4767b91
4 tập tin đã thay đổi với 61 bổ sung6 xóa
  1. 13 6
      celery/app/base.py
  2. 1 0
      celery/app/defaults.py
  3. 27 0
      celery/tests/app/test_app.py
  4. 20 0
      docs/configuration.rst

+ 13 - 6
celery/app/base.py

@@ -315,7 +315,8 @@ class Celery(object):
     def connection(self, hostname=None, userid=None, password=None,
                    virtual_host=None, port=None, ssl=None,
                    connect_timeout=None, transport=None,
-                   transport_options=None, heartbeat=None, **kwargs):
+                   transport_options=None, heartbeat=None,
+                   login_method=None, failover_strategy=None, **kwargs):
         conf = self.conf
         return self.amqp.Connection(
             hostname or conf.BROKER_URL,
@@ -325,12 +326,18 @@ class Celery(object):
             port or conf.BROKER_PORT,
             transport=transport or conf.BROKER_TRANSPORT,
             ssl=self.either('BROKER_USE_SSL', ssl),
-            connect_timeout=self.either(
-                'BROKER_CONNECTION_TIMEOUT', connect_timeout),
             heartbeat=heartbeat,
-            login_method=self.either('BROKER_LOGIN_METHOD', None),
-            transport_options=dict(conf.BROKER_TRANSPORT_OPTIONS,
-                                   **transport_options or {}))
+            login_method=login_method or self.app.conf.BROKER_LOGIN_METHOD,
+            failover_strategy=(
+                failover_strategy or conf.BROKER_FAILOVER_STRATEGY
+            ),
+            transport_options=dict(
+                conf.BROKER_TRANSPORT_OPTIONS, **transport_options or {}
+            ),
+            connect_timeout=self.either(
+                'BROKER_CONNECTION_TIMEOUT', connect_timeout
+            ),
+        )
     broker_connection = connection
 
     @contextmanager

+ 1 - 0
celery/app/defaults.py

@@ -73,6 +73,7 @@ NAMESPACES = {
         'CONNECTION_TIMEOUT': Option(4, type='float'),
         'CONNECTION_RETRY': Option(True, type='bool'),
         'CONNECTION_MAX_RETRIES': Option(100, type='int'),
+        'FAILOVER_STRATEGY': Option(None, type='string'),
         'HEARTBEAT': Option(None, type='int'),
         'HEARTBEAT_CHECKRATE': Option(3.0, type='int'),
         'LOGIN_METHOD': Option(None, type='string'),

+ 27 - 0
celery/tests/app/test_app.py

@@ -1,6 +1,7 @@
 from __future__ import absolute_import
 
 import os
+import itertools
 
 from copy import deepcopy
 from mock import Mock, patch
@@ -484,6 +485,32 @@ class test_App(AppCase):
         self.assertDictContainsSubset({'virtual_host': '/value'},
                                       conn.info())
 
+    def test_amqp_failover_strategy_selection(self):
+        # Test passing in a string and make sure the string
+        # gets there untouched
+        self.app.conf.BROKER_FAILOVER_STRATEGY = 'foo-bar'
+        self.assertEquals(
+            self.app.connection('amqp:////value').failover_strategy,
+            'foo-bar',
+        )
+
+        # Try passing in None
+        self.app.conf.BROKER_FAILOVER_STRATEGY = None
+        self.assertEquals(
+            self.app.connection('amqp:////value').failover_strategy,
+            itertools.cycle,
+        )
+
+        # Test passing in a method
+        def my_failover_strategy(it):
+            yield True
+
+        self.app.conf.BROKER_FAILOVER_STRATEGY = my_failover_strategy
+        self.assertEquals(
+            self.app.connection('amqp:////value').failover_strategy,
+            my_failover_strategy,
+        )
+
     def test_BROKER_BACKEND_alias(self):
         self.assertEqual(self.app.conf.BROKER_BACKEND,
                          self.app.conf.BROKER_TRANSPORT)

+ 20 - 0
docs/configuration.rst

@@ -875,6 +875,26 @@ Example::
 
 .. setting:: BROKER_TRANSPORT
 
+BROKER_FAILOVER_STRATEGY
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+Default failover strategy for the broker Connection object. If supplied,
+may map to a key in 'kombu.connection.failover_strategies', or be a reference
+to any method that yields a single item from a supplied list.
+
+Example::
+
+    # Random failover strategy
+    def random_failover_strategy(servers):
+        it = list(it)  # don't modify callers list
+        shuffle = random.shuffle
+        for _ in repeat(None):
+            shuffle(it)
+            yield it[0]
+
+    BROKER_FAILOVER_STRATEGY=random_failover_strategy
+
+
 BROKER_TRANSPORT
 ~~~~~~~~~~~~~~~~
 :Aliases: ``BROKER_BACKEND``