Prechádzať zdrojové kódy

All result backends must properly set self.url

Ask Solem 9 rokov pred
rodič
commit
082dc1ccb8

+ 3 - 0
celery/backends/amqp.py

@@ -301,6 +301,9 @@ class AMQPBackend(BaseBackend):
         raise NotImplementedError(
             'delete_group is not supported by this backend.')
 
+    def as_uri(self, include_password=True):
+        return 'amqp://'
+
     def __reduce__(self, args=(), kwargs={}):
         kwargs.update(
             connection=self._connection,

+ 1 - 0
celery/backends/cache.py

@@ -100,6 +100,7 @@ class CacheBackend(KeyValueStoreBackend):
     def __init__(self, app, expires=None, backend=None,
                  options={}, url=None, **kwargs):
         super(CacheBackend, self).__init__(app, **kwargs)
+        self.url = url
 
         self.options = dict(self.app.conf.CELERY_CACHE_BACKEND_OPTIONS,
                             **options)

+ 3 - 0
celery/backends/cassandra.py

@@ -158,6 +158,9 @@ class CassandraBackend(BaseBackend):
 
         return self._retry_on_error(_do_store)
 
+    def as_uri(self, include_password=True):
+        return 'cassandra://'
+
     def _get_task_meta_for(self, task_id):
         """Get task metadata for a task by id."""
 

+ 7 - 7
celery/backends/couchbase.py

@@ -28,6 +28,12 @@ __all__ = ['CouchBaseBackend']
 
 
 class CouchBaseBackend(KeyValueStoreBackend):
+    """CouchBase backend.
+
+    :raises celery.exceptions.ImproperlyConfigured: if
+        module :mod:`couchbase` is not available.
+
+    """
     bucket = 'default'
     host = 'localhost'
     port = 8091
@@ -38,16 +44,10 @@ class CouchBaseBackend(KeyValueStoreBackend):
     unlock_gil = True
     timeout = 2.5
     transcoder = None
-    # supports_autoexpire = False
 
     def __init__(self, url=None, *args, **kwargs):
-        """Initialize CouchBase backend instance.
-
-        :raises celery.exceptions.ImproperlyConfigured: if
-            module :mod:`couchbase` is not available.
-
-        """
         super(CouchBaseBackend, self).__init__(*args, **kwargs)
+        self.url = url
 
         self.expires = kwargs.get('expires') or maybe_timedelta(
             self.app.conf.CELERY_TASK_RESULT_EXPIRES)

+ 9 - 8
celery/backends/mongodb.py

@@ -38,6 +38,13 @@ __all__ = ['MongoBackend']
 
 
 class MongoBackend(BaseBackend):
+    """MongoDB result backend.
+
+    :raises celery.exceptions.ImproperlyConfigured: if
+        module :mod:`pymongo` is not available.
+
+    """
+
     host = 'localhost'
     port = 27017
     user = None
@@ -52,12 +59,6 @@ class MongoBackend(BaseBackend):
     _connection = None
 
     def __init__(self, app=None, url=None, **kwargs):
-        """Initialize MongoDB backend instance.
-
-        :raises celery.exceptions.ImproperlyConfigured: if
-            module :mod:`pymongo` is not available.
-
-        """
         self.options = {}
         super(MongoBackend, self).__init__(app, **kwargs)
         self.expires = kwargs.get('expires') or maybe_timedelta(
@@ -255,7 +256,7 @@ class MongoBackend(BaseBackend):
             return self.url
 
         if ',' not in self.url:
-            return maybe_sanitize_url(self.url).rstrip('/')
+            return maybe_sanitize_url(self.url)
 
         uri1, remainder = self.url.split(',', 1)
-        return ','.join([maybe_sanitize_url(uri1).rstrip('/'), remainder])
+        return ','.join([maybe_sanitize_url(uri1), remainder])

+ 3 - 0
celery/backends/rpc.py

@@ -54,6 +54,9 @@ class RPCBackend(amqp.AMQPBackend):
     def on_reply_declare(self, task_id):
         pass
 
+    def as_uri(self, include_password=True):
+        return 'rpc://'
+
     @property
     def binding(self):
         return self.Queue(self.oid, self.exchange, self.oid,

+ 5 - 2
celery/tests/backends/test_mongodb.py

@@ -31,8 +31,11 @@ class test_MongoBackend(AppCase):
         'mongodb://uuuu:pwpw@hostname.dom,'
         'hostname.dom/database?replicaSet=rs'
     )
-    sanitized_default_url = default_url.replace('pwpw', '**')
-    sanitized_replica_set_url = replica_set_url.replace('pwpw', '**')
+    sanitized_default_url = 'mongodb://uuuu:**@hostname.dom/database'
+    sanitized_replica_set_url = (
+        'mongodb://uuuu:**@hostname.dom/,'
+        'hostname.dom/database?replicaSet=rs'
+    )
 
     def setup(self):
         if pymongo is None: