瀏覽代碼

Merge pull request #2553 from samjy/master

Improve use of uri in mongo backend
PMickael 10 年之前
父節點
當前提交
cb83434fdf
共有 2 個文件被更改,包括 111 次插入25 次删除
  1. 54 25
      celery/backends/mongodb.py
  2. 57 0
      celery/tests/backends/test_mongodb.py

+ 54 - 25
celery/backends/mongodb.py

@@ -45,6 +45,7 @@ class Bunch(object):
 
 class MongoBackend(BaseBackend):
 
+    mongo_host = None
     host = 'localhost'
     port = 27017
     user = None
@@ -75,6 +76,28 @@ class MongoBackend(BaseBackend):
                 'You need to install the pymongo library to use the '
                 'MongoDB backend.')
 
+        self.url = url
+
+        # default options
+        self.options.setdefault('max_pool_size', self.max_pool_size)
+        self.options.setdefault('auto_start_request', False)
+
+        # update conf with mongo uri data, only if uri was given
+        if self.url:
+            uri_data = pymongo.uri_parser.parse_uri(self.url)
+            # build the hosts list to create a mongo connection
+            make_host_str = lambda x: "{0}:{1}".format(x[0], x[1])
+            hostslist = map(make_host_str, uri_data['nodelist'])
+            self.user = uri_data['username']
+            self.password = uri_data['password']
+            self.mongo_host = hostslist
+            if uri_data['database']:
+                # if no database is provided in the uri, use default
+                self.database_name = uri_data['database']
+
+            self.options.update(uri_data['options'])
+
+        # update conf with specific settings
         config = self.app.conf.get('CELERY_MONGODB_BACKEND_SETTINGS')
         if config is not None:
             if not isinstance(config, dict):
@@ -82,8 +105,13 @@ class MongoBackend(BaseBackend):
                     'MongoDB backend settings should be grouped in a dict')
             config = dict(config)  # do not modify original
 
+            if 'host' in config or 'port' in config:
+                # these should take over uri conf
+                self.mongo_host = None
+
             self.host = config.pop('host', self.host)
-            self.port = int(config.pop('port', self.port))
+            self.port = config.pop('port', self.port)
+            self.mongo_host = config.pop('mongo_host', self.mongo_host)
             self.user = config.pop('user', self.user)
             self.password = config.pop('password', self.password)
             self.database_name = config.pop('database', self.database_name)
@@ -94,37 +122,38 @@ class MongoBackend(BaseBackend):
                 'groupmeta_collection', self.groupmeta_collection,
             )
 
-            self.options = dict(config, **config.pop('options', None) or {})
-
-            # Set option defaults
-            self.options.setdefault('max_pool_size', self.max_pool_size)
-            self.options.setdefault('auto_start_request', False)
-
-        self.url = url
-        if self.url:
-            # Specifying backend as an URL
-            self.host = self.url
+            self.options.update(config.pop('options', {}))
+            self.options.update(config)
 
     def _get_connection(self):
         """Connect to the MongoDB server."""
         if self._connection is None:
             from pymongo import MongoClient
 
-            # The first pymongo.Connection() argument (host) can be
-            # a list of ['host:port'] elements or a mongodb connection
-            # URI. If this is the case, don't use self.port
-            # but let pymongo get the port(s) from the URI instead.
-            # This enables the use of replica sets and sharding.
-            # See pymongo.Connection() for more info.
-            url = self.host
-            if isinstance(url, string_t) \
-                    and not url.startswith('mongodb://'):
-                url = 'mongodb://{0}:{1}'.format(url, self.port)
-            if url == 'mongodb://':
-                url = url + 'localhost'
+            host = self.mongo_host
+            if not host:
+                # The first pymongo.Connection() argument (host) can be
+                # a list of ['host:port'] elements or a mongodb connection
+                # URI. If this is the case, don't use self.port
+                # but let pymongo get the port(s) from the URI instead.
+                # This enables the use of replica sets and sharding.
+                # See pymongo.Connection() for more info.
+                host = self.host
+                if isinstance(host, string_t) \
+                   and not host.startswith('mongodb://'):
+                    host = 'mongodb://{0}:{1}'.format(host, self.port)
+
+                if host == 'mongodb://':
+                    host += 'localhost'
+
+            # don't change self.options
+            conf = dict(self.options)
+            conf['host'] = host
+
             if detect_environment() != 'default':
-                self.options['use_greenlets'] = True
-            self._connection = MongoClient(host=url, **self.options)
+                conf['use_greenlets'] = True
+
+            self._connection = MongoClient(**conf)
 
         return self._connection
 

+ 57 - 0
celery/tests/backends/test_mongodb.py

@@ -67,6 +67,63 @@ class test_MongoBackend(AppCase):
         self.app.conf.CELERY_MONGODB_BACKEND_SETTINGS = None
         MongoBackend(app=self.app)
 
+    def test_init_with_settings(self):
+        self.app.conf.CELERY_MONGODB_BACKEND_SETTINGS = None
+        # empty settings
+        mb = MongoBackend(app=self.app)
+
+        # uri
+        uri = 'mongodb://localhost:27017'
+        mb = MongoBackend(app=self.app, url=uri)
+        self.assertEqual(mb.mongo_host, ['localhost:27017'])
+        self.assertEqual(mb.options, {'auto_start_request': False,
+                                      'max_pool_size': 10})
+        self.assertEqual(mb.database_name, 'celery')
+
+        # uri with database name
+        uri = 'mongodb://localhost:27017/celerydb'
+        mb = MongoBackend(app=self.app, url=uri)
+        self.assertEqual(mb.database_name, 'celerydb')
+
+        # uri with user, password, database name, replica set
+        uri = ('mongodb://'
+               'celeryuser:celerypassword@'
+               'mongo1.example.com:27017,'
+               'mongo2.example.com:27017,'
+               'mongo3.example.com:27017/'
+               'celerydatabase?replicaSet=rs0')
+        mb = MongoBackend(app=self.app, url=uri)
+        self.assertEqual(mb.mongo_host, ['mongo1.example.com:27017',
+                                         'mongo2.example.com:27017',
+                                         'mongo3.example.com:27017'])
+        self.assertEqual(mb.options, {'auto_start_request': False,
+                                      'max_pool_size': 10,
+                                      'replicaset': 'rs0'})
+        self.assertEqual(mb.user, 'celeryuser')
+        self.assertEqual(mb.password, 'celerypassword')
+        self.assertEqual(mb.database_name, 'celerydatabase')
+
+        # same uri, change some parameters in backend settings
+        self.app.conf.CELERY_MONGODB_BACKEND_SETTINGS = {
+            'replicaset': 'rs1',
+            'user': 'backenduser',
+            'database': 'another_db',
+            'options': {
+                'socketKeepAlive': True,
+            },
+        }
+        mb = MongoBackend(app=self.app, url=uri)
+        self.assertEqual(mb.mongo_host, ['mongo1.example.com:27017',
+                                         'mongo2.example.com:27017',
+                                         'mongo3.example.com:27017'])
+        self.assertEqual(mb.options, {'auto_start_request': False,
+                                      'max_pool_size': 10,
+                                      'replicaset': 'rs1',
+                                      'socketKeepAlive': True})
+        self.assertEqual(mb.user, 'backenduser')
+        self.assertEqual(mb.password, 'celerypassword')
+        self.assertEqual(mb.database_name, 'another_db')
+
     @depends_on_current_app
     def test_reduce(self):
         x = MongoBackend(app=self.app)