|
@@ -23,6 +23,7 @@ if pymongo:
|
|
|
else: # pragma: no cover
|
|
|
Binary = None # noqa
|
|
|
|
|
|
+from kombu.syn import detect_environment
|
|
|
from kombu.utils import cached_property
|
|
|
|
|
|
from celery import states
|
|
@@ -42,14 +43,14 @@ class Bunch(object):
|
|
|
|
|
|
|
|
|
class MongoBackend(BaseBackend):
|
|
|
- mongodb_host = 'localhost'
|
|
|
- mongodb_port = 27017
|
|
|
- mongodb_user = None
|
|
|
- mongodb_password = None
|
|
|
- mongodb_database = 'celery'
|
|
|
- mongodb_taskmeta_collection = 'celery_taskmeta'
|
|
|
- mongodb_max_pool_size = 10
|
|
|
- mongodb_options = None
|
|
|
+ hostname = 'localhost'
|
|
|
+ port = 27017
|
|
|
+ user = None
|
|
|
+ password = None
|
|
|
+ database = 'celery'
|
|
|
+ taskmeta_collection = 'celery_taskmeta'
|
|
|
+ max_pool_size = 10
|
|
|
+ options = None
|
|
|
|
|
|
supports_autoexpire = False
|
|
|
|
|
@@ -69,29 +70,32 @@ class MongoBackend(BaseBackend):
|
|
|
'You need to install the pymongo library to use the '
|
|
|
'MongoDB backend.')
|
|
|
|
|
|
- config = self.app.conf.get('CELERY_MONGODB_BACKEND_SETTINGS', None)
|
|
|
- if config is not None:
|
|
|
+ config = dict(self.app.conf.get('CELERY_MONGODB_BACKEND_SETTINGS', {}))
|
|
|
+ if config:
|
|
|
if not isinstance(config, dict):
|
|
|
raise ImproperlyConfigured(
|
|
|
'MongoDB backend settings should be grouped in a dict')
|
|
|
|
|
|
- self.mongodb_host = config.get('host', self.mongodb_host)
|
|
|
- self.mongodb_port = int(config.get('port', self.mongodb_port))
|
|
|
- self.mongodb_user = config.get('user', self.mongodb_user)
|
|
|
- self.mongodb_options = config.get('options', {})
|
|
|
- self.mongodb_password = config.get(
|
|
|
- 'password', self.mongodb_password)
|
|
|
- self.mongodb_database = config.get(
|
|
|
- 'database', self.mongodb_database)
|
|
|
- self.mongodb_taskmeta_collection = config.get(
|
|
|
- 'taskmeta_collection', self.mongodb_taskmeta_collection)
|
|
|
- self.mongodb_max_pool_size = config.get(
|
|
|
- 'max_pool_size', self.mongodb_max_pool_size)
|
|
|
+ self.hostname = config.pop('host', self.hostname)
|
|
|
+ self.port = int(config.pop('port', self.port))
|
|
|
+ self.user = config.get('user', self.user)
|
|
|
+ self.password = config.pop('password', self.password)
|
|
|
+ self.database = config.pop('database', self.database)
|
|
|
+ self.taskmeta_collection = config.pop(
|
|
|
+ 'taskmeta_collection', self.taskmeta_collection,
|
|
|
+ )
|
|
|
+
|
|
|
+ self.options = dict(config, **config.pop('options', {}))
|
|
|
+
|
|
|
+ # Set option defaults
|
|
|
+ self.options.setdefault('ssl', self.app.conf.BROKER_USE_SSL)
|
|
|
+ self.options.setdefault('max_pool_size', self.max_pool_size)
|
|
|
+ self.options.setdefault('auto_start_request', False)
|
|
|
+
|
|
|
url = kwargs.get('url')
|
|
|
if url:
|
|
|
# Specifying backend as an URL
|
|
|
- self.mongodb_host = url
|
|
|
-
|
|
|
+ self.hostname = url
|
|
|
self._connection = None
|
|
|
|
|
|
def _get_connection(self):
|
|
@@ -101,23 +105,19 @@ class MongoBackend(BaseBackend):
|
|
|
|
|
|
# The first pymongo.Connection() argument (host) can be
|
|
|
# a list of ['host:port'] elements or a mongodb connection
|
|
|
- # URI. If this is the case, don't use self.mongodb_port
|
|
|
+ # URI. If this is the case, don't use self.port
|
|
|
# but let pymongo get the port(s) from the URI instead.
|
|
|
# This enables the use of replica sets and sharding.
|
|
|
# See pymongo.Connection() for more info.
|
|
|
- url = self.mongodb_host
|
|
|
- kwargs = {
|
|
|
- 'max_pool_size': self.mongodb_max_pool_size,
|
|
|
- 'ssl': self.app.conf.BROKER_USE_SSL,
|
|
|
- 'auto_start_request': False,
|
|
|
- }
|
|
|
+ url = self.hostname
|
|
|
if isinstance(url, string_t) \
|
|
|
and not url.startswith('mongodb://'):
|
|
|
- url = 'mongodb://{0}:{1}'.format(url, self.mongodb_port)
|
|
|
+ url = 'mongodb://{0}:{1}'.format(url, self.port)
|
|
|
if url == 'mongodb://':
|
|
|
url = url + 'localhost'
|
|
|
self._connection = MongoClient(
|
|
|
- host=url, **dict(kwargs, **self.mongodb_options or {})
|
|
|
+ host=url, use_greenlet=detect_environment() != 'default',
|
|
|
+ **self.options
|
|
|
)
|
|
|
|
|
|
return self._connection
|
|
@@ -215,10 +215,10 @@ class MongoBackend(BaseBackend):
|
|
|
|
|
|
def _get_database(self):
|
|
|
conn = self._get_connection()
|
|
|
- db = conn[self.mongodb_database]
|
|
|
- if self.mongodb_user and self.mongodb_password:
|
|
|
- if not db.authenticate(self.mongodb_user,
|
|
|
- self.mongodb_password):
|
|
|
+ db = conn[self.database]
|
|
|
+ if self.user and self.password:
|
|
|
+ if not db.authenticate(self.user,
|
|
|
+ self.password):
|
|
|
raise ImproperlyConfigured(
|
|
|
'Invalid MongoDB username or password.')
|
|
|
return db
|
|
@@ -232,7 +232,7 @@ class MongoBackend(BaseBackend):
|
|
|
@cached_property
|
|
|
def collection(self):
|
|
|
"""Get the metadata task collection."""
|
|
|
- collection = self.database[self.mongodb_taskmeta_collection]
|
|
|
+ collection = self.database[self.taskmeta_collection]
|
|
|
|
|
|
# Ensure an index on date_done is there, if not process the index
|
|
|
# in the background. Once completed cleanup will be much faster
|