Add docker-compose based test fixture for Azure (#48736)

This commit adds a new :test:fixtures:azure-fixture project which 
provides a docker-compose based container that runs a AzureHttpFixture 
Java class that emulates an Azure Storage service.

The logic to emulate the service is extracted from existing tests and 
placed in AzureHttpHandler into the new project so that it can be 
easily reused. The :plugins:repository-azure project is an example 
of such utilization.

The AzureHttpFixture fixture is just a wrapper around AzureHttpHandler 
and is now executed within the docker container. 

The :plugins:repository-azure:qa:microsoft-azure project uses the new 
test fixture and the existing AzureStorageFixture has been removed.
This commit is contained in:
Tanguy Leroux 2019-10-31 10:43:43 +01:00 committed by GitHub
parent c3063c4e1f
commit 989467ca1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 353 additions and 570 deletions

View File

@ -27,6 +27,7 @@ dependencies {
compile 'com.microsoft.azure:azure-keyvault-core:1.0.0'
compile 'com.google.guava:guava:20.0'
compile 'org.apache.commons:commons-lang3:3.4'
testCompile project(':test:fixtures:azure-fixture')
}
dependencyLicenses {

View File

@ -26,6 +26,9 @@ import static org.elasticsearch.gradle.PropertyNormalization.IGNORE_VALUE
apply plugin: 'elasticsearch.standalone-rest-test'
apply plugin: 'elasticsearch.rest-test'
apply plugin: 'elasticsearch.test.fixtures'
testFixtures.useFixture ":test:fixtures:azure-fixture", "azure-fixture"
boolean useFixture = false
@ -38,20 +41,12 @@ String azureSasToken = System.getenv("azure_storage_sas_token")
if (!azureAccount && !azureKey && !azureContainer && !azureBasePath && !azureSasToken) {
azureAccount = 'azure_integration_test_account'
azureKey = 'YXp1cmVfaW50ZWdyYXRpb25fdGVzdF9rZXk=' // The key is "azure_integration_test_key" encoded using base64
azureContainer = 'container_test'
azureBasePath = 'integration_test'
azureContainer = 'container'
azureBasePath = ''
azureSasToken = ''
useFixture = true
}
/** A task to start the fixture which emulates an Azure Storage service **/
task azureStorageFixture(type: AntFixture) {
dependsOn testClasses
env 'CLASSPATH', "${ -> project.sourceSets.test.runtimeClasspath.asPath }"
executable = new File(project.runtimeJavaHome, 'bin/java')
args 'org.elasticsearch.repositories.azure.AzureStorageFixture', baseDir, azureContainer
}
Map<String, Object> expansions = [
'container': azureContainer,
'base_path': azureBasePath + "_integration_tests"
@ -77,11 +72,15 @@ testClusters.integTest {
}
if (useFixture) {
tasks.integTest.dependsOn azureStorageFixture
def azureAddress = {
int ephemeralPort = project(':test:fixtures:azure-fixture').postProcessFixture.ext."test.fixtures.azure-fixture.tcp.8091"
assert ephemeralPort > 0
'http://127.0.0.1:' + ephemeralPort
}
// Use a closure on the string to delay evaluation until tests are executed. The endpoint_suffix is used
// in a hacky way to change the protocol and endpoint. We must fix that.
setting 'azure.client.integration_test.endpoint_suffix',
{ "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=http://${azureStorageFixture.addressAndPort }" }, IGNORE_VALUE
{ "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=${ -> azureAddress() }" }, IGNORE_VALUE
String firstPartOfSeed = project.rootProject.testSeed.tokenize(':').get(0)
setting 'thread_pool.repository_azure.max', (Math.abs(Long.parseUnsignedLong(firstPartOfSeed, 16) % 10) + 1).toString(), System.getProperty('ignore.tests.seed') == null ? DEFAULT : IGNORE_VALUE
}

View File

@ -1,354 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.azure;
import org.elasticsearch.test.fixture.AbstractHttpFixture;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.path.PathTrie;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.RestUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static java.nio.charset.StandardCharsets.UTF_8;
/**
* {@link AzureStorageFixture} emulates an Azure Storage service.
* <p>
* The implementation is based on official documentation available at
* https://docs.microsoft.com/en-us/rest/api/storageservices/blob-service-rest-api.
*/
public class AzureStorageFixture extends AbstractHttpFixture {
/**
* List of the containers stored on this test server
**/
private final Map<String, Container> containers = ConcurrentCollections.newConcurrentMap();
/**
* Request handlers for the requests made by the Azure client
**/
private final PathTrie<RequestHandler> handlers;
/**
* Creates a {@link AzureStorageFixture} with a custom endpoint
*/
private AzureStorageFixture(final String workingDir, final String container) {
super(workingDir);
this.containers.put(container, new Container(container));
this.handlers = defaultHandlers(containers);
}
@Override
protected AbstractHttpFixture.Response handle(final Request request) throws IOException {
final RequestHandler handler = handlers.retrieve(request.getMethod() + " " + request.getPath(), request.getParameters());
if (handler != null) {
final String authorization = request.getHeader("Authorization");
if (authorization == null
|| (authorization.length() > 0 && authorization.contains("azure_integration_test_account") == false)) {
return newError(request.getId(), RestStatus.FORBIDDEN, "AccessDenied", "Access Denied");
}
return handler.handle(request);
}
return null;
}
public static void main(final String[] args) throws Exception {
if (args == null || args.length != 2) {
throw new IllegalArgumentException("AzureStorageFixture <working directory> <container>");
}
final AzureStorageFixture fixture = new AzureStorageFixture(args[0], args[1]);
fixture.listen();
}
/**
* Builds the default request handlers
**/
private static PathTrie<RequestHandler> defaultHandlers(final Map<String, Container> containers) {
final PathTrie<RequestHandler> handlers = new PathTrie<>(RestUtils.REST_DECODER);
// Get Blob Properties
//
// https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties
objectsPaths("HEAD /{container}").forEach(path ->
handlers.insert(path, (request) -> {
final String containerName = request.getParam("container");
final Container container = containers.get(containerName);
if (container == null) {
return newContainerNotFoundError(request.getId());
}
final String blobName = objectName(request.getParameters());
for (Map.Entry<String, byte[]> object : container.objects.entrySet()) {
if (object.getKey().equals(blobName)) {
Map<String, String> responseHeaders = new HashMap<>();
responseHeaders.put("x-ms-blob-content-length", String.valueOf(object.getValue().length));
responseHeaders.put("x-ms-blob-type", "blockblob");
return new Response(RestStatus.OK.getStatus(), responseHeaders, EMPTY_BYTE);
}
}
return newBlobNotFoundError(request.getId());
})
);
// PUT Blob
//
// https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob
objectsPaths("PUT /{container}").forEach(path ->
handlers.insert(path, (request) -> {
final String destContainerName = request.getParam("container");
final String destBlobName = objectName(request.getParameters());
final String ifNoneMatch = request.getHeader("If-None-Match");
final Container destContainer = containers.get(destContainerName);
if (destContainer == null) {
return newContainerNotFoundError(request.getId());
}
if ("*".equals(ifNoneMatch)) {
byte[] existingBytes = destContainer.objects.putIfAbsent(destBlobName, request.getBody());
if (existingBytes != null) {
return newBlobAlreadyExistsError(request.getId());
}
} else {
destContainer.objects.put(destBlobName, request.getBody());
}
return new Response(RestStatus.CREATED.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE); })
);
// GET Object
//
// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html
objectsPaths("GET /{container}").forEach(path ->
handlers.insert(path, (request) -> {
final String containerName = request.getParam("container");
final Container container = containers.get(containerName);
if (container == null) {
return newContainerNotFoundError(request.getId());
}
final String blobName = objectName(request.getParameters());
if (container.objects.containsKey(blobName)) {
Map<String, String> responseHeaders = new HashMap<>(contentType("application/octet-stream"));
responseHeaders.put("x-ms-copy-status", "success");
responseHeaders.put("x-ms-blob-type", "blockblob");
return new Response(RestStatus.OK.getStatus(), responseHeaders, container.objects.get(blobName));
}
return newBlobNotFoundError(request.getId());
})
);
// Delete Blob
//
// https://docs.microsoft.com/en-us/rest/api/storageservices/delete-blob
objectsPaths("DELETE /{container}").forEach(path ->
handlers.insert(path, (request) -> {
final String containerName = request.getParam("container");
final Container container = containers.get(containerName);
if (container == null) {
return newContainerNotFoundError(request.getId());
}
final String blobName = objectName(request.getParameters());
if (container.objects.remove(blobName) != null) {
return new Response(RestStatus.ACCEPTED.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE);
}
return newBlobNotFoundError(request.getId());
})
);
// List Blobs
//
// https://docs.microsoft.com/en-us/rest/api/storageservices/list-blobs
handlers.insert("GET /{container}/", (request) -> {
final String containerName = request.getParam("container");
final Container container = containers.get(containerName);
if (container == null) {
return newContainerNotFoundError(request.getId());
}
final String prefix = request.getParam("prefix");
return newEnumerationResultsResponse(request.getId(), container, prefix);
});
// Get Container Properties
//
// https://docs.microsoft.com/en-us/rest/api/storageservices/get-container-properties
handlers.insert("HEAD /{container}", (request) -> {
String container = request.getParam("container");
if (Strings.hasText(container) && containers.containsKey(container)) {
return new Response(RestStatus.OK.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE);
} else {
return newContainerNotFoundError(request.getId());
}
});
return handlers;
}
/**
* Represents a Azure Storage container.
*/
static class Container {
/**
* Container name
**/
final String name;
/**
* Blobs contained in the container
**/
final Map<String, byte[]> objects;
Container(final String name) {
this.name = Objects.requireNonNull(name);
this.objects = ConcurrentCollections.newConcurrentMap();
}
}
/**
* Decline a path like "http://host:port/{bucket}" into 10 derived paths like:
* - http://host:port/{bucket}/{path0}
* - http://host:port/{bucket}/{path0}/{path1}
* - http://host:port/{bucket}/{path0}/{path1}/{path2}
* - etc
*/
private static List<String> objectsPaths(final String path) {
final List<String> paths = new ArrayList<>();
String p = path;
for (int i = 0; i < 10; i++) {
p = p + "/{path" + i + "}";
paths.add(p);
}
return paths;
}
/**
* Retrieves the object name from all derived paths named {pathX} where 0 <= X < 10.
* <p>
* This is the counterpart of {@link #objectsPaths(String)}
*/
private static String objectName(final Map<String, String> params) {
final StringBuilder name = new StringBuilder();
for (int i = 0; i < 10; i++) {
String value = params.getOrDefault("path" + i, null);
if (value != null) {
if (name.length() > 0) {
name.append('/');
}
name.append(value);
}
}
return name.toString();
}
/**
* Azure EnumerationResults Response
*/
private static Response newEnumerationResultsResponse(final long requestId, final Container container, final String prefix) {
final String id = Long.toString(requestId);
final StringBuilder response = new StringBuilder();
response.append("<?xml version=\"1.0\" encoding=\"utf-8\"?>");
response.append("<EnumerationResults ServiceEndpoint=\"http://myaccount.blob.core.windows.net/\"");
response.append(" ContainerName=\"").append(container.name).append("\">");
if (prefix != null) {
response.append("<Prefix>").append(prefix).append("</Prefix>");
} else {
response.append("<Prefix/>");
}
response.append("<MaxResults>").append(container.objects.size()).append("</MaxResults>");
response.append("<Blobs>");
int count = 0;
for (Map.Entry<String, byte[]> object : container.objects.entrySet()) {
String objectName = object.getKey();
if (prefix == null || objectName.startsWith(prefix)) {
response.append("<Blob>");
response.append("<Name>").append(objectName).append("</Name>");
response.append("<Properties>");
response.append("<Content-Length>").append(object.getValue().length).append("</Content-Length>");
response.append("<CopyId>").append(count++).append("</CopyId>");
response.append("<CopyStatus>success</CopyStatus>");
response.append("<BlobType>BlockBlob</BlobType>");
response.append("</Properties>");
response.append("</Blob>");
}
}
response.append("</Blobs>");
response.append("<NextMarker />");
response.append("</EnumerationResults>");
final Map<String, String> headers = new HashMap<>(contentType("application/xml"));
headers.put("x-ms-request-id", id);
return new Response(RestStatus.OK.getStatus(), headers, response.toString().getBytes(UTF_8));
}
private static Response newContainerNotFoundError(final long requestId) {
return newError(requestId, RestStatus.NOT_FOUND, "ContainerNotFound", "The specified container does not exist");
}
private static Response newBlobNotFoundError(final long requestId) {
return newError(requestId, RestStatus.NOT_FOUND, "BlobNotFound", "The specified blob does not exist");
}
private static Response newBlobAlreadyExistsError(final long requestId) {
return newError(requestId, RestStatus.CONFLICT, "BlobAlreadyExists", "The specified blob already exists");
}
/**
* Azure Error
* <p>
* https://docs.microsoft.com/en-us/rest/api/storageservices/status-and-error-codes2
*/
private static Response newError(final long requestId,
final RestStatus status,
final String code,
final String message) {
final StringBuilder response = new StringBuilder();
response.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
response.append("<Error>");
response.append("<Code>").append(code).append("</Code>");
response.append("<Message>").append(message).append("</Message>");
response.append("</Error>");
final Map<String, String> headers = new HashMap<>(contentType("application/xml"));
headers.put("x-ms-request-id", String.valueOf(requestId));
headers.put("x-ms-error-code", code);
return new Response(status.getStatus(), headers, response.toString().getBytes(UTF_8));
}
}

View File

@ -24,6 +24,7 @@ import com.microsoft.azure.storage.RetryPolicyFactory;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import fixture.azure.AzureHttpHandler;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.SuppressForbidden;
@ -184,7 +185,7 @@ public class AzureBlobContainerRetriesTests extends ESTestCase {
}
}
if (randomBoolean()) {
TestUtils.sendError(exchange, randomFrom(RestStatus.INTERNAL_SERVER_ERROR, RestStatus.SERVICE_UNAVAILABLE));
AzureHttpHandler.sendError(exchange, randomFrom(RestStatus.INTERNAL_SERVER_ERROR, RestStatus.SERVICE_UNAVAILABLE));
}
exchange.close();
});
@ -209,7 +210,7 @@ public class AzureBlobContainerRetriesTests extends ESTestCase {
if (Objects.deepEquals(bytes, BytesReference.toBytes(body))) {
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
} else {
TestUtils.sendError(exchange, RestStatus.BAD_REQUEST);
AzureHttpHandler.sendError(exchange, RestStatus.BAD_REQUEST);
}
exchange.close();
return;
@ -220,7 +221,7 @@ public class AzureBlobContainerRetriesTests extends ESTestCase {
Streams.readFully(exchange.getRequestBody(), new byte[randomIntBetween(1, Math.max(1, bytes.length - 1))]);
} else {
Streams.readFully(exchange.getRequestBody());
TestUtils.sendError(exchange, randomFrom(RestStatus.INTERNAL_SERVER_ERROR, RestStatus.SERVICE_UNAVAILABLE));
AzureHttpHandler.sendError(exchange, randomFrom(RestStatus.INTERNAL_SERVER_ERROR, RestStatus.SERVICE_UNAVAILABLE));
}
}
exchange.close();
@ -282,7 +283,7 @@ public class AzureBlobContainerRetriesTests extends ESTestCase {
if (randomBoolean()) {
Streams.readFully(exchange.getRequestBody());
TestUtils.sendError(exchange, randomFrom(RestStatus.INTERNAL_SERVER_ERROR, RestStatus.SERVICE_UNAVAILABLE));
AzureHttpHandler.sendError(exchange, randomFrom(RestStatus.INTERNAL_SERVER_ERROR, RestStatus.SERVICE_UNAVAILABLE));
}
exchange.close();
});

View File

@ -24,34 +24,21 @@ import com.microsoft.azure.storage.RetryPolicyFactory;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import fixture.azure.AzureHttpHandler;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.RestUtils;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint")
public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase {
@ -77,7 +64,7 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
@Override
protected Map<String, HttpHandler> createHttpHandlers() {
return Collections.singletonMap("/container", new InternalHttpHandler());
return Collections.singletonMap("/container", new AzureHttpHandler("container"));
}
@Override
@ -128,114 +115,6 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
}
}
/**
* Minimal HTTP handler that acts as an Azure compliant server
*/
@SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint")
private static class InternalHttpHandler implements HttpHandler {
private final Map<String, BytesReference> blobs = new ConcurrentHashMap<>();
@Override
public void handle(final HttpExchange exchange) throws IOException {
final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString();
try {
if (Regex.simpleMatch("PUT /container/*blockid=*", request)) {
final Map<String, String> params = new HashMap<>();
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
final String blockId = params.get("blockid");
blobs.put(blockId, Streams.readFully(exchange.getRequestBody()));
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
} else if (Regex.simpleMatch("PUT /container/*comp=blocklist*", request)) {
final String blockList = Streams.copyToString(new InputStreamReader(exchange.getRequestBody(), StandardCharsets.UTF_8));
final List<String> blockIds = Arrays.stream(blockList.split("<Latest>"))
.filter(line -> line.contains("</Latest>"))
.map(line -> line.substring(0, line.indexOf("</Latest>")))
.collect(Collectors.toList());
final ByteArrayOutputStream blob = new ByteArrayOutputStream();
for (String blockId : blockIds) {
BytesReference block = blobs.remove(blockId);
assert block != null;
block.writeTo(blob);
}
blobs.put(exchange.getRequestURI().getPath(), new BytesArray(blob.toByteArray()));
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
} else if (Regex.simpleMatch("PUT /container/*", request)) {
blobs.put(exchange.getRequestURI().getPath(), Streams.readFully(exchange.getRequestBody()));
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
} else if (Regex.simpleMatch("HEAD /container/*", request)) {
final BytesReference blob = blobs.get(exchange.getRequestURI().getPath());
if (blob == null) {
TestUtils.sendError(exchange, RestStatus.NOT_FOUND);
return;
}
exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(blob.length()));
exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
} else if (Regex.simpleMatch("GET /container/*", request)) {
final BytesReference blob = blobs.get(exchange.getRequestURI().getPath());
if (blob == null) {
TestUtils.sendError(exchange, RestStatus.NOT_FOUND);
return;
}
final String range = exchange.getRequestHeaders().getFirst(Constants.HeaderConstants.STORAGE_RANGE_HEADER);
final Matcher matcher = Pattern.compile("^bytes=([0-9]+)-([0-9]+)$").matcher(range);
assertTrue(matcher.matches());
final int start = Integer.parseInt(matcher.group(1));
final int length = Integer.parseInt(matcher.group(2)) - start + 1;
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(length));
exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), length);
exchange.getResponseBody().write(blob.toBytesRef().bytes, start, length);
} else if (Regex.simpleMatch("DELETE /container/*", request)) {
drainInputStream(exchange.getRequestBody());
blobs.entrySet().removeIf(blob -> blob.getKey().startsWith(exchange.getRequestURI().getPath()));
exchange.sendResponseHeaders(RestStatus.ACCEPTED.getStatus(), -1);
} else if (Regex.simpleMatch("GET /container?restype=container&comp=list*", request)) {
final Map<String, String> params = new HashMap<>();
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
final StringBuilder list = new StringBuilder();
list.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
list.append("<EnumerationResults>");
final String prefix = params.get("prefix");
list.append("<Blobs>");
for (Map.Entry<String, BytesReference> blob : blobs.entrySet()) {
if (prefix == null || blob.getKey().startsWith("/container/" + prefix)) {
list.append("<Blob><Name>").append(blob.getKey().replace("/container/", "")).append("</Name>");
list.append("<Properties><Content-Length>").append(blob.getValue().length()).append("</Content-Length>");
list.append("<BlobType>BlockBlob</BlobType></Properties></Blob>");
}
}
list.append("</Blobs>");
list.append("</EnumerationResults>");
byte[] response = list.toString().getBytes(StandardCharsets.UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/xml");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
} else {
TestUtils.sendError(exchange, RestStatus.BAD_REQUEST);
}
} finally {
exchange.close();
}
}
}
/**
* HTTP handler that injects random Azure service errors
*
@ -252,7 +131,7 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
@Override
protected void handleAsError(final HttpExchange exchange) throws IOException {
drainInputStream(exchange.getRequestBody());
TestUtils.sendError(exchange, randomFrom(RestStatus.INTERNAL_SERVER_ERROR, RestStatus.SERVICE_UNAVAILABLE));
AzureHttpHandler.sendError(exchange, randomFrom(RestStatus.INTERNAL_SERVER_ERROR, RestStatus.SERVICE_UNAVAILABLE));
exchange.close();
}

View File

@ -1,76 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.azure;
import com.microsoft.azure.storage.Constants;
import com.microsoft.azure.storage.StorageErrorCodeStrings;
import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpExchange;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
final class TestUtils {
private TestUtils() {}
@SuppressForbidden(reason = "use HttpExchange and Headers")
static void sendError(final HttpExchange exchange, final RestStatus status) throws IOException {
final Headers headers = exchange.getResponseHeaders();
headers.add("Content-Type", "application/xml");
final String requestId = exchange.getRequestHeaders().getFirst(Constants.HeaderConstants.CLIENT_REQUEST_ID_HEADER);
if (requestId != null) {
headers.add(Constants.HeaderConstants.REQUEST_ID_HEADER, requestId);
}
final String errorCode = toAzureErrorCode(status);
if (errorCode != null) {
headers.add(Constants.HeaderConstants.ERROR_CODE, errorCode);
}
if (errorCode == null || "HEAD".equals(exchange.getRequestMethod())) {
exchange.sendResponseHeaders(status.getStatus(), -1L);
} else {
final byte[] response = ("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Error><Code>" + errorCode + "</Code><Message>"
+ status + "</Message></Error>").getBytes(StandardCharsets.UTF_8);
exchange.sendResponseHeaders(status.getStatus(), response.length);
exchange.getResponseBody().write(response);
}
}
// See https://docs.microsoft.com/en-us/rest/api/storageservices/common-rest-api-error-codes
private static String toAzureErrorCode(final RestStatus status) {
assert status.getStatus() >= 400;
switch (status) {
case BAD_REQUEST:
return StorageErrorCodeStrings.INVALID_METADATA;
case NOT_FOUND:
return StorageErrorCodeStrings.BLOB_NOT_FOUND;
case INTERNAL_SERVER_ERROR:
return StorageErrorCodeStrings.INTERNAL_ERROR;
case SERVICE_UNAVAILABLE:
return StorageErrorCodeStrings.SERVER_BUSY;
default:
throw new IllegalArgumentException("Error code [" + status.getStatus() + "] is not mapped to an existing Azure code");
}
}
}

View File

@ -51,6 +51,7 @@ List projects = [
'test:fixtures:hdfs-fixture',
'test:fixtures:krb5kdc-fixture',
'test:fixtures:old-elasticsearch',
'test:fixtures:azure-fixture',
'test:logger-usage'
]

View File

@ -0,0 +1,5 @@
FROM ubuntu:19.04
RUN apt-get update -qqy
RUN apt-get install -qqy openjdk-12-jre-headless
ENTRYPOINT exec java -classpath "/fixture/shared/*" fixture.azure.AzureHttpFixture 0.0.0.0 8091 container
EXPOSE 8091

View File

@ -0,0 +1,39 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
apply plugin: 'elasticsearch.build'
apply plugin: 'elasticsearch.test.fixtures'
description = 'Fixture for Azure external service'
test.enabled = false
dependencies {
compile project(':server')
}
preProcessFixture {
dependsOn jar
doLast {
file("${testFixturesDir}/shared").mkdirs()
project.copy {
from jar
from configurations.runtimeClasspath
into "${testFixturesDir}/shared"
}
}
}

View File

@ -0,0 +1,10 @@
version: '3'
services:
azure-fixture:
build:
context: .
dockerfile: Dockerfile
volumes:
- ./testfixtures_shared/shared:/fixture/shared
ports:
- "8091"

View File

@ -0,0 +1,53 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package fixture.azure;
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
public class AzureHttpFixture {
private final HttpServer server;
private AzureHttpFixture(final String address, final int port, final String container) throws IOException {
this.server = HttpServer.create(new InetSocketAddress(InetAddress.getByName(address), port), 0);
server.createContext("/" + container, new AzureHttpHandler(container));
}
private void start() throws Exception {
try {
server.start();
// wait to be killed
Thread.sleep(Long.MAX_VALUE);
} finally {
server.stop(0);
}
}
public static void main(final String[] args) throws Exception {
if (args == null || args.length != 3) {
throw new IllegalArgumentException("AzureHttpFixture expects 3 arguments [address, port, container]");
}
final AzureHttpFixture fixture = new AzureHttpFixture(args[0], Integer.parseInt(args[1]), args[2]);
fixture.start();
}
}

View File

@ -0,0 +1,225 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package fixture.azure;
import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.RestUtils;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* Minimal HTTP handler that acts as an Azure compliant server
*/
@SuppressForbidden(reason = "Uses a HttpServer to emulate an Azure endpoint")
public class AzureHttpHandler implements HttpHandler {
private final Map<String, BytesReference> blobs;
private final String container;
public AzureHttpHandler(final String container) {
this.container = Objects.requireNonNull(container);
this.blobs = new ConcurrentHashMap<>();
}
@Override
public void handle(final HttpExchange exchange) throws IOException {
final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString();
try {
if (Regex.simpleMatch("PUT /" + container + "/*blockid=*", request)) {
// Put Block (https://docs.microsoft.com/en-us/rest/api/storageservices/put-block)
final Map<String, String> params = new HashMap<>();
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
final String blockId = params.get("blockid");
blobs.put(blockId, Streams.readFully(exchange.getRequestBody()));
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
} else if (Regex.simpleMatch("PUT /" + container + "/*comp=blocklist*", request)) {
// Put Block List (https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list)
final String blockList = Streams.copyToString(new InputStreamReader(exchange.getRequestBody(), StandardCharsets.UTF_8));
final List<String> blockIds = Arrays.stream(blockList.split("<Latest>"))
.filter(line -> line.contains("</Latest>"))
.map(line -> line.substring(0, line.indexOf("</Latest>")))
.collect(Collectors.toList());
final ByteArrayOutputStream blob = new ByteArrayOutputStream();
for (String blockId : blockIds) {
BytesReference block = blobs.remove(blockId);
assert block != null;
block.writeTo(blob);
}
blobs.put(exchange.getRequestURI().getPath(), new BytesArray(blob.toByteArray()));
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
} else if (Regex.simpleMatch("PUT /" + container + "/*", request)) {
// PUT Blob (see https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob)
final String ifNoneMatch = exchange.getResponseHeaders().getFirst("If-None-Match");
if ("*".equals(ifNoneMatch)) {
if (blobs.putIfAbsent(exchange.getRequestURI().getPath(), Streams.readFully(exchange.getRequestBody())) != null) {
sendError(exchange, RestStatus.CONFLICT);
return;
}
} else {
blobs.put(exchange.getRequestURI().getPath(), Streams.readFully(exchange.getRequestBody()));
}
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
} else if (Regex.simpleMatch("HEAD /" + container + "/*", request)) {
// Get Blob Properties (see https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties)
final BytesReference blob = blobs.get(exchange.getRequestURI().getPath());
if (blob == null) {
sendError(exchange, RestStatus.NOT_FOUND);
return;
}
exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(blob.length()));
exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
} else if (Regex.simpleMatch("GET /" + container + "/*", request)) {
// GET Object (https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html)
final BytesReference blob = blobs.get(exchange.getRequestURI().getPath());
if (blob == null) {
sendError(exchange, RestStatus.NOT_FOUND);
return;
}
// see Constants.HeaderConstants.STORAGE_RANGE_HEADER
final String range = exchange.getRequestHeaders().getFirst("x-ms-range");
final Matcher matcher = Pattern.compile("^bytes=([0-9]+)-([0-9]+)$").matcher(range);
if (matcher.matches() == false) {
throw new AssertionError("Range header does not match expected format: " + range);
}
final int start = Integer.parseInt(matcher.group(1));
final int length = Integer.parseInt(matcher.group(2)) - start + 1;
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(length));
exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), length);
exchange.getResponseBody().write(blob.toBytesRef().bytes, start, length);
} else if (Regex.simpleMatch("DELETE /" + container + "/*", request)) {
// Delete Blob (https://docs.microsoft.com/en-us/rest/api/storageservices/delete-blob)
try (InputStream is = exchange.getRequestBody()) {
while (is.read() >= 0);
}
blobs.entrySet().removeIf(blob -> blob.getKey().startsWith(exchange.getRequestURI().getPath()));
exchange.sendResponseHeaders(RestStatus.ACCEPTED.getStatus(), -1);
} else if (Regex.simpleMatch("GET /container?restype=container&comp=list*", request)) {
// List Blobs (https://docs.microsoft.com/en-us/rest/api/storageservices/list-blobs)
final Map<String, String> params = new HashMap<>();
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
final StringBuilder list = new StringBuilder();
list.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
list.append("<EnumerationResults>");
final String prefix = params.get("prefix");
list.append("<Blobs>");
for (Map.Entry<String, BytesReference> blob : blobs.entrySet()) {
if (prefix == null || blob.getKey().startsWith("/" + container + "/" + prefix)) {
list.append("<Blob><Name>").append(blob.getKey().replace("/" + container + "/", "")).append("</Name>");
list.append("<Properties><Content-Length>").append(blob.getValue().length()).append("</Content-Length>");
list.append("<BlobType>BlockBlob</BlobType></Properties></Blob>");
}
}
list.append("</Blobs>");
list.append("</EnumerationResults>");
byte[] response = list.toString().getBytes(StandardCharsets.UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/xml");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
} else {
sendError(exchange, RestStatus.BAD_REQUEST);
}
} finally {
exchange.close();
}
}
public static void sendError(final HttpExchange exchange, final RestStatus status) throws IOException {
final Headers headers = exchange.getResponseHeaders();
headers.add("Content-Type", "application/xml");
// see Constants.HeaderConstants.CLIENT_REQUEST_ID_HEADER
final String requestId = exchange.getRequestHeaders().getFirst("x-ms-client-request-id");
if (requestId != null) {
// see Constants.HeaderConstants.STORAGE_RANGE_HEADER
headers.add("x-ms-request-id", requestId);
}
final String errorCode = toAzureErrorCode(status);
if (errorCode != null) {
// see Constants.HeaderConstants.ERROR_CODE
headers.add("x-ms-error-code", errorCode);
}
if (errorCode == null || "HEAD".equals(exchange.getRequestMethod())) {
exchange.sendResponseHeaders(status.getStatus(), -1L);
} else {
final byte[] response = ("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Error><Code>" + errorCode + "</Code><Message>"
+ status + "</Message></Error>").getBytes(StandardCharsets.UTF_8);
exchange.sendResponseHeaders(status.getStatus(), response.length);
exchange.getResponseBody().write(response);
}
}
// See https://docs.microsoft.com/en-us/rest/api/storageservices/common-rest-api-error-codes
private static String toAzureErrorCode(final RestStatus status) {
assert status.getStatus() >= 400;
switch (status) {
case BAD_REQUEST:
return "InvalidMetadata";
case NOT_FOUND:
return "BlobNotFound";
case INTERNAL_SERVER_ERROR:
return "InternalError";
case SERVICE_UNAVAILABLE:
return "ServerBusy";
case CONFLICT:
return "BlobAlreadyExists";
default:
throw new IllegalArgumentException("Error code [" + status.getStatus() + "] is not mapped to an existing Azure code");
}
}
}