diff --git a/plugins/repository-azure/build.gradle b/plugins/repository-azure/build.gradle index 49451984843..84850e318ec 100644 --- a/plugins/repository-azure/build.gradle +++ b/plugins/repository-azure/build.gradle @@ -27,6 +27,7 @@ dependencies { compile 'com.microsoft.azure:azure-keyvault-core:1.0.0' compile 'com.google.guava:guava:20.0' compile 'org.apache.commons:commons-lang3:3.4' + testCompile project(':test:fixtures:azure-fixture') } dependencyLicenses { diff --git a/plugins/repository-azure/qa/microsoft-azure-storage/build.gradle b/plugins/repository-azure/qa/microsoft-azure-storage/build.gradle index 4720fe29edf..04867765ba5 100644 --- a/plugins/repository-azure/qa/microsoft-azure-storage/build.gradle +++ b/plugins/repository-azure/qa/microsoft-azure-storage/build.gradle @@ -26,6 +26,9 @@ import static org.elasticsearch.gradle.PropertyNormalization.IGNORE_VALUE apply plugin: 'elasticsearch.standalone-rest-test' apply plugin: 'elasticsearch.rest-test' +apply plugin: 'elasticsearch.test.fixtures' + +testFixtures.useFixture ":test:fixtures:azure-fixture", "azure-fixture" boolean useFixture = false @@ -38,20 +41,12 @@ String azureSasToken = System.getenv("azure_storage_sas_token") if (!azureAccount && !azureKey && !azureContainer && !azureBasePath && !azureSasToken) { azureAccount = 'azure_integration_test_account' azureKey = 'YXp1cmVfaW50ZWdyYXRpb25fdGVzdF9rZXk=' // The key is "azure_integration_test_key" encoded using base64 - azureContainer = 'container_test' - azureBasePath = 'integration_test' + azureContainer = 'container' + azureBasePath = '' azureSasToken = '' useFixture = true } -/** A task to start the fixture which emulates an Azure Storage service **/ -task azureStorageFixture(type: AntFixture) { - dependsOn testClasses - env 'CLASSPATH', "${ -> project.sourceSets.test.runtimeClasspath.asPath }" - executable = new File(project.runtimeJavaHome, 'bin/java') - args 'org.elasticsearch.repositories.azure.AzureStorageFixture', baseDir, azureContainer -} - Map expansions = [ 'container': azureContainer, 'base_path': azureBasePath + "_integration_tests" @@ -77,11 +72,15 @@ testClusters.integTest { } if (useFixture) { - tasks.integTest.dependsOn azureStorageFixture + def azureAddress = { + int ephemeralPort = project(':test:fixtures:azure-fixture').postProcessFixture.ext."test.fixtures.azure-fixture.tcp.8091" + assert ephemeralPort > 0 + 'http://127.0.0.1:' + ephemeralPort + } // Use a closure on the string to delay evaluation until tests are executed. The endpoint_suffix is used // in a hacky way to change the protocol and endpoint. We must fix that. setting 'azure.client.integration_test.endpoint_suffix', - { "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=http://${azureStorageFixture.addressAndPort }" }, IGNORE_VALUE + { "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=${ -> azureAddress() }" }, IGNORE_VALUE String firstPartOfSeed = project.rootProject.testSeed.tokenize(':').get(0) setting 'thread_pool.repository_azure.max', (Math.abs(Long.parseUnsignedLong(firstPartOfSeed, 16) % 10) + 1).toString(), System.getProperty('ignore.tests.seed') == null ? DEFAULT : IGNORE_VALUE } diff --git a/plugins/repository-azure/qa/microsoft-azure-storage/src/test/java/org/elasticsearch/repositories/azure/AzureStorageFixture.java b/plugins/repository-azure/qa/microsoft-azure-storage/src/test/java/org/elasticsearch/repositories/azure/AzureStorageFixture.java deleted file mode 100644 index 0bd9503f43d..00000000000 --- a/plugins/repository-azure/qa/microsoft-azure-storage/src/test/java/org/elasticsearch/repositories/azure/AzureStorageFixture.java +++ /dev/null @@ -1,354 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.repositories.azure; - -import org.elasticsearch.test.fixture.AbstractHttpFixture; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.path.PathTrie; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.rest.RestUtils; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; - -import static java.nio.charset.StandardCharsets.UTF_8; - -/** - * {@link AzureStorageFixture} emulates an Azure Storage service. - *

- * The implementation is based on official documentation available at - * https://docs.microsoft.com/en-us/rest/api/storageservices/blob-service-rest-api. - */ -public class AzureStorageFixture extends AbstractHttpFixture { - - /** - * List of the containers stored on this test server - **/ - private final Map containers = ConcurrentCollections.newConcurrentMap(); - - /** - * Request handlers for the requests made by the Azure client - **/ - private final PathTrie handlers; - - /** - * Creates a {@link AzureStorageFixture} with a custom endpoint - */ - private AzureStorageFixture(final String workingDir, final String container) { - super(workingDir); - this.containers.put(container, new Container(container)); - this.handlers = defaultHandlers(containers); - } - - @Override - protected AbstractHttpFixture.Response handle(final Request request) throws IOException { - final RequestHandler handler = handlers.retrieve(request.getMethod() + " " + request.getPath(), request.getParameters()); - if (handler != null) { - final String authorization = request.getHeader("Authorization"); - if (authorization == null - || (authorization.length() > 0 && authorization.contains("azure_integration_test_account") == false)) { - return newError(request.getId(), RestStatus.FORBIDDEN, "AccessDenied", "Access Denied"); - } - return handler.handle(request); - } - return null; - } - - public static void main(final String[] args) throws Exception { - if (args == null || args.length != 2) { - throw new IllegalArgumentException("AzureStorageFixture "); - } - - final AzureStorageFixture fixture = new AzureStorageFixture(args[0], args[1]); - fixture.listen(); - } - - /** - * Builds the default request handlers - **/ - private static PathTrie defaultHandlers(final Map containers) { - final PathTrie handlers = new PathTrie<>(RestUtils.REST_DECODER); - - // Get Blob Properties - // - // https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties - objectsPaths("HEAD /{container}").forEach(path -> - handlers.insert(path, (request) -> { - final String containerName = request.getParam("container"); - - final Container container = containers.get(containerName); - if (container == null) { - return newContainerNotFoundError(request.getId()); - } - - final String blobName = objectName(request.getParameters()); - for (Map.Entry object : container.objects.entrySet()) { - if (object.getKey().equals(blobName)) { - Map responseHeaders = new HashMap<>(); - responseHeaders.put("x-ms-blob-content-length", String.valueOf(object.getValue().length)); - responseHeaders.put("x-ms-blob-type", "blockblob"); - return new Response(RestStatus.OK.getStatus(), responseHeaders, EMPTY_BYTE); - } - } - return newBlobNotFoundError(request.getId()); - }) - ); - - // PUT Blob - // - // https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob - objectsPaths("PUT /{container}").forEach(path -> - handlers.insert(path, (request) -> { - final String destContainerName = request.getParam("container"); - final String destBlobName = objectName(request.getParameters()); - final String ifNoneMatch = request.getHeader("If-None-Match"); - - final Container destContainer = containers.get(destContainerName); - if (destContainer == null) { - return newContainerNotFoundError(request.getId()); - } - - if ("*".equals(ifNoneMatch)) { - byte[] existingBytes = destContainer.objects.putIfAbsent(destBlobName, request.getBody()); - if (existingBytes != null) { - return newBlobAlreadyExistsError(request.getId()); - } - } else { - destContainer.objects.put(destBlobName, request.getBody()); - } - - return new Response(RestStatus.CREATED.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE); }) - ); - - // GET Object - // - // https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html - objectsPaths("GET /{container}").forEach(path -> - handlers.insert(path, (request) -> { - final String containerName = request.getParam("container"); - - final Container container = containers.get(containerName); - if (container == null) { - return newContainerNotFoundError(request.getId()); - } - - final String blobName = objectName(request.getParameters()); - if (container.objects.containsKey(blobName)) { - Map responseHeaders = new HashMap<>(contentType("application/octet-stream")); - responseHeaders.put("x-ms-copy-status", "success"); - responseHeaders.put("x-ms-blob-type", "blockblob"); - return new Response(RestStatus.OK.getStatus(), responseHeaders, container.objects.get(blobName)); - - } - return newBlobNotFoundError(request.getId()); - }) - ); - - // Delete Blob - // - // https://docs.microsoft.com/en-us/rest/api/storageservices/delete-blob - objectsPaths("DELETE /{container}").forEach(path -> - handlers.insert(path, (request) -> { - final String containerName = request.getParam("container"); - - final Container container = containers.get(containerName); - if (container == null) { - return newContainerNotFoundError(request.getId()); - } - - final String blobName = objectName(request.getParameters()); - if (container.objects.remove(blobName) != null) { - return new Response(RestStatus.ACCEPTED.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE); - } - return newBlobNotFoundError(request.getId()); - }) - ); - - // List Blobs - // - // https://docs.microsoft.com/en-us/rest/api/storageservices/list-blobs - handlers.insert("GET /{container}/", (request) -> { - final String containerName = request.getParam("container"); - - final Container container = containers.get(containerName); - if (container == null) { - return newContainerNotFoundError(request.getId()); - } - - final String prefix = request.getParam("prefix"); - return newEnumerationResultsResponse(request.getId(), container, prefix); - }); - - // Get Container Properties - // - // https://docs.microsoft.com/en-us/rest/api/storageservices/get-container-properties - handlers.insert("HEAD /{container}", (request) -> { - String container = request.getParam("container"); - if (Strings.hasText(container) && containers.containsKey(container)) { - return new Response(RestStatus.OK.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE); - } else { - return newContainerNotFoundError(request.getId()); - } - }); - - return handlers; - } - - /** - * Represents a Azure Storage container. - */ - static class Container { - - /** - * Container name - **/ - final String name; - - /** - * Blobs contained in the container - **/ - final Map objects; - - Container(final String name) { - this.name = Objects.requireNonNull(name); - this.objects = ConcurrentCollections.newConcurrentMap(); - } - } - - /** - * Decline a path like "http://host:port/{bucket}" into 10 derived paths like: - * - http://host:port/{bucket}/{path0} - * - http://host:port/{bucket}/{path0}/{path1} - * - http://host:port/{bucket}/{path0}/{path1}/{path2} - * - etc - */ - private static List objectsPaths(final String path) { - final List paths = new ArrayList<>(); - String p = path; - for (int i = 0; i < 10; i++) { - p = p + "/{path" + i + "}"; - paths.add(p); - } - return paths; - } - - /** - * Retrieves the object name from all derived paths named {pathX} where 0 <= X < 10. - *

- * This is the counterpart of {@link #objectsPaths(String)} - */ - private static String objectName(final Map params) { - final StringBuilder name = new StringBuilder(); - for (int i = 0; i < 10; i++) { - String value = params.getOrDefault("path" + i, null); - if (value != null) { - if (name.length() > 0) { - name.append('/'); - } - name.append(value); - } - } - return name.toString(); - } - - - /** - * Azure EnumerationResults Response - */ - private static Response newEnumerationResultsResponse(final long requestId, final Container container, final String prefix) { - final String id = Long.toString(requestId); - final StringBuilder response = new StringBuilder(); - response.append(""); - response.append(""); - if (prefix != null) { - response.append("").append(prefix).append(""); - } else { - response.append(""); - } - response.append("").append(container.objects.size()).append(""); - response.append(""); - - int count = 0; - for (Map.Entry object : container.objects.entrySet()) { - String objectName = object.getKey(); - if (prefix == null || objectName.startsWith(prefix)) { - response.append(""); - response.append("").append(objectName).append(""); - response.append(""); - response.append("").append(object.getValue().length).append(""); - response.append("").append(count++).append(""); - response.append("success"); - response.append("BlockBlob"); - response.append(""); - response.append(""); - } - } - - response.append(""); - response.append(""); - response.append(""); - - final Map headers = new HashMap<>(contentType("application/xml")); - headers.put("x-ms-request-id", id); - - return new Response(RestStatus.OK.getStatus(), headers, response.toString().getBytes(UTF_8)); - } - - private static Response newContainerNotFoundError(final long requestId) { - return newError(requestId, RestStatus.NOT_FOUND, "ContainerNotFound", "The specified container does not exist"); - } - - private static Response newBlobNotFoundError(final long requestId) { - return newError(requestId, RestStatus.NOT_FOUND, "BlobNotFound", "The specified blob does not exist"); - } - - private static Response newBlobAlreadyExistsError(final long requestId) { - return newError(requestId, RestStatus.CONFLICT, "BlobAlreadyExists", "The specified blob already exists"); - } - - /** - * Azure Error - *

- * https://docs.microsoft.com/en-us/rest/api/storageservices/status-and-error-codes2 - */ - private static Response newError(final long requestId, - final RestStatus status, - final String code, - final String message) { - - final StringBuilder response = new StringBuilder(); - response.append(""); - response.append(""); - response.append("").append(code).append(""); - response.append("").append(message).append(""); - response.append(""); - - final Map headers = new HashMap<>(contentType("application/xml")); - headers.put("x-ms-request-id", String.valueOf(requestId)); - headers.put("x-ms-error-code", code); - - return new Response(status.getStatus(), headers, response.toString().getBytes(UTF_8)); - } -} diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java index daf4e9ad57b..463437597a7 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java @@ -24,6 +24,7 @@ import com.microsoft.azure.storage.RetryPolicyFactory; import com.microsoft.azure.storage.blob.BlobRequestOptions; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpServer; +import fixture.azure.AzureHttpHandler; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.common.Strings; import org.elasticsearch.common.SuppressForbidden; @@ -184,7 +185,7 @@ public class AzureBlobContainerRetriesTests extends ESTestCase { } } if (randomBoolean()) { - TestUtils.sendError(exchange, randomFrom(RestStatus.INTERNAL_SERVER_ERROR, RestStatus.SERVICE_UNAVAILABLE)); + AzureHttpHandler.sendError(exchange, randomFrom(RestStatus.INTERNAL_SERVER_ERROR, RestStatus.SERVICE_UNAVAILABLE)); } exchange.close(); }); @@ -209,7 +210,7 @@ public class AzureBlobContainerRetriesTests extends ESTestCase { if (Objects.deepEquals(bytes, BytesReference.toBytes(body))) { exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1); } else { - TestUtils.sendError(exchange, RestStatus.BAD_REQUEST); + AzureHttpHandler.sendError(exchange, RestStatus.BAD_REQUEST); } exchange.close(); return; @@ -220,7 +221,7 @@ public class AzureBlobContainerRetriesTests extends ESTestCase { Streams.readFully(exchange.getRequestBody(), new byte[randomIntBetween(1, Math.max(1, bytes.length - 1))]); } else { Streams.readFully(exchange.getRequestBody()); - TestUtils.sendError(exchange, randomFrom(RestStatus.INTERNAL_SERVER_ERROR, RestStatus.SERVICE_UNAVAILABLE)); + AzureHttpHandler.sendError(exchange, randomFrom(RestStatus.INTERNAL_SERVER_ERROR, RestStatus.SERVICE_UNAVAILABLE)); } } exchange.close(); @@ -282,7 +283,7 @@ public class AzureBlobContainerRetriesTests extends ESTestCase { if (randomBoolean()) { Streams.readFully(exchange.getRequestBody()); - TestUtils.sendError(exchange, randomFrom(RestStatus.INTERNAL_SERVER_ERROR, RestStatus.SERVICE_UNAVAILABLE)); + AzureHttpHandler.sendError(exchange, randomFrom(RestStatus.INTERNAL_SERVER_ERROR, RestStatus.SERVICE_UNAVAILABLE)); } exchange.close(); }); diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java index 4bd71d8216b..5bc0d2684f1 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java @@ -24,34 +24,21 @@ import com.microsoft.azure.storage.RetryPolicyFactory; import com.microsoft.azure.storage.blob.BlobRequestOptions; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; +import fixture.azure.AzureHttpHandler; import org.elasticsearch.common.SuppressForbidden; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.io.Streams; -import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.rest.RestUtils; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; -import java.util.Arrays; import java.util.Base64; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.Collectors; @SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint") public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase { @@ -77,7 +64,7 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg @Override protected Map createHttpHandlers() { - return Collections.singletonMap("/container", new InternalHttpHandler()); + return Collections.singletonMap("/container", new AzureHttpHandler("container")); } @Override @@ -128,114 +115,6 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg } } - /** - * Minimal HTTP handler that acts as an Azure compliant server - */ - @SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint") - private static class InternalHttpHandler implements HttpHandler { - - private final Map blobs = new ConcurrentHashMap<>(); - - @Override - public void handle(final HttpExchange exchange) throws IOException { - final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString(); - try { - if (Regex.simpleMatch("PUT /container/*blockid=*", request)) { - final Map params = new HashMap<>(); - RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params); - - final String blockId = params.get("blockid"); - blobs.put(blockId, Streams.readFully(exchange.getRequestBody())); - exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1); - - } else if (Regex.simpleMatch("PUT /container/*comp=blocklist*", request)) { - final String blockList = Streams.copyToString(new InputStreamReader(exchange.getRequestBody(), StandardCharsets.UTF_8)); - final List blockIds = Arrays.stream(blockList.split("")) - .filter(line -> line.contains("")) - .map(line -> line.substring(0, line.indexOf(""))) - .collect(Collectors.toList()); - - final ByteArrayOutputStream blob = new ByteArrayOutputStream(); - for (String blockId : blockIds) { - BytesReference block = blobs.remove(blockId); - assert block != null; - block.writeTo(blob); - } - blobs.put(exchange.getRequestURI().getPath(), new BytesArray(blob.toByteArray())); - exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1); - - } else if (Regex.simpleMatch("PUT /container/*", request)) { - blobs.put(exchange.getRequestURI().getPath(), Streams.readFully(exchange.getRequestBody())); - exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1); - - } else if (Regex.simpleMatch("HEAD /container/*", request)) { - final BytesReference blob = blobs.get(exchange.getRequestURI().getPath()); - if (blob == null) { - TestUtils.sendError(exchange, RestStatus.NOT_FOUND); - return; - } - exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(blob.length())); - exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob"); - exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1); - - } else if (Regex.simpleMatch("GET /container/*", request)) { - final BytesReference blob = blobs.get(exchange.getRequestURI().getPath()); - if (blob == null) { - TestUtils.sendError(exchange, RestStatus.NOT_FOUND); - return; - } - - final String range = exchange.getRequestHeaders().getFirst(Constants.HeaderConstants.STORAGE_RANGE_HEADER); - final Matcher matcher = Pattern.compile("^bytes=([0-9]+)-([0-9]+)$").matcher(range); - assertTrue(matcher.matches()); - - final int start = Integer.parseInt(matcher.group(1)); - final int length = Integer.parseInt(matcher.group(2)) - start + 1; - - exchange.getResponseHeaders().add("Content-Type", "application/octet-stream"); - exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(length)); - exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob"); - exchange.sendResponseHeaders(RestStatus.OK.getStatus(), length); - exchange.getResponseBody().write(blob.toBytesRef().bytes, start, length); - - } else if (Regex.simpleMatch("DELETE /container/*", request)) { - drainInputStream(exchange.getRequestBody()); - blobs.entrySet().removeIf(blob -> blob.getKey().startsWith(exchange.getRequestURI().getPath())); - exchange.sendResponseHeaders(RestStatus.ACCEPTED.getStatus(), -1); - - } else if (Regex.simpleMatch("GET /container?restype=container&comp=list*", request)) { - final Map params = new HashMap<>(); - RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params); - - final StringBuilder list = new StringBuilder(); - list.append(""); - list.append(""); - final String prefix = params.get("prefix"); - list.append(""); - for (Map.Entry blob : blobs.entrySet()) { - if (prefix == null || blob.getKey().startsWith("/container/" + prefix)) { - list.append("").append(blob.getKey().replace("/container/", "")).append(""); - list.append("").append(blob.getValue().length()).append(""); - list.append("BlockBlob"); - } - } - list.append(""); - list.append(""); - - byte[] response = list.toString().getBytes(StandardCharsets.UTF_8); - exchange.getResponseHeaders().add("Content-Type", "application/xml"); - exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); - exchange.getResponseBody().write(response); - - } else { - TestUtils.sendError(exchange, RestStatus.BAD_REQUEST); - } - } finally { - exchange.close(); - } - } - } - /** * HTTP handler that injects random Azure service errors * @@ -252,7 +131,7 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg @Override protected void handleAsError(final HttpExchange exchange) throws IOException { drainInputStream(exchange.getRequestBody()); - TestUtils.sendError(exchange, randomFrom(RestStatus.INTERNAL_SERVER_ERROR, RestStatus.SERVICE_UNAVAILABLE)); + AzureHttpHandler.sendError(exchange, randomFrom(RestStatus.INTERNAL_SERVER_ERROR, RestStatus.SERVICE_UNAVAILABLE)); exchange.close(); } diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/TestUtils.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/TestUtils.java deleted file mode 100644 index 816976f3acf..00000000000 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/TestUtils.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.repositories.azure; - -import com.microsoft.azure.storage.Constants; -import com.microsoft.azure.storage.StorageErrorCodeStrings; -import com.sun.net.httpserver.Headers; -import com.sun.net.httpserver.HttpExchange; -import org.elasticsearch.common.SuppressForbidden; -import org.elasticsearch.rest.RestStatus; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; - -final class TestUtils { - - private TestUtils() {} - - @SuppressForbidden(reason = "use HttpExchange and Headers") - static void sendError(final HttpExchange exchange, final RestStatus status) throws IOException { - final Headers headers = exchange.getResponseHeaders(); - headers.add("Content-Type", "application/xml"); - - final String requestId = exchange.getRequestHeaders().getFirst(Constants.HeaderConstants.CLIENT_REQUEST_ID_HEADER); - if (requestId != null) { - headers.add(Constants.HeaderConstants.REQUEST_ID_HEADER, requestId); - } - - final String errorCode = toAzureErrorCode(status); - if (errorCode != null) { - headers.add(Constants.HeaderConstants.ERROR_CODE, errorCode); - } - - if (errorCode == null || "HEAD".equals(exchange.getRequestMethod())) { - exchange.sendResponseHeaders(status.getStatus(), -1L); - } else { - final byte[] response = ("" + errorCode + "" - + status + "").getBytes(StandardCharsets.UTF_8); - exchange.sendResponseHeaders(status.getStatus(), response.length); - exchange.getResponseBody().write(response); - } - } - - // See https://docs.microsoft.com/en-us/rest/api/storageservices/common-rest-api-error-codes - private static String toAzureErrorCode(final RestStatus status) { - assert status.getStatus() >= 400; - switch (status) { - case BAD_REQUEST: - return StorageErrorCodeStrings.INVALID_METADATA; - case NOT_FOUND: - return StorageErrorCodeStrings.BLOB_NOT_FOUND; - case INTERNAL_SERVER_ERROR: - return StorageErrorCodeStrings.INTERNAL_ERROR; - case SERVICE_UNAVAILABLE: - return StorageErrorCodeStrings.SERVER_BUSY; - default: - throw new IllegalArgumentException("Error code [" + status.getStatus() + "] is not mapped to an existing Azure code"); - } - } -} diff --git a/settings.gradle b/settings.gradle index 42746c14f0b..8d355ab676f 100644 --- a/settings.gradle +++ b/settings.gradle @@ -51,6 +51,7 @@ List projects = [ 'test:fixtures:hdfs-fixture', 'test:fixtures:krb5kdc-fixture', 'test:fixtures:old-elasticsearch', + 'test:fixtures:azure-fixture', 'test:logger-usage' ] diff --git a/test/fixtures/azure-fixture/Dockerfile b/test/fixtures/azure-fixture/Dockerfile new file mode 100644 index 00000000000..763a81761cd --- /dev/null +++ b/test/fixtures/azure-fixture/Dockerfile @@ -0,0 +1,5 @@ +FROM ubuntu:19.04 +RUN apt-get update -qqy +RUN apt-get install -qqy openjdk-12-jre-headless +ENTRYPOINT exec java -classpath "/fixture/shared/*" fixture.azure.AzureHttpFixture 0.0.0.0 8091 container +EXPOSE 8091 diff --git a/test/fixtures/azure-fixture/build.gradle b/test/fixtures/azure-fixture/build.gradle new file mode 100644 index 00000000000..cc65d3c05e2 --- /dev/null +++ b/test/fixtures/azure-fixture/build.gradle @@ -0,0 +1,39 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +apply plugin: 'elasticsearch.build' +apply plugin: 'elasticsearch.test.fixtures' + +description = 'Fixture for Azure external service' +test.enabled = false + +dependencies { + compile project(':server') +} + +preProcessFixture { + dependsOn jar + doLast { + file("${testFixturesDir}/shared").mkdirs() + project.copy { + from jar + from configurations.runtimeClasspath + into "${testFixturesDir}/shared" + } + } +} \ No newline at end of file diff --git a/test/fixtures/azure-fixture/docker-compose.yml b/test/fixtures/azure-fixture/docker-compose.yml new file mode 100644 index 00000000000..ff328c52f34 --- /dev/null +++ b/test/fixtures/azure-fixture/docker-compose.yml @@ -0,0 +1,10 @@ +version: '3' +services: + azure-fixture: + build: + context: . + dockerfile: Dockerfile + volumes: + - ./testfixtures_shared/shared:/fixture/shared + ports: + - "8091" diff --git a/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpFixture.java b/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpFixture.java new file mode 100644 index 00000000000..1def1439429 --- /dev/null +++ b/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpFixture.java @@ -0,0 +1,53 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package fixture.azure; + +import com.sun.net.httpserver.HttpServer; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; + +public class AzureHttpFixture { + + private final HttpServer server; + + private AzureHttpFixture(final String address, final int port, final String container) throws IOException { + this.server = HttpServer.create(new InetSocketAddress(InetAddress.getByName(address), port), 0); + server.createContext("/" + container, new AzureHttpHandler(container)); + } + + private void start() throws Exception { + try { + server.start(); + // wait to be killed + Thread.sleep(Long.MAX_VALUE); + } finally { + server.stop(0); + } + } + + public static void main(final String[] args) throws Exception { + if (args == null || args.length != 3) { + throw new IllegalArgumentException("AzureHttpFixture expects 3 arguments [address, port, container]"); + } + final AzureHttpFixture fixture = new AzureHttpFixture(args[0], Integer.parseInt(args[1]), args[2]); + fixture.start(); + } +} diff --git a/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java b/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java new file mode 100644 index 00000000000..affd1181221 --- /dev/null +++ b/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java @@ -0,0 +1,225 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package fixture.azure; + +import com.sun.net.httpserver.Headers; +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.RestUtils; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * Minimal HTTP handler that acts as an Azure compliant server + */ +@SuppressForbidden(reason = "Uses a HttpServer to emulate an Azure endpoint") +public class AzureHttpHandler implements HttpHandler { + + private final Map blobs; + private final String container; + + public AzureHttpHandler(final String container) { + this.container = Objects.requireNonNull(container); + this.blobs = new ConcurrentHashMap<>(); + } + + @Override + public void handle(final HttpExchange exchange) throws IOException { + final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString(); + try { + if (Regex.simpleMatch("PUT /" + container + "/*blockid=*", request)) { + // Put Block (https://docs.microsoft.com/en-us/rest/api/storageservices/put-block) + final Map params = new HashMap<>(); + RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params); + + final String blockId = params.get("blockid"); + blobs.put(blockId, Streams.readFully(exchange.getRequestBody())); + exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1); + + } else if (Regex.simpleMatch("PUT /" + container + "/*comp=blocklist*", request)) { + // Put Block List (https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list) + final String blockList = Streams.copyToString(new InputStreamReader(exchange.getRequestBody(), StandardCharsets.UTF_8)); + final List blockIds = Arrays.stream(blockList.split("")) + .filter(line -> line.contains("")) + .map(line -> line.substring(0, line.indexOf(""))) + .collect(Collectors.toList()); + + final ByteArrayOutputStream blob = new ByteArrayOutputStream(); + for (String blockId : blockIds) { + BytesReference block = blobs.remove(blockId); + assert block != null; + block.writeTo(blob); + } + blobs.put(exchange.getRequestURI().getPath(), new BytesArray(blob.toByteArray())); + exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1); + + } else if (Regex.simpleMatch("PUT /" + container + "/*", request)) { + // PUT Blob (see https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob) + final String ifNoneMatch = exchange.getResponseHeaders().getFirst("If-None-Match"); + if ("*".equals(ifNoneMatch)) { + if (blobs.putIfAbsent(exchange.getRequestURI().getPath(), Streams.readFully(exchange.getRequestBody())) != null) { + sendError(exchange, RestStatus.CONFLICT); + return; + } + } else { + blobs.put(exchange.getRequestURI().getPath(), Streams.readFully(exchange.getRequestBody())); + } + exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1); + + } else if (Regex.simpleMatch("HEAD /" + container + "/*", request)) { + // Get Blob Properties (see https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties) + final BytesReference blob = blobs.get(exchange.getRequestURI().getPath()); + if (blob == null) { + sendError(exchange, RestStatus.NOT_FOUND); + return; + } + exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(blob.length())); + exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob"); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1); + + } else if (Regex.simpleMatch("GET /" + container + "/*", request)) { + // GET Object (https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html) + final BytesReference blob = blobs.get(exchange.getRequestURI().getPath()); + if (blob == null) { + sendError(exchange, RestStatus.NOT_FOUND); + return; + } + + // see Constants.HeaderConstants.STORAGE_RANGE_HEADER + final String range = exchange.getRequestHeaders().getFirst("x-ms-range"); + final Matcher matcher = Pattern.compile("^bytes=([0-9]+)-([0-9]+)$").matcher(range); + if (matcher.matches() == false) { + throw new AssertionError("Range header does not match expected format: " + range); + } + + final int start = Integer.parseInt(matcher.group(1)); + final int length = Integer.parseInt(matcher.group(2)) - start + 1; + + exchange.getResponseHeaders().add("Content-Type", "application/octet-stream"); + exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(length)); + exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob"); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), length); + exchange.getResponseBody().write(blob.toBytesRef().bytes, start, length); + + } else if (Regex.simpleMatch("DELETE /" + container + "/*", request)) { + // Delete Blob (https://docs.microsoft.com/en-us/rest/api/storageservices/delete-blob) + try (InputStream is = exchange.getRequestBody()) { + while (is.read() >= 0); + } + blobs.entrySet().removeIf(blob -> blob.getKey().startsWith(exchange.getRequestURI().getPath())); + exchange.sendResponseHeaders(RestStatus.ACCEPTED.getStatus(), -1); + + } else if (Regex.simpleMatch("GET /container?restype=container&comp=list*", request)) { + // List Blobs (https://docs.microsoft.com/en-us/rest/api/storageservices/list-blobs) + final Map params = new HashMap<>(); + RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params); + + final StringBuilder list = new StringBuilder(); + list.append(""); + list.append(""); + final String prefix = params.get("prefix"); + list.append(""); + for (Map.Entry blob : blobs.entrySet()) { + if (prefix == null || blob.getKey().startsWith("/" + container + "/" + prefix)) { + list.append("").append(blob.getKey().replace("/" + container + "/", "")).append(""); + list.append("").append(blob.getValue().length()).append(""); + list.append("BlockBlob"); + } + } + list.append(""); + list.append(""); + + byte[] response = list.toString().getBytes(StandardCharsets.UTF_8); + exchange.getResponseHeaders().add("Content-Type", "application/xml"); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); + exchange.getResponseBody().write(response); + + } else { + sendError(exchange, RestStatus.BAD_REQUEST); + } + } finally { + exchange.close(); + } + } + + public static void sendError(final HttpExchange exchange, final RestStatus status) throws IOException { + final Headers headers = exchange.getResponseHeaders(); + headers.add("Content-Type", "application/xml"); + + // see Constants.HeaderConstants.CLIENT_REQUEST_ID_HEADER + final String requestId = exchange.getRequestHeaders().getFirst("x-ms-client-request-id"); + if (requestId != null) { + // see Constants.HeaderConstants.STORAGE_RANGE_HEADER + headers.add("x-ms-request-id", requestId); + } + + final String errorCode = toAzureErrorCode(status); + if (errorCode != null) { + // see Constants.HeaderConstants.ERROR_CODE + headers.add("x-ms-error-code", errorCode); + } + + if (errorCode == null || "HEAD".equals(exchange.getRequestMethod())) { + exchange.sendResponseHeaders(status.getStatus(), -1L); + } else { + final byte[] response = ("" + errorCode + "" + + status + "").getBytes(StandardCharsets.UTF_8); + exchange.sendResponseHeaders(status.getStatus(), response.length); + exchange.getResponseBody().write(response); + } + } + + // See https://docs.microsoft.com/en-us/rest/api/storageservices/common-rest-api-error-codes + private static String toAzureErrorCode(final RestStatus status) { + assert status.getStatus() >= 400; + switch (status) { + case BAD_REQUEST: + return "InvalidMetadata"; + case NOT_FOUND: + return "BlobNotFound"; + case INTERNAL_SERVER_ERROR: + return "InternalError"; + case SERVICE_UNAVAILABLE: + return "ServerBusy"; + case CONFLICT: + return "BlobAlreadyExists"; + default: + throw new IllegalArgumentException("Error code [" + status.getStatus() + "] is not mapped to an existing Azure code"); + } + } +}