Просмотр исходного кода

[Results] Adds new Backend.as_uri()

This can be used to get the URL used when configuring the backend,
and also supports an include_password argument that if set to False sanitizes
the URL for use in logs, etc.

The :program:`celery worker` startup banner is updated to use this
for sanitization.

Closes #3079
Closes #3045
Closes #3049
Closes #3068
Closes #3073
m-vdb 9 лет назад
Родитель
Сommit
e3416da90d

+ 1 - 4
celery/apps/worker.py

@@ -22,7 +22,6 @@ from functools import partial
 
 from billiard import current_process
 from kombu.utils.encoding import safe_str
-from kombu.utils.url import maybe_sanitize_url
 
 from celery import VERSION_BANNER, platforms, signals
 from celery.app import trace
@@ -228,9 +227,7 @@ class Worker(WorkController):
             hostname=safe_str(self.hostname),
             version=VERSION_BANNER,
             conninfo=self.app.connection().as_uri(),
-            results=maybe_sanitize_url(
-                self.app.conf.CELERY_RESULT_BACKEND or 'disabled',
-            ),
+            results=self.app.backend.as_uri(),
             concurrency=concurrency,
             platform=safe_str(_platform.platform()),
             events=events,

+ 18 - 3
celery/backends/base.py

@@ -24,6 +24,7 @@ from kombu.serialization import (
     registry as serializer_registry,
 )
 from kombu.utils.encoding import bytes_to_str, ensure_bytes, from_utf8
+from kombu.utils.url import maybe_sanitize_url
 
 from celery import states
 from celery import current_app, maybe_signature
@@ -92,8 +93,9 @@ class BaseBackend(object):
         'interval_max': 1,
     }
 
-    def __init__(self, app, serializer=None,
-                 max_cached_results=None, accept=None, **kwargs):
+    def __init__(self, app,
+                 serializer=None, max_cached_results=None, accept=None,
+                 url=None, **kwargs):
         self.app = app
         conf = self.app.conf
         self.serializer = serializer or conf.CELERY_RESULT_SERIALIZER
@@ -105,6 +107,14 @@ class BaseBackend(object):
         self.accept = prepare_accept_content(
             conf.CELERY_ACCEPT_CONTENT if accept is None else accept,
         )
+        self.url = url
+
+    def as_uri(self, include_password=False):
+        """Return the backend as an URI, sanitizing the password or not"""
+        # when using maybe_sanitize_url(), "/" is added
+        # we're stripping it for consistency
+        return (self.url if include_password
+                else maybe_sanitize_url(self.url).rstrip("/"))
 
     def mark_as_started(self, task_id, **meta):
         """Mark a task as started"""
@@ -603,4 +613,9 @@ class DisabledBackend(BaseBackend):
         raise NotImplementedError(
             'No result backend configured.  '
             'Please see the documentation for more information.')
-    wait_for = get_status = get_result = get_traceback = _is_disabled
+
+    def as_uri(self, *args, **kwargs):
+        return 'disabled://'
+
+    get_state = get_status = get_result = get_traceback = _is_disabled
+    wait_for = get_many = _is_disabled

+ 9 - 0
celery/backends/cache.py

@@ -149,3 +149,12 @@ class CacheBackend(KeyValueStoreBackend):
                  expires=self.expires,
                  options=self.options))
         return super(CacheBackend, self).__reduce__(args, kwargs)
+
+    def as_uri(self, *args, **kwargs):
+        """
+        Return the backend as an URI. It properly handles the
+        case of multiple servers. It doesn't try to sanitize
+        password because memcached URIs doesn't support them.
+        """
+        servers = ';'.join(self.servers)
+        return '{0}://{1}/'.format(self.backend, servers)

+ 15 - 0
celery/backends/mongodb.py

@@ -12,6 +12,7 @@ from datetime import datetime
 
 from kombu.syn import detect_environment
 from kombu.utils import cached_property
+from kombu.utils.url import maybe_sanitize_url
 
 from celery import states
 from celery.exceptions import ImproperlyConfigured
@@ -243,3 +244,17 @@ class MongoBackend(BaseBackend):
         # in the background. Once completed cleanup will be much faster
         collection.ensure_index('date_done', background='true')
         return collection
+
+    def as_uri(self, include_password=False):
+        """
+        Return the backend as an URI, sanitizing the password or not.
+        It properly handles the case of a replica set.
+        """
+        if include_password:
+            return self.url
+
+        if "," not in self.url:
+            return maybe_sanitize_url(self.url).rstrip("/")
+
+        uri1, remainder = self.url.split(",", 1)
+        return ",".join([maybe_sanitize_url(uri1).rstrip("/"), remainder])

+ 18 - 0
celery/tests/backends/test_base.py

@@ -446,3 +446,21 @@ class test_DisabledBackend(AppCase):
     def test_is_disabled(self):
         with self.assertRaises(NotImplementedError):
             DisabledBackend(self.app).get_status('foo')
+
+    def test_as_uri(self):
+        self.assertEqual(DisabledBackend(self.app).as_uri(), 'disabled://')
+
+
+class test_as_uri(AppCase):
+
+    def setup(self):
+        self.b = BaseBackend(
+            app=self.app,
+            url="sch://uuuu:pwpw@hostname.dom"
+        )
+
+    def test_as_uri_include_password(self):
+        self.assertEqual(self.b.as_uri(True), "sch://uuuu:pwpw@hostname.dom")
+
+    def test_as_uri_exclude_password(self):
+        self.assertEqual(self.b.as_uri(), "sch://uuuu:**@hostname.dom")

+ 20 - 2
celery/tests/backends/test_cache.py

@@ -5,12 +5,12 @@ import types
 
 from contextlib import contextmanager
 
-from kombu.utils.encoding import str_to_bytes
+from kombu.utils.encoding import str_to_bytes, ensure_bytes
 
 from celery import signature
 from celery import states
 from celery import group
-from celery.backends.cache import CacheBackend, DummyClient
+from celery.backends.cache import CacheBackend, DummyClient, backends
 from celery.exceptions import ImproperlyConfigured
 from celery.five import items, string, text_t
 from celery.utils import uuid
@@ -33,6 +33,11 @@ class test_CacheBackend(AppCase):
     def setup(self):
         self.tb = CacheBackend(backend='memory://', app=self.app)
         self.tid = uuid()
+        self.old_get_best_memcached = backends['memcache']
+        backends['memcache'] = lambda: (DummyClient, ensure_bytes)
+
+    def teardown(self):
+        backends['memcache'] = self.old_get_best_memcached
 
     def test_no_backend(self):
         self.app.conf.CELERY_CACHE_BACKEND = None
@@ -117,6 +122,19 @@ class test_CacheBackend(AppCase):
         with self.assertRaises(ImproperlyConfigured):
             CacheBackend(backend='unknown://', app=self.app)
 
+    def test_as_uri_no_servers(self):
+        self.assertEqual(self.tb.as_uri(), 'memory:///')
+
+    def test_as_uri_one_server(self):
+        backend = 'memcache://127.0.0.1:11211/'
+        b = CacheBackend(backend=backend, app=self.app)
+        self.assertEqual(b.as_uri(), backend)
+
+    def test_as_uri_multiple_servers(self):
+        backend = 'memcache://127.0.0.1:11211;127.0.0.2:11211;127.0.0.3/'
+        b = CacheBackend(backend=backend, app=self.app)
+        self.assertEqual(b.as_uri(), backend)
+
 
 class MyMemcachedStringEncodingError(Exception):
     pass

+ 17 - 27
celery/tests/backends/test_mongodb.py

@@ -26,6 +26,11 @@ MONGODB_COLLECTION = 'collection1'
 
 class test_MongoBackend(AppCase):
 
+    default_url = "mongodb://uuuu:pwpw@hostname.dom/database"
+    replica_set_url = "mongodb://uuuu:pwpw@hostname.dom,hostname.dom/database?replicaSet=rs"
+    sanitized_default_url = default_url.replace("pwpw", "**")
+    sanitized_replica_set_url = replica_set_url.replace("pwpw", "**")
+
     def setup(self):
         if pymongo is None:
             raise SkipTest('pymongo is not installed.')
@@ -36,7 +41,7 @@ class test_MongoBackend(AppCase):
         R['Binary'], module.Binary = module.Binary, Mock()
         R['datetime'], datetime.datetime = datetime.datetime, Mock()
 
-        self.backend = MongoBackend(app=self.app)
+        self.backend = MongoBackend(app=self.app, url=self.default_url)
 
     def teardown(self):
         MongoBackend.encode = self._reset['encode']
@@ -330,31 +335,16 @@ class test_MongoBackend(AppCase):
                 'auto_start_request': False
             })
 
-    @patch('celery.backends.mongodb.detect_environment')
-    def test_prepare_client_options_for_ver_2_with_gevent(self, m_detect_env):
-        m_detect_env.return_value = 'gevent'
-        with patch('pymongo.version_tuple', new=(2, 6, 3)):
-            options = self.backend._prepare_client_options()
-            self.assertDictEqual(options, {
-                'max_pool_size': self.backend.max_pool_size,
-                'auto_start_request': False,
-                'use_greenlets': True
-            })
+    def test_as_uri_include_password(self):
+        self.assertEqual(self.backend.as_uri(True), self.default_url)
 
-    @patch('celery.backends.mongodb.detect_environment')
-    def test_prepare_client_options_for_ver_3(self, m_detect_env):
-        m_detect_env.return_value = 'default'
-        with patch('pymongo.version_tuple', new=(3, 0, 3)):
-            options = self.backend._prepare_client_options()
-            self.assertDictEqual(options, {
-                'maxPoolSize': self.backend.max_pool_size
-            })
+    def test_as_uri_exclude_password(self):
+        self.assertEqual(self.backend.as_uri(), self.sanitized_default_url)
 
-    @patch('celery.backends.mongodb.detect_environment')
-    def test_prepare_client_options_for_ver_3_with_gevent(self, m_detect_env):
-        m_detect_env.return_value = 'gevent'
-        with patch('pymongo.version_tuple', new=(3, 0, 3)):
-            options = self.backend._prepare_client_options()
-            self.assertDictEqual(options, {
-                'maxPoolSize': self.backend.max_pool_size
-            })
+    def test_as_uri_include_password_replica_set(self):
+        backend = MongoBackend(app=self.app, url=self.replica_set_url)
+        self.assertEqual(backend.as_uri(True), self.replica_set_url)
+
+    def test_as_uri_exclude_password_replica_set(self):
+        backend = MongoBackend(app=self.app, url=self.replica_set_url)
+        self.assertEqual(backend.as_uri(), self.sanitized_replica_set_url)

+ 14 - 0
celery/tests/bin/test_worker.py

@@ -211,6 +211,20 @@ class test_Worker(WorkerAppCase):
         finally:
             cd.ARTLINES = prev
 
+    @disable_stdouts
+    def test_startup_info_mongo_result_backend(self):
+        self.app.conf.result_backend = "mongodb://user:password@host0.com:43437,host1.com:43437/work4us?replicaSet=rs&ssl=true"
+        worker = self.Worker(app=self.app)
+        worker.on_start()
+        self.assertTrue(worker.startup_info())
+
+    @disable_stdouts
+    def test_startup_info_memcached_result_backend(self):
+        self.app.conf.result_backend = "cache+memcached://127.0.0.1:11211;127.0.0.2:11211;127.0.0.3/"
+        worker = self.Worker(app=self.app)
+        worker.on_start()
+        self.assertTrue(worker.startup_info())
+
     @disable_stdouts
     def test_run(self):
         self.Worker(app=self.app).on_start()