Pārlūkot izejas kodu

All result backends must properly set self.url

Ask Solem 9 gadi atpakaļ
vecāks
revīzija
0989790b27

+ 3 - 0
celery/backends/amqp.py

@@ -249,6 +249,9 @@ class AMQPBackend(base.Backend, AsyncBackendMixin):
         raise NotImplementedError(
             'delete_group is not supported by this backend.')
 
+    def as_uri(self, include_password=True):
+        return 'amqp://'
+
     def __reduce__(self, args=(), kwargs={}):
         kwargs.update(
             connection=self._connection,

+ 1 - 0
celery/backends/cache.py

@@ -100,6 +100,7 @@ class CacheBackend(KeyValueStoreBackend):
     def __init__(self, app, expires=None, backend=None,
                  options={}, url=None, **kwargs):
         super(CacheBackend, self).__init__(app, **kwargs)
+        self.url = url
 
         self.options = dict(self.app.conf.cache_backend_options,
                             **options)

+ 3 - 0
celery/backends/cassandra.py

@@ -218,6 +218,9 @@ class CassandraBackend(BaseBackend):
             buf_t(self.encode(self.current_task_children(request)))
         ))
 
+    def as_uri(self, include_password=True):
+        return 'cassandra://'
+
     def _get_task_meta_for(self, task_id):
         """Get task metadata for a task by id."""
         self._get_connection()

+ 7 - 7
celery/backends/couchbase.py

@@ -28,6 +28,12 @@ __all__ = ['CouchBaseBackend']
 
 
 class CouchBaseBackend(KeyValueStoreBackend):
+    """CouchBase backend.
+
+    :raises celery.exceptions.ImproperlyConfigured: if
+        module :mod:`couchbase` is not available.
+
+    """
     bucket = 'default'
     host = 'localhost'
     port = 8091
@@ -38,19 +44,13 @@ class CouchBaseBackend(KeyValueStoreBackend):
     unlock_gil = True
     timeout = 2.5
     transcoder = None
-    # supports_autoexpire = False
 
     # Use str as couchbase key not bytes
     key_t = str_t
 
     def __init__(self, url=None, *args, **kwargs):
-        """Initialize CouchBase backend instance.
-
-        :raises celery.exceptions.ImproperlyConfigured: if
-            module :mod:`couchbase` is not available.
-
-        """
         super(CouchBaseBackend, self).__init__(*args, **kwargs)
+        self.url = url
 
         if Couchbase is None:
             raise ImproperlyConfigured(

+ 7 - 6
celery/backends/couchdb.py

@@ -27,6 +27,12 @@ You need to install the pycouchdb library to use the CouchDB result backend\
 
 
 class CouchBackend(KeyValueStoreBackend):
+    """CouchDB backend.
+
+    :raises celery.exceptions.ImproperlyConfigured: if
+        module :mod:`pycouchdb` is not available.
+
+    """
     container = 'default'
     scheme = 'http'
     host = 'localhost'
@@ -35,13 +41,8 @@ class CouchBackend(KeyValueStoreBackend):
     password = None
 
     def __init__(self, url=None, *args, **kwargs):
-        """Initialize CouchDB backend instance.
-
-        :raises celery.exceptions.ImproperlyConfigured: if
-            module :mod:`pycouchdb` is not available.
-
-        """
         super(CouchBackend, self).__init__(*args, **kwargs)
+        self.url = url
 
         if pycouchdb is None:
             raise ImproperlyConfigured(ERR_LIB_MISSING)

+ 1 - 0
celery/backends/elasticsearch.py

@@ -45,6 +45,7 @@ class ElasticsearchBackend(KeyValueStoreBackend):
 
     def __init__(self, url=None, *args, **kwargs):
         super(ElasticsearchBackend, self).__init__(*args, **kwargs)
+        self.url = url
 
         if elasticsearch is None:
             raise ImproperlyConfigured(E_LIB_MISSING)

+ 11 - 11
celery/backends/filesystem.py

@@ -32,22 +32,22 @@ the correct permissions.\
 
 
 class FilesystemBackend(KeyValueStoreBackend):
+    """Filesystem result backend.
 
-    def __init__(self, url=None, open=open, unlink=os.unlink, sep=os.sep,
-                 encoding=default_encoding, *args, **kwargs):
-        """Initialize the filesystem backend.
-
-        Keyword arguments (in addition to those of KeyValueStoreBackend):
+    Keyword arguments (in addition to those of KeyValueStoreBackend):
 
-        :param url:  URL to the directory we should use
-        :param open: open function to use when opening files
-        :param unlink: unlink function to use when deleting files
-        :param sep: directory seperator (to join the directory with the key)
-        :param encoding: encoding used on the filesystem
+    :param url:  URL to the directory we should use
+    :param open: open function to use when opening files
+    :param unlink: unlink function to use when deleting files
+    :param sep: directory seperator (to join the directory with the key)
+    :param encoding: encoding used on the filesystem
 
-        """
+    """
 
+    def __init__(self, url=None, open=open, unlink=os.unlink, sep=os.sep,
+                 encoding=default_encoding, *args, **kwargs):
         super(FilesystemBackend, self).__init__(*args, **kwargs)
+        self.url = url
         path = self._find_path(url)
 
         # We need the path and seperator as bytes objects

+ 8 - 8
celery/backends/mongodb.py

@@ -40,6 +40,12 @@ __all__ = ['MongoBackend']
 
 
 class MongoBackend(BaseBackend):
+    """MongoDB result backend.
+
+    :raises celery.exceptions.ImproperlyConfigured: if
+        module :mod:`pymongo` is not available.
+
+    """
 
     mongo_host = None
     host = 'localhost'
@@ -57,12 +63,6 @@ class MongoBackend(BaseBackend):
     _connection = None
 
     def __init__(self, app=None, **kwargs):
-        """Initialize MongoDB backend instance.
-
-        :raises celery.exceptions.ImproperlyConfigured: if
-            module :mod:`pymongo` is not available.
-
-        """
         self.options = {}
 
         super(MongoBackend, self).__init__(app, **kwargs)
@@ -305,7 +305,7 @@ class MongoBackend(BaseBackend):
             return self.url
 
         if ',' not in self.url:
-            return maybe_sanitize_url(self.url).rstrip('/')
+            return maybe_sanitize_url(self.url)
 
         uri1, remainder = self.url.split(',', 1)
-        return ','.join([maybe_sanitize_url(uri1).rstrip('/'), remainder])
+        return ','.join([maybe_sanitize_url(uri1), remainder])

+ 7 - 5
celery/backends/riak.py

@@ -50,6 +50,12 @@ def is_ascii(s):
 
 
 class RiakBackend(KeyValueStoreBackend):
+    """Riak result backend.
+
+    :raises celery.exceptions.ImproperlyConfigured: if
+        module :mod:`riak` is not available.
+
+    """
     # TODO: allow using other protocols than protobuf ?
     #: default protocol used to connect to Riak, might be `http` or `pbc`
     protocol = 'pbc'
@@ -67,12 +73,8 @@ class RiakBackend(KeyValueStoreBackend):
 
     def __init__(self, host=None, port=None, bucket_name=None, protocol=None,
                  url=None, *args, **kwargs):
-        """Initialize Riak backend instance.
-
-        :raises celery.exceptions.ImproperlyConfigured: if
-            module :mod:`riak` is not available.
-        """
         super(RiakBackend, self).__init__(*args, **kwargs)
+        self.url = url
 
         if not riak:
             raise ImproperlyConfigured(

+ 3 - 0
celery/backends/rpc.py

@@ -57,6 +57,9 @@ class RPCBackend(amqp.AMQPBackend):
     def on_result_fulfilled(self, result):
         pass
 
+    def as_uri(self, include_password=True):
+        return 'rpc://'
+
     @property
     def binding(self):
         return self.Queue(self.oid, self.exchange, self.oid,

+ 5 - 2
celery/tests/backends/test_mongodb.py

@@ -36,8 +36,11 @@ class test_MongoBackend(AppCase):
         'mongodb://uuuu:pwpw@hostname.dom,'
         'hostname.dom/database?replicaSet=rs'
     )
-    sanitized_default_url = default_url.replace('pwpw', '**')
-    sanitized_replica_set_url = replica_set_url.replace('pwpw', '**')
+    sanitized_default_url = 'mongodb://uuuu:**@hostname.dom/database'
+    sanitized_replica_set_url = (
+        'mongodb://uuuu:**@hostname.dom/,'
+        'hostname.dom/database?replicaSet=rs'
+    )
 
     def setup(self):
         if pymongo is None: