瀏覽代碼

[Results] Adds new Backend.as_uri()

This can be used to get the URL used when configuring the backend,
and also supports an include_password argument that if set to False sanitizes
the URL for use in logs, etc.

The :program:`celery worker` startup banner is updated to use this
for sanitization.

Closes #3079
Closes #3045
Closes #3049
Closes #3068
Closes #3073
m-vdb 9 年之前
父節點
當前提交
f8bcdfed79

+ 1 - 4
celery/apps/worker.py

@@ -22,7 +22,6 @@ from functools import partial
 
 from billiard.process import current_process
 from kombu.utils.encoding import safe_str
-from kombu.utils.url import maybe_sanitize_url
 
 from celery import VERSION_BANNER, platforms, signals
 from celery.app import trace
@@ -206,9 +205,7 @@ class Worker(WorkController):
             timestamp=datetime.now().replace(microsecond=0),
             version=VERSION_BANNER,
             conninfo=self.app.connection().as_uri(),
-            results=maybe_sanitize_url(
-                self.app.conf.result_backend or 'disabled',
-            ),
+            results=self.app.backend.as_uri(),
             concurrency=concurrency,
             platform=safe_str(_platform.platform()),
             events=events,

+ 14 - 1
celery/backends/base.py

@@ -24,6 +24,7 @@ from kombu.serialization import (
     registry as serializer_registry,
 )
 from kombu.utils.encoding import bytes_to_str, ensure_bytes, from_utf8
+from kombu.utils.url import maybe_sanitize_url
 
 from celery import states
 from celery import current_app, group, maybe_signature
@@ -93,7 +94,7 @@ class Backend(object):
 
     def __init__(self, app,
                  serializer=None, max_cached_results=None, accept=None,
-                 expires=None, expires_type=None, **kwargs):
+                 expires=None, expires_type=None, url=None, **kwargs):
         self.app = app
         conf = self.app.conf
         self.serializer = serializer or conf.result_serializer
@@ -108,6 +109,14 @@ class Backend(object):
             conf.accept_content if accept is None else accept,
         )
         self._pending_results = {}
+        self.url = url
+
+    def as_uri(self, include_password=False):
+        """Return the backend as an URI, sanitizing the password or not"""
+        # when using maybe_sanitize_url(), "/" is added
+        # we're stripping it for consistency
+        return (self.url if include_password
+                else maybe_sanitize_url(self.url).rstrip("/"))
 
     def mark_as_started(self, task_id, **meta):
         """Mark a task as started"""
@@ -682,5 +691,9 @@ class DisabledBackend(BaseBackend):
         raise NotImplementedError(
             'No result backend configured.  '
             'Please see the documentation for more information.')
+
+    def as_uri(self, *args, **kwargs):
+        return 'disabled://'
+
     get_state = get_status = get_result = get_traceback = _is_disabled
     wait_for = get_many = _is_disabled

+ 9 - 0
celery/backends/cache.py

@@ -149,3 +149,12 @@ class CacheBackend(KeyValueStoreBackend):
                  expires=self.expires,
                  options=self.options))
         return super(CacheBackend, self).__reduce__(args, kwargs)
+
+    def as_uri(self, *args, **kwargs):
+        """
+        Return the backend as an URI. It properly handles the
+        case of multiple servers. It doesn't try to sanitize
+        password because memcached URIs doesn't support them.
+        """
+        servers = ';'.join(self.servers)
+        return '{0}://{1}/'.format(self.backend, servers)

+ 16 - 3
celery/backends/mongodb.py

@@ -11,6 +11,7 @@ from __future__ import absolute_import
 from datetime import datetime, timedelta
 
 from kombu.utils import cached_property
+from kombu.utils.url import maybe_sanitize_url
 from kombu.exceptions import EncodeError
 from celery import states
 from celery.exceptions import ImproperlyConfigured
@@ -55,7 +56,7 @@ class MongoBackend(BaseBackend):
 
     _connection = None
 
-    def __init__(self, app=None, url=None, **kwargs):
+    def __init__(self, app=None, **kwargs):
         """Initialize MongoDB backend instance.
 
         :raises celery.exceptions.ImproperlyConfigured: if
@@ -71,8 +72,6 @@ class MongoBackend(BaseBackend):
                 'You need to install the pymongo library to use the '
                 'MongoDB backend.')
 
-        self.url = url
-
         # Set option defaults
         for key, value in items(self._prepare_client_options()):
             self.options.setdefault(key, value)
@@ -295,3 +294,17 @@ class MongoBackend(BaseBackend):
     @cached_property
     def expires_delta(self):
         return timedelta(seconds=self.expires)
+
+    def as_uri(self, include_password=False):
+        """
+        Return the backend as an URI, sanitizing the password or not.
+        It properly handles the case of a replica set.
+        """
+        if include_password:
+            return self.url
+
+        if "," not in self.url:
+            return maybe_sanitize_url(self.url).rstrip("/")
+
+        uri1, remainder = self.url.split(",", 1)
+        return ",".join([maybe_sanitize_url(uri1).rstrip("/"), remainder])

+ 18 - 0
celery/tests/backends/test_base.py

@@ -584,3 +584,21 @@ class test_DisabledBackend(AppCase):
     def test_is_disabled(self):
         with self.assertRaises(NotImplementedError):
             DisabledBackend(self.app).get_state('foo')
+
+    def test_as_uri(self):
+        self.assertEqual(DisabledBackend(self.app).as_uri(), 'disabled://')
+
+
+class test_as_uri(AppCase):
+
+    def setup(self):
+        self.b = BaseBackend(
+            app=self.app,
+            url="sch://uuuu:pwpw@hostname.dom"
+        )
+
+    def test_as_uri_include_password(self):
+        self.assertEqual(self.b.as_uri(True), "sch://uuuu:pwpw@hostname.dom")
+
+    def test_as_uri_exclude_password(self):
+        self.assertEqual(self.b.as_uri(), "sch://uuuu:**@hostname.dom")

+ 20 - 2
celery/tests/backends/test_cache.py

@@ -5,12 +5,12 @@ import types
 
 from contextlib import contextmanager
 
-from kombu.utils.encoding import str_to_bytes
+from kombu.utils.encoding import str_to_bytes, ensure_bytes
 
 from celery import signature
 from celery import states
 from celery import group
-from celery.backends.cache import CacheBackend, DummyClient
+from celery.backends.cache import CacheBackend, DummyClient, backends
 from celery.exceptions import ImproperlyConfigured
 from celery.five import items, string, text_t
 from celery.utils import uuid
@@ -34,6 +34,11 @@ class test_CacheBackend(AppCase):
         self.app.conf.result_serializer = 'pickle'
         self.tb = CacheBackend(backend='memory://', app=self.app)
         self.tid = uuid()
+        self.old_get_best_memcached = backends['memcache']
+        backends['memcache'] = lambda: (DummyClient, ensure_bytes)
+
+    def teardown(self):
+        backends['memcache'] = self.old_get_best_memcached
 
     def test_no_backend(self):
         self.app.conf.cache_backend = None
@@ -118,6 +123,19 @@ class test_CacheBackend(AppCase):
         with self.assertRaises(ImproperlyConfigured):
             CacheBackend(backend='unknown://', app=self.app)
 
+    def test_as_uri_no_servers(self):
+        self.assertEqual(self.tb.as_uri(), 'memory:///')
+
+    def test_as_uri_one_server(self):
+        backend = 'memcache://127.0.0.1:11211/'
+        b = CacheBackend(backend=backend, app=self.app)
+        self.assertEqual(b.as_uri(), backend)
+
+    def test_as_uri_multiple_servers(self):
+        backend = 'memcache://127.0.0.1:11211;127.0.0.2:11211;127.0.0.3/'
+        b = CacheBackend(backend=backend, app=self.app)
+        self.assertEqual(b.as_uri(), backend)
+
 
 class MyMemcachedStringEncodingError(Exception):
     pass

+ 20 - 1
celery/tests/backends/test_mongodb.py

@@ -31,6 +31,11 @@ MONGODB_GROUP_COLLECTION = 'group_collection1'
 
 class test_MongoBackend(AppCase):
 
+    default_url = "mongodb://uuuu:pwpw@hostname.dom/database"
+    replica_set_url = "mongodb://uuuu:pwpw@hostname.dom,hostname.dom/database?replicaSet=rs"
+    sanitized_default_url = default_url.replace("pwpw", "**")
+    sanitized_replica_set_url = replica_set_url.replace("pwpw", "**")
+
     def setup(self):
         if pymongo is None:
             raise SkipTest('pymongo is not installed.')
@@ -41,7 +46,7 @@ class test_MongoBackend(AppCase):
         R['Binary'], module.Binary = module.Binary, Mock()
         R['datetime'], datetime.datetime = datetime.datetime, Mock()
 
-        self.backend = MongoBackend(app=self.app)
+        self.backend = MongoBackend(app=self.app, url=self.default_url)
 
     def teardown(self):
         MongoBackend.encode = self._reset['encode']
@@ -385,6 +390,20 @@ class test_MongoBackend(AppCase):
                 'maxPoolSize': self.backend.max_pool_size
             })
 
+    def test_as_uri_include_password(self):
+        self.assertEqual(self.backend.as_uri(True), self.default_url)
+
+    def test_as_uri_exclude_password(self):
+        self.assertEqual(self.backend.as_uri(), self.sanitized_default_url)
+
+    def test_as_uri_include_password_replica_set(self):
+        backend = MongoBackend(app=self.app, url=self.replica_set_url)
+        self.assertEqual(backend.as_uri(True), self.replica_set_url)
+
+    def test_as_uri_exclude_password_replica_set(self):
+        backend = MongoBackend(app=self.app, url=self.replica_set_url)
+        self.assertEqual(backend.as_uri(), self.sanitized_replica_set_url)
+
 
 class test_MongoBackend_no_mock(AppCase):
 

+ 14 - 0
celery/tests/bin/test_worker.py

@@ -209,6 +209,20 @@ class test_Worker(WorkerAppCase):
         finally:
             cd.ARTLINES = prev
 
+    @disable_stdouts
+    def test_startup_info_mongo_result_backend(self):
+        self.app.conf.result_backend = "mongodb://user:password@host0.com:43437,host1.com:43437/work4us?replicaSet=rs&ssl=true"
+        worker = self.Worker(app=self.app)
+        worker.on_start()
+        self.assertTrue(worker.startup_info())
+
+    @disable_stdouts
+    def test_startup_info_memcached_result_backend(self):
+        self.app.conf.result_backend = "cache+memcached://127.0.0.1:11211;127.0.0.2:11211;127.0.0.3/"
+        worker = self.Worker(app=self.app)
+        worker.on_start()
+        self.assertTrue(worker.startup_info())
+
     @disable_stdouts
     def test_run(self):
         self.Worker(app=self.app).on_start()