Browse Source

Un-revert Azure Block Blob backend (#5025)

* Revert "Revert "Add Azure Block Blob Storage backend (#4685)" (#5015)"

This reverts commit 8e6b2bf8df20060a28b17c2dd286f4aef566ca66.

* Remove custom docker install on Travis

Docker now has first class support on Travis so the custom apt-get
install instructions are no longer required and make the build fail.
Clemens Wolff 6 years ago
parent
commit
ced86ea588

+ 8 - 11
.travis.yml

@@ -20,6 +20,7 @@ env:
   - MATRIX_TOXENV=integration-rabbitmq
   - MATRIX_TOXENV=integration-redis
   - MATRIX_TOXENV=integration-dynamodb
+  - MATRIX_TOXENV=integration-azureblockblob
 matrix:
   include:
   - python: '3.6'
@@ -68,18 +69,13 @@ before_install:
           fi
     - |
           if [[ "$TOXENV" == *dynamodb ]]; then
-              sudo apt-get update && sudo apt-get install -y default-jre supervisor
-              mkdir /opt/dynamodb-local
-              cd /opt/dynamodb-local && curl --retry 5 --retry-delay 1 -L http://dynamodb-local.s3-website-us-west-2.amazonaws.com/dynamodb_local_latest.tar.gz | tar zx
-              cd -
-              echo '[program:dynamodb-local]' | sudo tee /etc/supervisor/conf.d/dynamodb-local.conf
-              echo 'command=java -Djava.library.path=./DynamoDBLocal_lib -jar DynamoDBLocal.jar -inMemory' | sudo tee -a /etc/supervisor/conf.d/dynamodb-local.conf
-              echo 'directory=/opt/dynamodb-local' | sudo tee -a /etc/supervisor/conf.d/dynamodb-local.conf
-              sudo service supervisor stop
-              sudo service supervisor start
-              sleep 10
-              curl localhost:8000
+              docker run -d -p 8000:8000 dwmkerr/dynamodb:38 -inMemory
+              while ! nc -zv 127.0.0.1 8000; do sleep 10; done
           fi
+    - |
+          docker run -d -e executable=blob -t -p 10000:10000 --tmpfs /opt/azurite/folder:rw arafato/azurite:2.6.5
+          while ! nc -zv 127.0.0.1 10000; do sleep 10; done
+          export AZUREBLOCKBLOB_URL="azureblockblob://DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"
     - |
           wget -qO - https://packages.couchbase.com/ubuntu/couchbase.key | sudo apt-key add -
           sudo apt-add-repository -y 'deb http://packages.couchbase.com/ubuntu trusty trusty/main'
@@ -101,3 +97,4 @@ notifications:
 services:
     - rabbitmq
     - redis
+    - docker

+ 3 - 0
README.rst

@@ -292,6 +292,9 @@ Transports and Backends
 :``celery[cassandra]``:
     for using Apache Cassandra as a result backend with DataStax driver.
 
+:``celery[azureblockblob]``:
+    for using Azure Storage as a result backend (using ``azure-storage``).
+
 :``celery[couchbase]``:
     for using Couchbase as a result backend.
 

+ 1 - 0
celery/app/backends.py

@@ -35,6 +35,7 @@ BACKEND_ALIASES = {
     'disabled': 'celery.backends.base:DisabledBackend',
     'consul': 'celery.backends.consul:ConsulBackend',
     'dynamodb': 'celery.backends.dynamodb:DynamoDBBackend',
+    'azureblockblob': 'celery.backends.azureblockblob:AzureBlockBlobBackend',
 }
 
 

+ 6 - 0
celery/app/defaults.py

@@ -130,6 +130,12 @@ NAMESPACES = Namespace(
         auth_kwargs=Option(type='string'),
         options=Option({}, type='dict'),
     ),
+    azureblockblob=Namespace(
+        container_name=Option('celery', type='string'),
+        retry_initial_backoff_sec=Option(2, type='int'),
+        retry_increment_base=Option(2, type='int'),
+        retry_max_attempts=Option(3, type='int'),
+    ),
     control=Namespace(
         queue_ttl=Option(300.0, type='float'),
         queue_expires=Option(10.0, type='float'),

+ 148 - 0
celery/backends/azureblockblob.py

@@ -0,0 +1,148 @@
+"""The Azure Storage Block Blob backend for Celery."""
+from __future__ import absolute_import, unicode_literals
+
+from kombu.utils import cached_property
+from kombu.utils.encoding import bytes_to_str
+
+from celery.exceptions import ImproperlyConfigured
+from celery.utils.log import get_logger
+
+from .base import KeyValueStoreBackend
+
+try:
+    import azure.storage as azurestorage
+    from azure.common import AzureMissingResourceHttpError
+    from azure.storage.blob import BlockBlobService
+    from azure.storage.common.retry import ExponentialRetry
+except ImportError:  # pragma: no cover
+    azurestorage = BlockBlobService = ExponentialRetry = \
+        AzureMissingResourceHttpError = None  # noqa
+
+__all__ = ("AzureBlockBlobBackend",)
+
+LOGGER = get_logger(__name__)
+
+
+class AzureBlockBlobBackend(KeyValueStoreBackend):
+    """Azure Storage Block Blob backend for Celery.
+
+    Every key is stored as one text blob inside a single storage container.
+
+    Args:
+          url: ``azureblockblob://`` followed by the storage account
+              connection string.
+          container_name: Container in which the blobs are stored.
+          retry_initial_backoff_sec: ``initial_backoff`` handed to the
+              ``ExponentialRetry`` policy of the client.
+          retry_increment_base: ``increment_base`` handed to the
+              ``ExponentialRetry`` policy of the client.
+          retry_max_attempts: ``max_attempts`` handed to the
+              ``ExponentialRetry`` policy of the client.
+
+    Any of the optional arguments that is left falsy (``None``, ``0`` or
+    an empty string) falls back to the ``azureblockblob_*`` setting of the
+    same name in the app configuration.
+
+    Raises:
+          ImproperlyConfigured: If the azure-storage library cannot be
+              imported or ``url`` is not a valid backend URL.
+
+    """
+
+    def __init__(self,
+                 url=None,
+                 container_name=None,
+                 retry_initial_backoff_sec=None,
+                 retry_increment_base=None,
+                 retry_max_attempts=None,
+                 *args,
+                 **kwargs):
+        super(AzureBlockBlobBackend, self).__init__(*args, **kwargs)
+
+        if azurestorage is None:
+            raise ImproperlyConfigured(
+                "You need to install the azure-storage library to use the "
+                "AzureBlockBlob backend")
+
+        conf = self.app.conf
+
+        self._connection_string = self._parse_url(url)
+
+        self._container_name = (
+            container_name or
+            conf["azureblockblob_container_name"])
+
+        self._retry_initial_backoff_sec = (
+            retry_initial_backoff_sec or
+            conf["azureblockblob_retry_initial_backoff_sec"])
+
+        self._retry_increment_base = (
+            retry_increment_base or
+            conf["azureblockblob_retry_increment_base"])
+
+        self._retry_max_attempts = (
+            retry_max_attempts or
+            conf["azureblockblob_retry_max_attempts"])
+
+    @classmethod
+    def _parse_url(cls, url, prefix="azureblockblob://"):
+        """Extract the storage connection string from the backend URL.
+
+        Args:
+              url: The backend URL, ``prefix`` followed by the connection
+                  string.
+              prefix: The scheme prefix that ``url`` has to start with.
+
+        Returns:
+              The part of ``url`` that follows ``prefix``.
+
+        Raises:
+              ImproperlyConfigured: If ``url`` is missing, does not start
+                  with ``prefix`` or has nothing after ``prefix``.
+
+        """
+        if url and url.startswith(prefix):
+            connection_string = url[len(prefix):]
+            if connection_string:
+                return connection_string
+
+        # The URL embeds the storage account key, so it is deliberately
+        # never echoed back in the error message.
+        raise ImproperlyConfigured(
+            "Invalid URL: expected {0}<connection string>".format(prefix))
+
+    @cached_property
+    def _client(self):
+        """Return the Azure Storage Block Blob service.
+
+        If this is the first call to the property, the client is created and
+        the container is created if it doesn't yet exist.
+
+        """
+        client = BlockBlobService(connection_string=self._connection_string)
+
+        # Install the configured retry policy before the first request is
+        # sent, so that creating the container is covered by it as well.
+        client.retry = ExponentialRetry(
+            initial_backoff=self._retry_initial_backoff_sec,
+            increment_base=self._retry_increment_base,
+            max_attempts=self._retry_max_attempts).retry
+
+        created = client.create_container(
+            container_name=self._container_name, fail_on_exist=False)
+
+        if created:
+            LOGGER.info("Created Azure Blob Storage container %s",
+                        self._container_name)
+
+        return client
+
+    def get(self, key):
+        """Read the value stored at the given key.
+
+        Args:
+              key: The key for which to read the value.
+
+        Returns:
+              The text content of the blob, or ``None`` when the storage
+              service reports that the blob does not exist.
+
+        """
+        key = bytes_to_str(key)
+        LOGGER.debug("Getting Azure Block Blob %s/%s",
+                     self._container_name, key)
+
+        try:
+            return self._client.get_blob_to_text(
+                self._container_name, key).content
+        except AzureMissingResourceHttpError:
+            return None
+
+    def set(self, key, value):
+        """Store a value for a given key.
+
+        Args:
+              key: The key at which to store the value.
+              value: The value to store.
+
+        Returns:
+              Whatever the client's ``create_blob_from_text`` call returns.
+
+        """
+        key = bytes_to_str(key)
+        LOGGER.debug("Creating Azure Block Blob at %s/%s",
+                     self._container_name, key)
+
+        return self._client.create_blob_from_text(
+            self._container_name, key, value)
+
+    def mget(self, keys):
+        """Read all the values for the provided keys.
+
+        Args:
+              keys: The list of keys to read.
+
+        Returns:
+              A list with one entry per key, in the order of ``keys``;
+              entries are ``None`` for keys without a blob.
+
+        """
+        # One request per key: each lookup simply goes through ``get``.
+        return [self.get(key) for key in keys]
+
+    def delete(self, key):
+        """Delete the value at a given key.
+
+        Args:
+              key: The key of the value to delete.
+
+        """
+        key = bytes_to_str(key)
+        LOGGER.debug("Deleting Azure Block Blob at %s/%s",
+                     self._container_name, key)
+
+        self._client.delete_blob(self._container_name, key)

+ 5 - 0
docker/docker-compose.yml

@@ -14,6 +14,7 @@ services:
       PYTHONDONTWRITEBYTECODE: 1
       REDIS_HOST: redis
       WORKER_LOGLEVEL: DEBUG
+      AZUREBLOCKBLOB_URL: azureblockblob://DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://azurite:10000/devstoreaccount1;
     tty: true
     volumes:
       - ../docs:/home/developer/docs
@@ -23,6 +24,7 @@ services:
       - rabbit
       - redis
       - dynamodb
+      - azurite
 
   rabbit:
     image: rabbitmq:3.7.3
@@ -32,3 +34,6 @@ services:
 
   dynamodb:
     image: dwmkerr/dynamodb:38
+
+  azurite:
+    image: arafato/azurite:2.6.5

+ 11 - 0
docs/internals/reference/celery.backends.azureblockblob.rst

@@ -0,0 +1,11 @@
+================================================
+ ``celery.backends.azureblockblob``
+================================================
+
+.. contents::
+    :local:
+.. currentmodule:: celery.backends.azureblockblob
+
+.. automodule:: celery.backends.azureblockblob
+    :members:
+    :undoc-members:

+ 59 - 0
docs/userguide/configuration.rst

@@ -584,6 +584,10 @@ Can be one of the following:
     Use the `Consul`_ K/V store to store the results
     See :ref:`conf-consul-result-backend`.
 
+* ``azureblockblob``
+    Use the `AzureBlockBlob`_ PaaS store to store the results
+    See :ref:`conf-azureblockblob-result-backend`.
+
 .. warning:
 
     While the AMQP result backend is very efficient, you must make sure
@@ -598,6 +602,7 @@ Can be one of the following:
 .. _`CouchDB`: http://www.couchdb.com/
 .. _`Couchbase`: https://www.couchbase.com/
 .. _`Consul`: https://consul.io/
+.. _`AzureBlockBlob`: https://azure.microsoft.com/en-us/services/storage/blobs/
 
 
 .. setting:: result_backend_transport_options
@@ -1122,6 +1127,60 @@ Example configuration
     cassandra_write_consistency = 'ONE'
     cassandra_entry_ttl = 86400
 
+.. _conf-azureblockblob-result-backend:
+
+Azure Block Blob backend settings
+---------------------------------
+
+To use `AzureBlockBlob`_ as the result backend you simply need to
+configure the :setting:`result_backend` setting with the correct URL.
+
+The required URL format is ``azureblockblob://`` followed by the storage
+connection string. You can find the storage connection string in the
+``Access Keys`` pane of your storage account resource in the Azure Portal.
+
+Example configuration
+~~~~~~~~~~~~~~~~~~~~~
+
+.. code-block:: python
+
+    result_backend = 'azureblockblob://DefaultEndpointsProtocol=https;AccountName=somename;AccountKey=Lou...bzg==;EndpointSuffix=core.windows.net'
+
+.. setting:: azureblockblob_container_name
+
+``azureblockblob_container_name``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Default: celery.
+
+The name for the storage container in which to store the results.
+
+.. setting:: azureblockblob_retry_initial_backoff_sec
+
+``azureblockblob_retry_initial_backoff_sec``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Default: 2.
+
+The initial backoff interval, in seconds, for the first retry.
+Subsequent retries are attempted with an exponential strategy.
+
+.. setting:: azureblockblob_retry_increment_base
+
+``azureblockblob_retry_increment_base``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Default: 2.
+
+The base, in seconds, to increment the initial backoff by after the
+first retry.
+
+.. setting:: azureblockblob_retry_max_attempts
+
+``azureblockblob_retry_max_attempts``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Default: 3.
+
+The maximum number of retry attempts.
+
 .. _conf-elasticsearch-result-backend:
 
 Elasticsearch backend settings

+ 3 - 0
requirements/extras/azureblockblob.txt

@@ -0,0 +1,3 @@
+azure-storage==0.36.0
+azure-common==1.1.5
+azure-storage-common==1.1.0

+ 1 - 0
requirements/test-ci-default.txt

@@ -18,3 +18,4 @@
 -r extras/consul.txt
 -r extras/cassandra.txt
 -r extras/dynamodb.txt
+-r extras/azureblockblob.txt

+ 1 - 0
requirements/test-integration.txt

@@ -1,3 +1,4 @@
 simplejson
 -r extras/redis.txt
 -r extras/dynamodb.txt
+-r extras/azureblockblob.txt

+ 1 - 0
setup.py

@@ -68,6 +68,7 @@ EXTENSIONS = {
     'zookeeper',
     'solar',
     'sqlalchemy',
+    'azureblockblob',
     'librabbitmq',
     'pyro',
     'slmq',

+ 37 - 0
t/integration/test_backend.py

@@ -0,0 +1,37 @@
+from __future__ import absolute_import, unicode_literals
+
+import os
+
+from case import skip
+
+from celery.backends.azureblockblob import AzureBlockBlobBackend
+
+
+@skip.unless_module("azure")
+@skip.unless_environ("AZUREBLOCKBLOB_URL")
+class test_AzureBlockBlobBackend:
+    """Round trips against the storage named by ``AZUREBLOCKBLOB_URL``."""
+
+    def test_crud(self, manager):
+        """Write five blobs, read them back in bulk, then delete them."""
+        url = os.environ["AZUREBLOCKBLOB_URL"]
+        backend = AzureBlockBlobBackend(app=manager.app, url=url)
+
+        stored = {}
+        for index in range(5):
+            stored[("akey%d" % index).encode()] = "avalue%d" % index
+
+        for blob_key, blob_value in stored.items():
+            backend.set(blob_key, blob_value)
+
+        # keys() and values() of the same dict iterate in matching order,
+        # so the bulk read has to line up with the stored values.
+        fetched = backend.mget(stored.keys())
+        assert list(stored.values()) == fetched
+
+        for blob_key in stored:
+            backend.delete(blob_key)
+
+    def test_get_missing(self, manager):
+        """Reading a key that was never written yields ``None``."""
+        url = os.environ["AZUREBLOCKBLOB_URL"]
+        backend = AzureBlockBlobBackend(app=manager.app, url=url)
+
+        missing = backend.get(b"doesNotExist")
+        assert missing is None

+ 94 - 0
t/unit/backends/test_azureblockblob.py

@@ -0,0 +1,94 @@
+from __future__ import absolute_import, unicode_literals
+
+import pytest
+from case import Mock, call, patch, skip
+
+from celery.backends import azureblockblob
+from celery.backends.azureblockblob import AzureBlockBlobBackend
+from celery.exceptions import ImproperlyConfigured
+
+MODULE_TO_MOCK = "celery.backends.azureblockblob"
+
+
+@skip.unless_module("azure")
+class test_AzureBlockBlobBackend:
+    """Unit tests for the backend; the storage client is always mocked."""
+
+    def setup(self):
+        """Create a backend from a well-formed dummy connection URL."""
+        self.url = (
+            "azureblockblob://"
+            "DefaultEndpointsProtocol=protocol;"
+            "AccountName=name;"
+            "AccountKey=key;"
+            "EndpointSuffix=suffix")
+
+        self.backend = AzureBlockBlobBackend(app=self.app, url=self.url)
+
+    def test_missing_third_party_sdk(self):
+        """Constructing the backend without the SDK is a config error."""
+        original_sdk = azureblockblob.azurestorage
+        azureblockblob.azurestorage = None
+        try:
+            with pytest.raises(ImproperlyConfigured):
+                AzureBlockBlobBackend(app=self.app, url=self.url)
+        finally:
+            # Always restore the module attribute for the other tests.
+            azureblockblob.azurestorage = original_sdk
+
+    def test_bad_connection_url(self):
+        """URLs that carry no connection string are rejected."""
+        for bad_url in ("azureblockblob://", ""):
+            with pytest.raises(ImproperlyConfigured):
+                AzureBlockBlobBackend._parse_url(bad_url)
+
+    @patch(MODULE_TO_MOCK + ".BlockBlobService")
+    def test_create_client(self, mock_blob_service_factory):
+        """The container is created lazily, once per backend instance."""
+        service = Mock()
+        mock_blob_service_factory.return_value = service
+        backend = AzureBlockBlobBackend(app=self.app, url=self.url)
+
+        # nothing is created until the client is first accessed...
+        assert service.create_container.call_count == 0
+        assert backend._client is not None
+        assert service.create_container.call_count == 1
+
+        # ...and a second access reuses the cached client
+        assert backend._client is not None
+        assert service.create_container.call_count == 1
+
+    @patch(MODULE_TO_MOCK + ".AzureBlockBlobBackend._client")
+    def test_get(self, mock_client):
+        """``get`` reads the blob named by the decoded key."""
+        self.backend.get(b"mykey")
+
+        read_blob = mock_client.get_blob_to_text
+        read_blob.assert_called_once_with("celery", "mykey")
+
+    @patch(MODULE_TO_MOCK + ".AzureBlockBlobBackend._client")
+    def test_get_missing(self, mock_client):
+        """A missing-resource error from the client maps to ``None``."""
+        not_found = azureblockblob.AzureMissingResourceHttpError(
+            "Missing", 404)
+        mock_client.get_blob_to_text.side_effect = not_found
+
+        result = self.backend.get(b"mykey")
+        assert result is None
+
+    @patch(MODULE_TO_MOCK + ".AzureBlockBlobBackend._client")
+    def test_set(self, mock_client):
+        """``set`` writes the value to the blob named by the decoded key."""
+        self.backend.set(b"mykey", "myvalue")
+
+        write_blob = mock_client.create_blob_from_text
+        write_blob.assert_called_once_with("celery", "mykey", "myvalue")
+
+    @patch(MODULE_TO_MOCK + ".AzureBlockBlobBackend._client")
+    def test_mget(self, mock_client):
+        """``mget`` issues one read per key, in the order given."""
+        self.backend.mget([b"mykey1", b"mykey2"])
+
+        expected_reads = [
+            call("celery", "mykey1"),
+            call("celery", "mykey2"),
+        ]
+        mock_client.get_blob_to_text.assert_has_calls(expected_reads)
+
+    @patch(MODULE_TO_MOCK + ".AzureBlockBlobBackend._client")
+    def test_delete(self, mock_client):
+        """``delete`` removes the blob named by the decoded key."""
+        self.backend.delete(b"mykey")
+
+        remove_blob = mock_client.delete_blob
+        remove_blob.assert_called_once_with("celery", "mykey")

+ 6 - 2
tox.ini

@@ -1,7 +1,7 @@
 [tox]
 envlist =
     {2.7,pypy,3.4,3.5,3.6}-unit
-    {2.7,pypy,3.4,3.5,3.6}-integration-{rabbitmq,redis,dynamodb}
+    {2.7,pypy,3.4,3.5,3.6}-integration-{rabbitmq,redis,dynamodb,azureblockblob}
 
     flake8
     flakeplus
@@ -47,8 +47,12 @@ setenv =
     dynamodb: TEST_BACKEND=dynamodb://@localhost:8000
     dynamodb: AWS_ACCESS_KEY_ID=test_aws_key_id
     dynamodb: AWS_SECRET_ACCESS_KEY=test_aws_secret_key
-PASSENV =
+
+    azureblockblob: TEST_BROKER=redis://
+    azureblockblob: TEST_BACKEND=azureblockblob://DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;
+passenv =
     TRAVIS
+    AZUREBLOCKBLOB_URL
 basepython =
     2.7: python2.7
     3.4: python3.4