[Tests] Mutualize fixtures code in BaseHttpFixture (#31210)

Many fixtures have similar code for writing the pid & ports files or
for handling HTTP requests. This commit adds an AbstractHttpFixture 
class in the test framework that can be extended for specific testing purposes.
This commit is contained in:
Tanguy Leroux 2018-06-14 14:09:56 +02:00 committed by GitHub
parent ce245a7320
commit bbfe1eccc7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 1656 additions and 2130 deletions

View File

@ -23,12 +23,6 @@ esplugin {
classname 'org.elasticsearch.plugin.repository.url.URLRepositoryPlugin'
}
forbiddenApisTest {
// we are using jdk-internal instead of jdk-non-portable to allow for com.sun.net.httpserver.* usage
bundledSignatures -= 'jdk-non-portable'
bundledSignatures += 'jdk-internal'
}
// This directory is shared between two URL repositories and one FS repository in YAML integration tests
File repositoryDir = new File(project.buildDir, "shared-repository")

View File

@ -18,151 +18,71 @@
*/
package org.elasticsearch.repositories.url;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import org.elasticsearch.test.fixture.AbstractHttpFixture;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.mocksocket.MockHttpServer;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonMap;
/**
* This {@link URLFixture} exposes a filesystem directory over HTTP. It is used in repository-url
* integration tests to expose a directory created by a regular FS repository.
*/
public class URLFixture {
public class URLFixture extends AbstractHttpFixture {
private final Path repositoryDir;
/**
* Creates a {@link URLFixture}
*/
private URLFixture(final String workingDir, final String repositoryDir) {
super(workingDir);
this.repositoryDir = dir(repositoryDir);
}
public static void main(String[] args) throws Exception {
if (args == null || args.length != 2) {
throw new IllegalArgumentException("URLFixture <working directory> <repository directory>");
}
final InetSocketAddress socketAddress = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
final HttpServer httpServer = MockHttpServer.createHttp(socketAddress, 0);
final URLFixture fixture = new URLFixture(args[0], args[1]);
fixture.listen();
}
try {
final Path workingDirectory = dir(args[0]);
/// Writes the PID of the current Java process in a `pid` file located in the working directory
writeFile(workingDirectory, "pid", ManagementFactory.getRuntimeMXBean().getName().split("@")[0]);
@Override
protected AbstractHttpFixture.Response handle(final Request request) throws IOException {
if ("GET".equalsIgnoreCase(request.getMethod())) {
String path = request.getPath();
if (path.length() > 0 && path.charAt(0) == '/') {
path = path.substring(1);
}
final String addressAndPort = addressToString(httpServer.getAddress());
// Writes the address and port of the http server in a `ports` file located in the working directory
writeFile(workingDirectory, "ports", addressAndPort);
Path normalizedRepositoryDir = repositoryDir.normalize();
Path normalizedPath = normalizedRepositoryDir.resolve(path).normalize();
// Exposes the repository over HTTP
httpServer.createContext("/", new ResponseHandler(dir(args[1])));
httpServer.start();
// Wait to be killed
Thread.sleep(Long.MAX_VALUE);
} finally {
httpServer.stop(0);
if (normalizedPath.startsWith(normalizedRepositoryDir)) {
if (Files.exists(normalizedPath) && Files.isReadable(normalizedPath) && Files.isRegularFile(normalizedPath)) {
byte[] content = Files.readAllBytes(normalizedPath);
final Map<String, String> headers = new HashMap<>(contentType("application/octet-stream"));
headers.put("Content-Length", String.valueOf(content.length));
return new Response(RestStatus.OK.getStatus(), headers, content);
} else {
return new Response(RestStatus.NOT_FOUND.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE);
}
} else {
return new Response(RestStatus.FORBIDDEN.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE);
}
}
return null;
}
@SuppressForbidden(reason = "Paths#get is fine - we don't have environment here")
private static Path dir(final String dir) {
return Paths.get(dir);
}
private static void writeFile(final Path dir, final String fileName, final String content) throws IOException {
final Path tempPidFile = Files.createTempFile(dir, null, null);
Files.write(tempPidFile, singleton(content));
Files.move(tempPidFile, dir.resolve(fileName), StandardCopyOption.ATOMIC_MOVE);
}
private static String addressToString(final SocketAddress address) {
final InetSocketAddress inetSocketAddress = (InetSocketAddress) address;
if (inetSocketAddress.getAddress() instanceof Inet6Address) {
return "[" + inetSocketAddress.getHostString() + "]:" + inetSocketAddress.getPort();
} else {
return inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort();
}
}
static class ResponseHandler implements HttpHandler {
private final Path repositoryDir;
ResponseHandler(final Path repositoryDir) {
this.repositoryDir = repositoryDir;
}
@Override
public void handle(HttpExchange exchange) throws IOException {
Response response;
final String userAgent = exchange.getRequestHeaders().getFirst("User-Agent");
if (userAgent != null && userAgent.startsWith("Apache Ant")) {
// This is a request made by the AntFixture, just reply "OK"
response = new Response(RestStatus.OK, emptyMap(), "text/plain; charset=utf-8", "OK".getBytes(UTF_8));
} else if ("GET".equalsIgnoreCase(exchange.getRequestMethod())) {
String path = exchange.getRequestURI().toString();
if (path.length() > 0 && path.charAt(0) == '/') {
path = path.substring(1);
}
Path normalizedRepositoryDir = repositoryDir.normalize();
Path normalizedPath = normalizedRepositoryDir.resolve(path).normalize();
if (normalizedPath.startsWith(normalizedRepositoryDir)) {
if (Files.exists(normalizedPath) && Files.isReadable(normalizedPath) && Files.isRegularFile(normalizedPath)) {
byte[] content = Files.readAllBytes(normalizedPath);
Map<String, String> headers = singletonMap("Content-Length", String.valueOf(content.length));
response = new Response(RestStatus.OK, headers, "application/octet-stream", content);
} else {
response = new Response(RestStatus.NOT_FOUND, emptyMap(), "text/plain; charset=utf-8", new byte[0]);
}
} else {
response = new Response(RestStatus.FORBIDDEN, emptyMap(), "text/plain; charset=utf-8", new byte[0]);
}
} else {
response = new Response(RestStatus.INTERNAL_SERVER_ERROR, emptyMap(), "text/plain; charset=utf-8",
"Unsupported HTTP method".getBytes(StandardCharsets.UTF_8));
}
exchange.sendResponseHeaders(response.status.getStatus(), response.body.length);
if (response.body.length > 0) {
exchange.getResponseBody().write(response.body);
}
exchange.close();
}
}
/**
* Represents a HTTP Response.
*/
static class Response {
final RestStatus status;
final Map<String, String> headers;
final String contentType;
final byte[] body;
Response(final RestStatus status, final Map<String, String> headers, final String contentType, final byte[] body) {
this.status = Objects.requireNonNull(status);
this.headers = Objects.requireNonNull(headers);
this.contentType = Objects.requireNonNull(contentType);
this.body = Objects.requireNonNull(body);
}
}
}

View File

@ -28,20 +28,12 @@ esplugin {
// No unit tests in this example
test.enabled = false
configurations {
exampleFixture
}
dependencies {
exampleFixture project(':test:fixtures:example-fixture')
}
task exampleFixture(type: org.elasticsearch.gradle.test.AntFixture) {
dependsOn project.configurations.exampleFixture
dependsOn testClasses
executable = new File(project.runtimeJavaHome, 'bin/java')
args '-cp', "${ -> project.configurations.exampleFixture.asPath }",
'example.ExampleTestFixture',
baseDir
args '-cp', "${ -> project.sourceSets.test.runtimeClasspath.asPath }",
'org.elasticsearch.example.resthandler.ExampleFixture',
baseDir, 'TEST'
}
integTestCluster {

View File

@ -0,0 +1,53 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.example.resthandler;
import org.elasticsearch.test.fixture.AbstractHttpFixture;
import java.io.IOException;
import java.util.Objects;
import static java.nio.charset.StandardCharsets.UTF_8;
public class ExampleFixture extends AbstractHttpFixture {
private final String message;
private ExampleFixture(final String workingDir, final String message) {
super(workingDir);
this.message = Objects.requireNonNull(message);
}
@Override
protected Response handle(final Request request) throws IOException {
if ("GET".equals(request.getMethod()) && "/".equals(request.getPath())) {
return new Response(200, TEXT_PLAIN_CONTENT_TYPE, message.getBytes(UTF_8));
}
return null;
}
public static void main(final String[] args) throws Exception {
if (args == null || args.length != 2) {
throw new IllegalArgumentException("ExampleFixture <working directory> <echo message>");
}
final ExampleFixture fixture = new ExampleFixture(args[0], args[1]);
fixture.listen();
}
}

View File

@ -23,20 +23,10 @@ import org.elasticsearch.gradle.test.AntFixture
apply plugin: 'elasticsearch.standalone-rest-test'
apply plugin: 'elasticsearch.rest-test'
dependencies {
testCompile project(path: ':plugins:repository-azure', configuration: 'runtime')
}
integTestCluster {
plugin ':plugins:repository-azure'
}
forbiddenApisTest {
// we are using jdk-internal instead of jdk-non-portable to allow for com.sun.net.httpserver.* usage
bundledSignatures -= 'jdk-non-portable'
bundledSignatures += 'jdk-internal'
}
boolean useFixture = false
String azureAccount = System.getenv("azure_storage_account")
@ -54,7 +44,7 @@ if (!azureAccount && !azureKey && !azureContainer && !azureBasePath) {
/** A task to start the fixture which emulates an Azure Storage service **/
task azureStorageFixture(type: AntFixture) {
dependsOn compileTestJava
dependsOn testClasses
env 'CLASSPATH', "${ -> project.sourceSets.test.runtimeClasspath.asPath }"
executable = new File(project.runtimeJavaHome, 'bin/java')
args 'org.elasticsearch.repositories.azure.AzureStorageFixture', baseDir, azureContainer
@ -64,6 +54,7 @@ Map<String, Object> expansions = [
'container': azureContainer,
'base_path': azureBasePath
]
processTestResources {
inputs.properties(expansions)
MavenFilteringHack.filter(it, expansions)

View File

@ -18,132 +18,332 @@
*/
package org.elasticsearch.repositories.azure;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.mocksocket.MockHttpServer;
import org.elasticsearch.repositories.azure.AzureStorageTestServer.Response;
import org.elasticsearch.test.fixture.AbstractHttpFixture;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.path.PathTrie;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.RestUtils;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
/**
* {@link AzureStorageFixture} is a fixture that emulates an Azure Storage service.
* {@link AzureStorageFixture} emulates an Azure Storage service.
* <p>
* It starts an asynchronous socket server that binds to a random local port. The server parses
* HTTP requests and uses a {@link AzureStorageTestServer} to handle them before returning
* them to the client as HTTP responses.
* The implementation is based on official documentation available at
* https://docs.microsoft.com/en-us/rest/api/storageservices/blob-service-rest-api.
*/
public class AzureStorageFixture {
public class AzureStorageFixture extends AbstractHttpFixture {
public static void main(String[] args) throws Exception {
/**
* List of the containers stored on this test server
**/
private final Map<String, Container> containers = ConcurrentCollections.newConcurrentMap();
/**
* Request handlers for the requests made by the Azure client
**/
private final PathTrie<RequestHandler> handlers;
/**
* Creates a {@link AzureStorageFixture} with a custom endpoint
*/
private AzureStorageFixture(final String workingDir, final String container) {
super(workingDir);
this.containers.put(container, new Container(container));
this.handlers = defaultHandlers(containers);
}
@Override
protected AbstractHttpFixture.Response handle(final Request request) throws IOException {
final RequestHandler handler = handlers.retrieve(request.getMethod() + " " + request.getPath(), request.getParameters());
if (handler != null) {
final String authorization = request.getHeader("Authorization");
if (authorization == null
|| (authorization.length() > 0 && authorization.contains("azure_integration_test_account") == false)) {
return newError(request.getId(), RestStatus.FORBIDDEN, "AccessDenied", "Access Denied");
}
return handler.handle(request);
}
return null;
}
public static void main(final String[] args) throws Exception {
if (args == null || args.length != 2) {
throw new IllegalArgumentException("AzureStorageFixture <working directory> <container>");
}
final InetSocketAddress socketAddress = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
final HttpServer httpServer = MockHttpServer.createHttp(socketAddress, 0);
try {
final Path workingDirectory = workingDir(args[0]);
/// Writes the PID of the current Java process in a `pid` file located in the working directory
writeFile(workingDirectory, "pid", ManagementFactory.getRuntimeMXBean().getName().split("@")[0]);
final String addressAndPort = addressToString(httpServer.getAddress());
// Writes the address and port of the http server in a `ports` file located in the working directory
writeFile(workingDirectory, "ports", addressAndPort);
// Emulates Azure
final String storageUrl = "http://" + addressAndPort;
final AzureStorageTestServer testServer = new AzureStorageTestServer(storageUrl);
testServer.createContainer(args[1]);
httpServer.createContext("/", new ResponseHandler(testServer));
httpServer.start();
// Wait to be killed
Thread.sleep(Long.MAX_VALUE);
} finally {
httpServer.stop(0);
}
final AzureStorageFixture fixture = new AzureStorageFixture(args[0], args[1]);
fixture.listen();
}
@SuppressForbidden(reason = "Paths#get is fine - we don't have environment here")
private static Path workingDir(final String dir) {
return Paths.get(dir);
}
/**
* Builds the default request handlers
**/
private static PathTrie<RequestHandler> defaultHandlers(final Map<String, Container> containers) {
final PathTrie<RequestHandler> handlers = new PathTrie<>(RestUtils.REST_DECODER);
private static void writeFile(final Path dir, final String fileName, final String content) throws IOException {
final Path tempPidFile = Files.createTempFile(dir, null, null);
Files.write(tempPidFile, singleton(content));
Files.move(tempPidFile, dir.resolve(fileName), StandardCopyOption.ATOMIC_MOVE);
}
// Get Blob Properties
//
// https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties
objectsPaths("HEAD /{container}").forEach(path ->
handlers.insert(path, (request) -> {
final String containerName = request.getParam("container");
private static String addressToString(final SocketAddress address) {
final InetSocketAddress inetSocketAddress = (InetSocketAddress) address;
if (inetSocketAddress.getAddress() instanceof Inet6Address) {
return "[" + inetSocketAddress.getHostString() + "]:" + inetSocketAddress.getPort();
} else {
return inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort();
}
}
final Container container = containers.get(containerName);
if (container == null) {
return newContainerNotFoundError(request.getId());
}
static class ResponseHandler implements HttpHandler {
final String blobName = objectName(request.getParameters());
for (Map.Entry<String, byte[]> object : container.objects.entrySet()) {
if (object.getKey().equals(blobName)) {
Map<String, String> responseHeaders = new HashMap<>();
responseHeaders.put("x-ms-blob-content-length", String.valueOf(object.getValue().length));
responseHeaders.put("x-ms-blob-type", "blockblob");
return new Response(RestStatus.OK.getStatus(), responseHeaders, EMPTY_BYTE);
}
}
return newBlobNotFoundError(request.getId());
})
);
private final AzureStorageTestServer server;
// PUT Blob
//
// https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob
objectsPaths("PUT /{container}").forEach(path ->
handlers.insert(path, (request) -> {
final String destContainerName = request.getParam("container");
final String destBlobName = objectName(request.getParameters());
private ResponseHandler(final AzureStorageTestServer server) {
this.server = server;
}
final Container destContainer = containers.get(destContainerName);
if (destContainer == null) {
return newContainerNotFoundError(request.getId());
}
@Override
public void handle(HttpExchange exchange) throws IOException {
String method = exchange.getRequestMethod();
String path = server.getEndpoint() + exchange.getRequestURI().getRawPath();
String query = exchange.getRequestURI().getRawQuery();
Map<String, List<String>> headers = exchange.getRequestHeaders();
ByteArrayOutputStream out = new ByteArrayOutputStream();
Streams.copy(exchange.getRequestBody(), out);
byte[] existingBytes = destContainer.objects.putIfAbsent(destBlobName, request.getBody());
if (existingBytes != null) {
return newBlobAlreadyExistsError(request.getId());
}
Response response = null;
return new Response(RestStatus.CREATED.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE); })
);
final String userAgent = exchange.getRequestHeaders().getFirst("User-Agent");
if (userAgent != null && userAgent.startsWith("Apache Ant")) {
// This is a request made by the AntFixture, just reply "OK"
response = new Response(RestStatus.OK, emptyMap(), "text/plain; charset=utf-8", "OK".getBytes(UTF_8));
// GET Object
//
// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html
objectsPaths("GET /{container}").forEach(path ->
handlers.insert(path, (request) -> {
final String containerName = request.getParam("container");
final Container container = containers.get(containerName);
if (container == null) {
return newContainerNotFoundError(request.getId());
}
final String blobName = objectName(request.getParameters());
if (container.objects.containsKey(blobName)) {
Map<String, String> responseHeaders = new HashMap<>(contentType("application/octet-stream"));
responseHeaders.put("x-ms-copy-status", "success");
responseHeaders.put("x-ms-blob-type", "blockblob");
return new Response(RestStatus.OK.getStatus(), responseHeaders, container.objects.get(blobName));
}
return newBlobNotFoundError(request.getId());
})
);
// Delete Blob
//
// https://docs.microsoft.com/en-us/rest/api/storageservices/delete-blob
objectsPaths("DELETE /{container}").forEach(path ->
handlers.insert(path, (request) -> {
final String containerName = request.getParam("container");
final Container container = containers.get(containerName);
if (container == null) {
return newContainerNotFoundError(request.getId());
}
final String blobName = objectName(request.getParameters());
if (container.objects.remove(blobName) != null) {
return new Response(RestStatus.ACCEPTED.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE);
}
return newBlobNotFoundError(request.getId());
})
);
// List Blobs
//
// https://docs.microsoft.com/en-us/rest/api/storageservices/list-blobs
handlers.insert("GET /{container}/", (request) -> {
final String containerName = request.getParam("container");
final Container container = containers.get(containerName);
if (container == null) {
return newContainerNotFoundError(request.getId());
}
final String prefix = request.getParam("prefix");
return newEnumerationResultsResponse(request.getId(), container, prefix);
});
// Get Container Properties
//
// https://docs.microsoft.com/en-us/rest/api/storageservices/get-container-properties
handlers.insert("HEAD /{container}", (request) -> {
String container = request.getParam("container");
if (Strings.hasText(container) && containers.containsKey(container)) {
return new Response(RestStatus.OK.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE);
} else {
// Otherwise simulate a S3 response
response = server.handle(method, path, query, headers, out.toByteArray());
return newContainerNotFoundError(request.getId());
}
});
Map<String, List<String>> responseHeaders = exchange.getResponseHeaders();
responseHeaders.put("Content-Type", singletonList(response.contentType));
response.headers.forEach((k, v) -> responseHeaders.put(k, singletonList(v)));
exchange.sendResponseHeaders(response.status.getStatus(), response.body.length);
if (response.body.length > 0) {
exchange.getResponseBody().write(response.body);
}
exchange.close();
return handlers;
}
/**
* Represents a Azure Storage container.
*/
static class Container {
/**
* Container name
**/
final String name;
/**
* Blobs contained in the container
**/
final Map<String, byte[]> objects;
Container(final String name) {
this.name = Objects.requireNonNull(name);
this.objects = ConcurrentCollections.newConcurrentMap();
}
}
/**
* Decline a path like "http://host:port/{bucket}" into 10 derived paths like:
* - http://host:port/{bucket}/{path0}
* - http://host:port/{bucket}/{path0}/{path1}
* - http://host:port/{bucket}/{path0}/{path1}/{path2}
* - etc
*/
private static List<String> objectsPaths(final String path) {
final List<String> paths = new ArrayList<>();
String p = path;
for (int i = 0; i < 10; i++) {
p = p + "/{path" + i + "}";
paths.add(p);
}
return paths;
}
/**
* Retrieves the object name from all derived paths named {pathX} where 0 <= X < 10.
* <p>
* This is the counterpart of {@link #objectsPaths(String)}
*/
private static String objectName(final Map<String, String> params) {
final StringBuilder name = new StringBuilder();
for (int i = 0; i < 10; i++) {
String value = params.getOrDefault("path" + i, null);
if (value != null) {
if (name.length() > 0) {
name.append('/');
}
name.append(value);
}
}
return name.toString();
}
/**
* Azure EnumerationResults Response
*/
private static Response newEnumerationResultsResponse(final long requestId, final Container container, final String prefix) {
final String id = Long.toString(requestId);
final StringBuilder response = new StringBuilder();
response.append("<?xml version=\"1.0\" encoding=\"utf-8\"?>");
response.append("<EnumerationResults ServiceEndpoint=\"http://myaccount.blob.core.windows.net/\"");
response.append(" ContainerName=\"").append(container.name).append("\">");
if (prefix != null) {
response.append("<Prefix>").append(prefix).append("</Prefix>");
} else {
response.append("<Prefix/>");
}
response.append("<MaxResults>").append(container.objects.size()).append("</MaxResults>");
response.append("<Blobs>");
int count = 0;
for (Map.Entry<String, byte[]> object : container.objects.entrySet()) {
String objectName = object.getKey();
if (prefix == null || objectName.startsWith(prefix)) {
response.append("<Blob>");
response.append("<Name>").append(objectName).append("</Name>");
response.append("<Properties>");
response.append("<Content-Length>").append(object.getValue().length).append("</Content-Length>");
response.append("<CopyId>").append(count++).append("</CopyId>");
response.append("<CopyStatus>success</CopyStatus>");
response.append("<BlobType>BlockBlob</BlobType>");
response.append("</Properties>");
response.append("</Blob>");
}
}
response.append("</Blobs>");
response.append("<NextMarker />");
response.append("</EnumerationResults>");
final Map<String, String> headers = new HashMap<>(contentType("application/xml"));
headers.put("x-ms-request-id", id);
return new Response(RestStatus.OK.getStatus(), headers, response.toString().getBytes(UTF_8));
}
private static Response newContainerNotFoundError(final long requestId) {
return newError(requestId, RestStatus.NOT_FOUND, "ContainerNotFound", "The specified container does not exist");
}
private static Response newBlobNotFoundError(final long requestId) {
return newError(requestId, RestStatus.NOT_FOUND, "BlobNotFound", "The specified blob does not exist");
}
private static Response newBlobAlreadyExistsError(final long requestId) {
return newError(requestId, RestStatus.CONFLICT, "BlobAlreadyExists", "The specified blob already exists");
}
/**
* Azure Error
* <p>
* https://docs.microsoft.com/en-us/rest/api/storageservices/status-and-error-codes2
*/
private static Response newError(final long requestId,
final RestStatus status,
final String code,
final String message) {
final StringBuilder response = new StringBuilder();
response.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
response.append("<Error>");
response.append("<Code>").append(code).append("</Code>");
response.append("<Message>").append(message).append("</Message>");
response.append("</Error>");
final Map<String, String> headers = new HashMap<>(contentType("application/xml"));
headers.put("x-ms-request-id", String.valueOf(requestId));
headers.put("x-ms-error-code", code);
return new Response(status.getStatus(), headers, response.toString().getBytes(UTF_8));
}
}

View File

@ -1,402 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.azure;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.path.PathTrie;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.RestUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
/**
* {@link AzureStorageTestServer} emulates an Azure Storage service through a {@link #handle(String, String, String, Map, byte[])}
* method that provides appropriate responses for specific requests like the real Azure platform would do.
* It is based on official documentation available at https://docs.microsoft.com/en-us/rest/api/storageservices/blob-service-rest-api.
*/
public class AzureStorageTestServer {
private static byte[] EMPTY_BYTE = new byte[0];
/** List of the containers stored on this test server **/
private final Map<String, Container> containers = ConcurrentCollections.newConcurrentMap();
/** Request handlers for the requests made by the Azure client **/
private final PathTrie<RequestHandler> handlers;
/** Server endpoint **/
private final String endpoint;
/** Increments for the requests ids **/
private final AtomicLong requests = new AtomicLong(0);
/**
* Creates a {@link AzureStorageTestServer} with a custom endpoint
*/
AzureStorageTestServer(final String endpoint) {
this.endpoint = Objects.requireNonNull(endpoint, "endpoint must not be null");
this.handlers = defaultHandlers(endpoint, containers);
}
/** Creates a container in the test server **/
void createContainer(final String containerName) {
containers.put(containerName, new Container(containerName));
}
public String getEndpoint() {
return endpoint;
}
/**
* Returns a response for the given request
*
* @param method the HTTP method of the request
* @param path the path of the URL of the request
* @param query the queryString of the URL of request
* @param headers the HTTP headers of the request
* @param body the HTTP request body
* @return a {@link Response}
* @throws IOException if something goes wrong
*/
public Response handle(final String method,
final String path,
final String query,
final Map<String, List<String>> headers,
byte[] body) throws IOException {
final long requestId = requests.incrementAndGet();
final Map<String, String> params = new HashMap<>();
if (query != null) {
RestUtils.decodeQueryString(query, 0, params);
}
final RequestHandler handler = handlers.retrieve(method + " " + path, params);
if (handler != null) {
return handler.execute(params, headers, body, requestId);
} else {
return newInternalError(requestId);
}
}
@FunctionalInterface
interface RequestHandler {
/**
* Simulates the execution of a Azure Storage request and returns a corresponding response.
*
* @param params the request's query string parameters
* @param headers the request's headers
* @param body the request body provided as a byte array
* @param requestId a unique id for the incoming request
* @return the corresponding response
*
* @throws IOException if something goes wrong
*/
Response execute(Map<String, String> params, Map<String, List<String>> headers, byte[] body, long requestId) throws IOException;
}
/** Builds the default request handlers **/
private static PathTrie<RequestHandler> defaultHandlers(final String endpoint, final Map<String, Container> containers) {
final PathTrie<RequestHandler> handlers = new PathTrie<>(RestUtils.REST_DECODER);
// Get Blob Properties
//
// https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties
objectsPaths("HEAD " + endpoint + "/{container}").forEach(path ->
handlers.insert(path, (params, headers, body, requestId) -> {
final String containerName = params.get("container");
final Container container =containers.get(containerName);
if (container == null) {
return newContainerNotFoundError(requestId);
}
final String blobName = objectName(params);
for (Map.Entry<String, byte[]> object : container.objects.entrySet()) {
if (object.getKey().equals(blobName)) {
Map<String, String> responseHeaders = new HashMap<>();
responseHeaders.put("x-ms-blob-content-length", String.valueOf(object.getValue().length));
responseHeaders.put("x-ms-blob-type", "blockblob");
return new Response(RestStatus.OK, responseHeaders, "text/plain", EMPTY_BYTE);
}
}
return newBlobNotFoundError(requestId);
})
);
// PUT Blob
//
// https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob
objectsPaths("PUT " + endpoint + "/{container}").forEach(path ->
handlers.insert(path, (params, headers, body, requestId) -> {
final String destContainerName = params.get("container");
final String destBlobName = objectName(params);
final Container destContainer =containers.get(destContainerName);
if (destContainer == null) {
return newContainerNotFoundError(requestId);
}
byte[] existingBytes = destContainer.objects.putIfAbsent(destBlobName, body);
if (existingBytes != null) {
return newBlobAlreadyExistsError(requestId);
}
return new Response(RestStatus.CREATED, emptyMap(), "text/plain", EMPTY_BYTE);
})
);
// GET Object
//
// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html
objectsPaths("GET " + endpoint + "/{container}").forEach(path ->
handlers.insert(path, (params, headers, body, requestId) -> {
final String containerName = params.get("container");
final Container container =containers.get(containerName);
if (container == null) {
return newContainerNotFoundError(requestId);
}
final String blobName = objectName(params);
if (container.objects.containsKey(blobName)) {
Map<String, String> responseHeaders = new HashMap<>();
responseHeaders.put("x-ms-copy-status", "success");
responseHeaders.put("x-ms-blob-type", "blockblob");
return new Response(RestStatus.OK, responseHeaders, "application/octet-stream", container.objects.get(blobName));
}
return newBlobNotFoundError(requestId);
})
);
// Delete Blob
//
// https://docs.microsoft.com/en-us/rest/api/storageservices/delete-blob
objectsPaths("DELETE " + endpoint + "/{container}").forEach(path ->
handlers.insert(path, (params, headers, body, requestId) -> {
final String containerName = params.get("container");
final Container container =containers.get(containerName);
if (container == null) {
return newContainerNotFoundError(requestId);
}
final String blobName = objectName(params);
if (container.objects.remove(blobName) != null) {
return new Response(RestStatus.ACCEPTED, emptyMap(), "text/plain", EMPTY_BYTE);
}
return newBlobNotFoundError(requestId);
})
);
// List Blobs
//
// https://docs.microsoft.com/en-us/rest/api/storageservices/list-blobs
handlers.insert("GET " + endpoint + "/{container}/", (params, headers, body, requestId) -> {
final String containerName = params.get("container");
final Container container =containers.get(containerName);
if (container == null) {
return newContainerNotFoundError(requestId);
}
final String prefix = params.get("prefix");
return newEnumerationResultsResponse(requestId, container, prefix);
});
// Get Container Properties
//
// https://docs.microsoft.com/en-us/rest/api/storageservices/get-container-properties
handlers.insert("HEAD " + endpoint + "/{container}", (params, headers, body, requestId) -> {
String container = params.get("container");
if (Strings.hasText(container) && containers.containsKey(container)) {
return new Response(RestStatus.OK, emptyMap(), "text/plain", EMPTY_BYTE);
} else {
return newContainerNotFoundError(requestId);
}
});
return handlers;
}
/**
* Represents a Azure Storage container.
*/
static class Container {
/** Container name **/
final String name;
/** Blobs contained in the container **/
final Map<String, byte[]> objects;
Container(final String name) {
this.name = Objects.requireNonNull(name);
this.objects = ConcurrentCollections.newConcurrentMap();
}
}
/**
* Represents a HTTP Response.
*/
static class Response {
final RestStatus status;
final Map<String, String> headers;
final String contentType;
final byte[] body;
Response(final RestStatus status, final Map<String, String> headers, final String contentType, final byte[] body) {
this.status = Objects.requireNonNull(status);
this.headers = Objects.requireNonNull(headers);
this.contentType = Objects.requireNonNull(contentType);
this.body = Objects.requireNonNull(body);
}
}
/**
* Decline a path like "http://host:port/{bucket}" into 10 derived paths like:
* - http://host:port/{bucket}/{path0}
* - http://host:port/{bucket}/{path0}/{path1}
* - http://host:port/{bucket}/{path0}/{path1}/{path2}
* - etc
*/
private static List<String> objectsPaths(final String path) {
final List<String> paths = new ArrayList<>();
String p = path;
for (int i = 0; i < 10; i++) {
p = p + "/{path" + i + "}";
paths.add(p);
}
return paths;
}
/**
* Retrieves the object name from all derived paths named {pathX} where 0 <= X < 10.
*
* This is the counterpart of {@link #objectsPaths(String)}
*/
private static String objectName(final Map<String, String> params) {
final StringBuilder name = new StringBuilder();
for (int i = 0; i < 10; i++) {
String value = params.getOrDefault("path" + i, null);
if (value != null) {
if (name.length() > 0) {
name.append('/');
}
name.append(value);
}
}
return name.toString();
}
/**
* Azure EnumerationResults Response
*/
private static Response newEnumerationResultsResponse(final long requestId, final Container container, final String prefix) {
final String id = Long.toString(requestId);
final StringBuilder response = new StringBuilder();
response.append("<?xml version=\"1.0\" encoding=\"utf-8\"?>");
response.append("<EnumerationResults ServiceEndpoint=\"http://myaccount.blob.core.windows.net/\"");
response.append(" ContainerName=\"").append(container.name).append("\">");
if (prefix != null) {
response.append("<Prefix>").append(prefix).append("</Prefix>");
} else {
response.append("<Prefix/>");
}
response.append("<MaxResults>").append(container.objects.size()).append("</MaxResults>");
response.append("<Blobs>");
int count = 0;
for (Map.Entry<String, byte[]> object : container.objects.entrySet()) {
String objectName = object.getKey();
if (prefix == null || objectName.startsWith(prefix)) {
response.append("<Blob>");
response.append("<Name>").append(objectName).append("</Name>");
response.append("<Properties>");
response.append("<Content-Length>").append(object.getValue().length).append("</Content-Length>");
response.append("<CopyId>").append(count++).append("</CopyId>");
response.append("<CopyStatus>success</CopyStatus>");
response.append("<BlobType>BlockBlob</BlobType>");
response.append("</Properties>");
response.append("</Blob>");
}
}
response.append("</Blobs>");
response.append("<NextMarker />");
response.append("</EnumerationResults>");
return new Response(RestStatus.OK, singletonMap("x-amz-request-id", id), "application/xml", response.toString().getBytes(UTF_8));
}
private static Response newContainerNotFoundError(final long requestId) {
return newError(requestId, RestStatus.NOT_FOUND, "ContainerNotFound", "The specified container does not exist");
}
private static Response newBlobNotFoundError(final long requestId) {
return newError(requestId, RestStatus.NOT_FOUND, "BlobNotFound", "The specified blob does not exist");
}
private static Response newBlobAlreadyExistsError(final long requestId) {
return newError(requestId, RestStatus.CONFLICT, "BlobAlreadyExists", "The specified blob already exists");
}
private static Response newInternalError(final long requestId) {
return newError(requestId, RestStatus.INTERNAL_SERVER_ERROR, "InternalError", "The server encountered an internal error");
}
/**
* Azure Error
*
* https://docs.microsoft.com/en-us/rest/api/storageservices/status-and-error-codes2
*/
private static Response newError(final long requestId,
final RestStatus status,
final String code,
final String message) {
final StringBuilder response = new StringBuilder();
response.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
response.append("<Error>");
response.append("<Code>").append(code).append("</Code>");
response.append("<Message>").append(message).append("</Message>");
response.append("</Error>");
final Map<String, String> headers = new HashMap<>(2);
headers.put("x-ms-request-id", String.valueOf(requestId));
headers.put("x-ms-error-code", code);
return new Response(status, headers, "application/xml", response.toString().getBytes(UTF_8));
}
}

View File

@ -26,20 +26,10 @@ import java.security.KeyPairGenerator
apply plugin: 'elasticsearch.standalone-rest-test'
apply plugin: 'elasticsearch.rest-test'
dependencies {
testCompile project(path: ':plugins:repository-gcs', configuration: 'runtime')
}
integTestCluster {
plugin ':plugins:repository-gcs'
}
forbiddenApisTest {
// we are using jdk-internal instead of jdk-non-portable to allow for com.sun.net.httpserver.* usage
bundledSignatures -= 'jdk-non-portable'
bundledSignatures += 'jdk-internal'
}
boolean useFixture = false
String gcsServiceAccount = System.getenv("google_storage_service_account")
@ -61,7 +51,7 @@ if (!gcsServiceAccount && !gcsBucket && !gcsBasePath) {
/** A task to start the GoogleCloudStorageFixture which emulates a Google Cloud Storage service **/
task googleCloudStorageFixture(type: AntFixture) {
dependsOn compileTestJava
dependsOn testClasses
env 'CLASSPATH', "${ -> project.sourceSets.test.runtimeClasspath.asPath }"
executable = new File(project.runtimeJavaHome, 'bin/java')
args 'org.elasticsearch.repositories.gcs.GoogleCloudStorageFixture', baseDir, 'bucket_test'

View File

@ -18,133 +18,591 @@
*/
package org.elasticsearch.repositories.gcs;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.core.internal.io.Streams;
import org.elasticsearch.mocksocket.MockHttpServer;
import org.elasticsearch.repositories.gcs.GoogleCloudStorageTestServer.Response;
import org.elasticsearch.test.fixture.AbstractHttpFixture;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.path.PathTrie;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.RestUtils;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
/**
* {@link GoogleCloudStorageFixture} is a fixture that emulates a Google Cloud Storage service.
* <p>
* It starts an asynchronous socket server that binds to a random local port. The server parses
* HTTP requests and uses a {@link GoogleCloudStorageTestServer} to handle them before returning
* them to the client as HTTP responses.
* {@link GoogleCloudStorageFixture} emulates a Google Cloud Storage service.
*
* The implementation is based on official documentation available at https://cloud.google.com/storage/docs/json_api/v1/.
*/
public class GoogleCloudStorageFixture {
public class GoogleCloudStorageFixture extends AbstractHttpFixture {
public static void main(String[] args) throws Exception {
/** List of the buckets stored on this test server **/
private final Map<String, Bucket> buckets = ConcurrentCollections.newConcurrentMap();
/** Request handlers for the requests made by the Google Cloud Storage client **/
private final PathTrie<RequestHandler> handlers;
/**
* Creates a {@link GoogleCloudStorageFixture}
*/
private GoogleCloudStorageFixture(final String workingDir, final String bucket) {
super(workingDir);
this.buckets.put(bucket, new Bucket(bucket));
this.handlers = defaultHandlers(buckets);
}
@Override
protected Response handle(final Request request) throws IOException {
final RequestHandler handler = handlers.retrieve(request.getMethod() + " " + request.getPath(), request.getParameters());
if (handler != null) {
return handler.handle(request);
}
return null;
}
public static void main(final String[] args) throws Exception {
if (args == null || args.length != 2) {
throw new IllegalArgumentException("GoogleCloudStorageFixture <working directory> <bucket>");
}
final InetSocketAddress socketAddress = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
final HttpServer httpServer = MockHttpServer.createHttp(socketAddress, 0);
try {
final Path workingDirectory = workingDir(args[0]);
/// Writes the PID of the current Java process in a `pid` file located in the working directory
writeFile(workingDirectory, "pid", ManagementFactory.getRuntimeMXBean().getName().split("@")[0]);
final String addressAndPort = addressToString(httpServer.getAddress());
// Writes the address and port of the http server in a `ports` file located in the working directory
writeFile(workingDirectory, "ports", addressAndPort);
// Emulates a Google Cloud Storage server
final String storageUrl = "http://" + addressAndPort;
final GoogleCloudStorageTestServer storageTestServer = new GoogleCloudStorageTestServer(storageUrl);
storageTestServer.createBucket(args[1]);
httpServer.createContext("/", new ResponseHandler(storageTestServer));
httpServer.start();
// Wait to be killed
Thread.sleep(Long.MAX_VALUE);
} finally {
httpServer.stop(0);
}
final GoogleCloudStorageFixture fixture = new GoogleCloudStorageFixture(args[0], args[1]);
fixture.listen();
}
@SuppressForbidden(reason = "Paths#get is fine - we don't have environment here")
private static Path workingDir(final String dir) {
return Paths.get(dir);
}
/** Builds the default request handlers **/
private static PathTrie<RequestHandler> defaultHandlers(final Map<String, Bucket> buckets) {
final PathTrie<RequestHandler> handlers = new PathTrie<>(RestUtils.REST_DECODER);
private static void writeFile(final Path dir, final String fileName, final String content) throws IOException {
final Path tempPidFile = Files.createTempFile(dir, null, null);
Files.write(tempPidFile, singleton(content));
Files.move(tempPidFile, dir.resolve(fileName), StandardCopyOption.ATOMIC_MOVE);
}
// GET Bucket
//
// https://cloud.google.com/storage/docs/json_api/v1/buckets/get
handlers.insert("GET /storage/v1/b/{bucket}", (request) -> {
final String name = request.getParam("bucket");
if (Strings.hasText(name) == false) {
return newError(RestStatus.INTERNAL_SERVER_ERROR, "bucket name is missing");
}
private static String addressToString(final SocketAddress address) {
final InetSocketAddress inetSocketAddress = (InetSocketAddress) address;
if (inetSocketAddress.getAddress() instanceof Inet6Address) {
return "[" + inetSocketAddress.getHostString() + "]:" + inetSocketAddress.getPort();
} else {
return inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort();
}
}
static class ResponseHandler implements HttpHandler {
private final GoogleCloudStorageTestServer storageServer;
private ResponseHandler(final GoogleCloudStorageTestServer storageServer) {
this.storageServer = storageServer;
}
@Override
public void handle(HttpExchange exchange) throws IOException {
String method = exchange.getRequestMethod();
String path = storageServer.getEndpoint() + exchange.getRequestURI().getRawPath();
String query = exchange.getRequestURI().getRawQuery();
Map<String, List<String>> headers = exchange.getRequestHeaders();
ByteArrayOutputStream out = new ByteArrayOutputStream();
Streams.copy(exchange.getRequestBody(), out);
Response storageResponse = null;
final String userAgent = exchange.getRequestHeaders().getFirst("User-Agent");
if (userAgent != null && userAgent.startsWith("Apache Ant")) {
// This is a request made by the AntFixture, just reply "OK"
storageResponse = new Response(RestStatus.OK, emptyMap(), "text/plain; charset=utf-8", "OK".getBytes(UTF_8));
if (buckets.containsKey(name)) {
return newResponse(RestStatus.OK, emptyMap(), buildBucketResource(name));
} else {
// Otherwise simulate a S3 response
storageResponse = storageServer.handle(method, path, query, headers, out.toByteArray());
return newError(RestStatus.NOT_FOUND, "bucket not found");
}
});
// GET Object
//
// https://cloud.google.com/storage/docs/json_api/v1/objects/get
handlers.insert("GET /storage/v1/b/{bucket}/o/{object}", (request) -> {
final String objectName = request.getParam("object");
if (Strings.hasText(objectName) == false) {
return newError(RestStatus.INTERNAL_SERVER_ERROR, "object name is missing");
}
Map<String, List<String>> responseHeaders = exchange.getResponseHeaders();
responseHeaders.put("Content-Type", singletonList(storageResponse.contentType));
storageResponse.headers.forEach((k, v) -> responseHeaders.put(k, singletonList(v)));
exchange.sendResponseHeaders(storageResponse.status.getStatus(), storageResponse.body.length);
if (storageResponse.body.length > 0) {
exchange.getResponseBody().write(storageResponse.body);
final Bucket bucket = buckets.get(request.getParam("bucket"));
if (bucket == null) {
return newError(RestStatus.NOT_FOUND, "bucket not found");
}
exchange.close();
for (final Map.Entry<String, byte[]> object : bucket.objects.entrySet()) {
if (object.getKey().equals(objectName)) {
return newResponse(RestStatus.OK, emptyMap(), buildObjectResource(bucket.name, objectName, object.getValue()));
}
}
return newError(RestStatus.NOT_FOUND, "object not found");
});
// Delete Object
//
// https://cloud.google.com/storage/docs/json_api/v1/objects/delete
handlers.insert("DELETE /storage/v1/b/{bucket}/o/{object}", (request) -> {
final String objectName = request.getParam("object");
if (Strings.hasText(objectName) == false) {
return newError(RestStatus.INTERNAL_SERVER_ERROR, "object name is missing");
}
final Bucket bucket = buckets.get(request.getParam("bucket"));
if (bucket == null) {
return newError(RestStatus.NOT_FOUND, "bucket not found");
}
final byte[] bytes = bucket.objects.remove(objectName);
if (bytes != null) {
return new Response(RestStatus.NO_CONTENT.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE);
}
return newError(RestStatus.NOT_FOUND, "object not found");
});
// Insert Object (initialization)
//
// https://cloud.google.com/storage/docs/json_api/v1/objects/insert
handlers.insert("POST /upload/storage/v1/b/{bucket}/o", (request) -> {
final String ifGenerationMatch = request.getParam("ifGenerationMatch");
if ("0".equals(ifGenerationMatch) == false) {
return newError(RestStatus.PRECONDITION_FAILED, "object already exist");
}
final String uploadType = request.getParam("uploadType");
if ("resumable".equals(uploadType)) {
final String objectName = request.getParam("name");
if (Strings.hasText(objectName) == false) {
return newError(RestStatus.INTERNAL_SERVER_ERROR, "object name is missing");
}
final Bucket bucket = buckets.get(request.getParam("bucket"));
if (bucket == null) {
return newError(RestStatus.NOT_FOUND, "bucket not found");
}
if (bucket.objects.putIfAbsent(objectName, EMPTY_BYTE) == null) {
final String location = /*endpoint +*/ "/upload/storage/v1/b/" + bucket.name + "/o?uploadType=resumable&upload_id="
+ objectName;
return newResponse(RestStatus.CREATED, singletonMap("Location", location), jsonBuilder());
} else {
return newError(RestStatus.CONFLICT, "object already exist");
}
} else if ("multipart".equals(uploadType)) {
/*
* A multipart/related request body looks like this (note the binary dump inside a text blob! nice!):
* --__END_OF_PART__
* Content-Length: 135
* Content-Type: application/json; charset=UTF-8
* content-transfer-encoding: binary
*
* {"bucket":"bucket_test","crc32c":"7XacHQ==","md5Hash":"fVztGkklMlUamsSmJK7W+w==",
* "name":"tests-KEwE3bU4TuyetBgQIghmUw/master.dat-temp"}
* --__END_OF_PART__
* content-transfer-encoding: binary
*
* KEwE3bU4TuyetBgQIghmUw
* --__END_OF_PART__--
*/
String boundary = "__END_OF_PART__";
// Determine the multipart boundary
final String contentType = request.getContentType();
if ((contentType != null) && contentType.contains("multipart/related; boundary=")) {
boundary = contentType.replace("multipart/related; boundary=", "");
}
InputStream inputStreamBody = new ByteArrayInputStream(request.getBody());
final String contentEncoding = request.getHeader("Content-Encoding");
if (contentEncoding != null) {
if ("gzip".equalsIgnoreCase(contentEncoding)) {
inputStreamBody = new GZIPInputStream(inputStreamBody);
}
}
// Read line by line ?both? parts of the multipart. Decoding headers as
// IS_8859_1 is safe.
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStreamBody, StandardCharsets.ISO_8859_1))) {
String line;
// read first part delimiter
line = reader.readLine();
if ((line == null) || (line.equals("--" + boundary) == false)) {
return newError(RestStatus.INTERNAL_SERVER_ERROR,
"Error parsing multipart request. Does not start with the part delimiter.");
}
final Map<String, List<String>> firstPartHeaders = new HashMap<>();
// Reads the first part's headers, if any
while ((line = reader.readLine()) != null) {
if (line.equals("\r\n") || (line.length() == 0)) {
// end of headers
break;
} else {
final String[] header = line.split(":", 2);
firstPartHeaders.put(header[0], singletonList(header[1]));
}
}
final List<String> firstPartContentTypes = firstPartHeaders.getOrDefault("Content-Type",
firstPartHeaders.get("Content-type"));
if ((firstPartContentTypes == null)
|| (firstPartContentTypes.stream().noneMatch(x -> x.contains("application/json")))) {
return newError(RestStatus.INTERNAL_SERVER_ERROR,
"Error parsing multipart request. Metadata part expected to have the \"application/json\" content type.");
}
// read metadata part, a single line
line = reader.readLine();
final byte[] metadata = line.getBytes(StandardCharsets.ISO_8859_1);
if ((firstPartContentTypes != null) && (firstPartContentTypes.stream().anyMatch((x -> x.contains("charset=utf-8"))))) {
// decode as utf-8
line = new String(metadata, StandardCharsets.UTF_8);
}
final Matcher objectNameMatcher = Pattern.compile("\"name\":\"([^\"]*)\"").matcher(line);
objectNameMatcher.find();
final String objectName = objectNameMatcher.group(1);
final Matcher bucketNameMatcher = Pattern.compile("\"bucket\":\"([^\"]*)\"").matcher(line);
bucketNameMatcher.find();
final String bucketName = bucketNameMatcher.group(1);
// read second part delimiter
line = reader.readLine();
if ((line == null) || (line.equals("--" + boundary) == false)) {
return newError(RestStatus.INTERNAL_SERVER_ERROR,
"Error parsing multipart request. Second part does not start with delimiter. "
+ "Is the metadata multi-line?");
}
final Map<String, List<String>> secondPartHeaders = new HashMap<>();
// Reads the second part's headers, if any
while ((line = reader.readLine()) != null) {
if (line.equals("\r\n") || (line.length() == 0)) {
// end of headers
break;
} else {
final String[] header = line.split(":", 2);
secondPartHeaders.put(header[0], singletonList(header[1]));
}
}
final List<String> secondPartTransferEncoding = secondPartHeaders.getOrDefault("Content-Transfer-Encoding",
secondPartHeaders.get("content-transfer-encoding"));
if ((secondPartTransferEncoding == null)
|| (secondPartTransferEncoding.stream().noneMatch(x -> x.contains("binary")))) {
return newError(RestStatus.INTERNAL_SERVER_ERROR,
"Error parsing multipart request. Data part expected to have the \"binary\" content transfer encoding.");
}
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
int c;
while ((c = reader.read()) != -1) {
// one char to one byte, because of the ISO_8859_1 encoding
baos.write(c);
}
final byte[] temp = baos.toByteArray();
final byte[] trailingEnding = ("\r\n--" + boundary + "--\r\n").getBytes(StandardCharsets.ISO_8859_1);
// check trailing
for (int i = trailingEnding.length - 1; i >= 0; i--) {
if (trailingEnding[i] != temp[(temp.length - trailingEnding.length) + i]) {
return newError(RestStatus.INTERNAL_SERVER_ERROR, "Error parsing multipart request.");
}
}
final Bucket bucket = buckets.get(bucketName);
if (bucket == null) {
return newError(RestStatus.NOT_FOUND, "bucket not found");
}
final byte[] objectData = Arrays.copyOf(temp, temp.length - trailingEnding.length);
if ((objectName != null) && (bucketName != null) && (objectData != null)) {
bucket.objects.put(objectName, objectData);
return new Response(RestStatus.OK.getStatus(), JSON_CONTENT_TYPE, metadata);
} else {
return newError(RestStatus.INTERNAL_SERVER_ERROR, "error parsing multipart request");
}
}
} else {
return newError(RestStatus.INTERNAL_SERVER_ERROR, "upload type must be resumable or multipart");
}
});
// Insert Object (upload)
//
// https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
handlers.insert("PUT /upload/storage/v1/b/{bucket}/o", (request) -> {
final String objectId = request.getParam("upload_id");
if (Strings.hasText(objectId) == false) {
return newError(RestStatus.INTERNAL_SERVER_ERROR, "upload id is missing");
}
final Bucket bucket = buckets.get(request.getParam("bucket"));
if (bucket == null) {
return newError(RestStatus.NOT_FOUND, "bucket not found");
}
if (bucket.objects.containsKey(objectId) == false) {
return newError(RestStatus.NOT_FOUND, "object name not found");
}
bucket.objects.put(objectId, request.getBody());
return newResponse(RestStatus.OK, emptyMap(), buildObjectResource(bucket.name, objectId, request.getBody()));
});
// List Objects
//
// https://cloud.google.com/storage/docs/json_api/v1/objects/list
handlers.insert("GET /storage/v1/b/{bucket}/o", (request) -> {
final Bucket bucket = buckets.get(request.getParam("bucket"));
if (bucket == null) {
return newError(RestStatus.NOT_FOUND, "bucket not found");
}
final XContentBuilder builder = jsonBuilder();
builder.startObject();
builder.field("kind", "storage#objects");
{
builder.startArray("items");
final String prefixParam = request.getParam("prefix");
for (final Map.Entry<String, byte[]> object : bucket.objects.entrySet()) {
if ((prefixParam != null) && (object.getKey().startsWith(prefixParam) == false)) {
continue;
}
buildObjectResource(builder, bucket.name, object.getKey(), object.getValue());
}
builder.endArray();
}
builder.endObject();
return newResponse(RestStatus.OK, emptyMap(), builder);
});
// Download Object
//
// https://cloud.google.com/storage/docs/request-body
handlers.insert("GET /download/storage/v1/b/{bucket}/o/{object}", (request) -> {
final String object = request.getParam("object");
if (Strings.hasText(object) == false) {
return newError(RestStatus.INTERNAL_SERVER_ERROR, "object id is missing");
}
final Bucket bucket = buckets.get(request.getParam("bucket"));
if (bucket == null) {
return newError(RestStatus.NOT_FOUND, "bucket not found");
}
if (bucket.objects.containsKey(object) == false) {
return newError(RestStatus.NOT_FOUND, "object name not found");
}
return new Response(RestStatus.OK.getStatus(), contentType("application/octet-stream"), bucket.objects.get(object));
});
// Batch
//
// https://cloud.google.com/storage/docs/json_api/v1/how-tos/batch
handlers.insert("POST /batch/storage/v1", (request) -> {
final List<Response> batchedResponses = new ArrayList<>();
// A batch request body looks like this:
//
// --__END_OF_PART__
// Content-Length: 71
// Content-Type: application/http
// content-id: 1
// content-transfer-encoding: binary
//
// DELETE https://www.googleapis.com/storage/v1/b/ohifkgu/o/foo%2Ftest HTTP/1.1
//
//
// --__END_OF_PART__
// Content-Length: 71
// Content-Type: application/http
// content-id: 2
// content-transfer-encoding: binary
//
// DELETE https://www.googleapis.com/storage/v1/b/ohifkgu/o/bar%2Ftest HTTP/1.1
//
//
// --__END_OF_PART__--
// Default multipart boundary
String boundary = "__END_OF_PART__";
// Determine the multipart boundary
final String contentType = request.getContentType();
if ((contentType != null) && contentType.contains("multipart/mixed; boundary=")) {
boundary = contentType.replace("multipart/mixed; boundary=", "");
}
long batchedRequests = 0L;
// Read line by line the batched requests
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(
new ByteArrayInputStream(request.getBody()), StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
// Start of a batched request
if (line.equals("--" + boundary)) {
final Map<String, String> batchedHeaders = new HashMap<>();
// Reads the headers, if any
while ((line = reader.readLine()) != null) {
if (line.equals("\r\n") || (line.length() == 0)) {
// end of headers
break;
} else {
final String[] header = line.split(":", 2);
batchedHeaders.put(header[0], header[1]);
}
}
// Reads the method and URL
line = reader.readLine();
final String batchedMethod = line.substring(0, line.indexOf(' '));
final URI batchedUri = URI.create(line.substring(batchedMethod.length() + 1, line.lastIndexOf(' ')));
// Reads the body
line = reader.readLine();
byte[] batchedBody = new byte[0];
if ((line != null) || (line.startsWith("--" + boundary) == false)) {
batchedBody = line.getBytes(StandardCharsets.UTF_8);
}
final Request batchedRequest = new Request(batchedRequests, batchedMethod, batchedUri, batchedHeaders, batchedBody);
batchedRequests = batchedRequests + 1;
// Executes the batched request
final RequestHandler handler =
handlers.retrieve(batchedRequest.getMethod() + " " + batchedRequest.getPath(), batchedRequest.getParameters());
if (handler != null) {
try {
batchedResponses.add(handler.handle(batchedRequest));
} catch (final IOException e) {
batchedResponses.add(newError(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
}
}
}
}
}
// Now we can build the response
final String sep = "--";
final String line = "\r\n";
final StringBuilder builder = new StringBuilder();
for (final Response response : batchedResponses) {
builder.append(sep).append(boundary).append(line);
builder.append("Content-Type: application/http").append(line);
builder.append(line);
builder.append("HTTP/1.1 ")
.append(response.getStatus())
.append(' ')
.append(RestStatus.fromCode(response.getStatus()).toString())
.append(line);
builder.append("Content-Length: ").append(response.getBody().length).append(line);
builder.append("Content-Type: ").append(response.getContentType()).append(line);
response.getHeaders().forEach((k, v) -> builder.append(k).append(": ").append(v).append(line));
builder.append(line);
builder.append(new String(response.getBody(), StandardCharsets.UTF_8)).append(line);
builder.append(line);
}
builder.append(line);
builder.append(sep).append(boundary).append(sep);
final byte[] content = builder.toString().getBytes(StandardCharsets.UTF_8);
return new Response(RestStatus.OK.getStatus(), contentType("multipart/mixed; boundary=" + boundary), content);
});
// Fake refresh of an OAuth2 token
//
handlers.insert("POST /o/oauth2/token", (request) ->
newResponse(RestStatus.OK, emptyMap(), jsonBuilder()
.startObject()
.field("access_token", "unknown")
.field("token_type", "Bearer")
.field("expires_in", 3600)
.endObject())
);
return handlers;
}
/**
* Represents a Storage bucket as if it was created on Google Cloud Storage.
*/
static class Bucket {
/** Bucket name **/
final String name;
/** Blobs contained in the bucket **/
final Map<String, byte[]> objects;
Bucket(final String name) {
this.name = Objects.requireNonNull(name);
this.objects = ConcurrentCollections.newConcurrentMap();
}
}
/**
* Builds a JSON response
*/
private static Response newResponse(final RestStatus status, final Map<String, String> headers, final XContentBuilder xContentBuilder) {
final Map<String, String> responseHeaders = new HashMap<>(JSON_CONTENT_TYPE);
responseHeaders.putAll(headers);
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
BytesReference.bytes(xContentBuilder).writeTo(out);
return new Response(status.getStatus(), responseHeaders, out.toByteArray());
} catch (final IOException e) {
return newError(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage());
}
}
/**
* Storage Error JSON representation
*/
private static Response newError(final RestStatus status, final String message) {
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject()
.startObject("error")
.field("code", status.getStatus())
.field("message", message)
.startArray("errors")
.startObject()
.field("domain", "global")
.field("reason", status.toString())
.field("message", message)
.endObject()
.endArray()
.endObject()
.endObject();
BytesReference.bytes(builder).writeTo(out);
}
return new Response(status.getStatus(), JSON_CONTENT_TYPE, out.toByteArray());
} catch (final IOException e) {
final byte[] bytes = (message != null ? message : "something went wrong").getBytes(StandardCharsets.UTF_8);
return new Response(RestStatus.INTERNAL_SERVER_ERROR.getStatus(), TEXT_PLAIN_CONTENT_TYPE, bytes);
}
}
/**
* Storage Bucket JSON representation as defined in
* https://cloud.google.com/storage/docs/json_api/v1/bucket#resource
*/
private static XContentBuilder buildBucketResource(final String name) throws IOException {
return jsonBuilder().startObject()
.field("kind", "storage#bucket")
.field("name", name)
.field("id", name)
.endObject();
}
/**
* Storage Object JSON representation as defined in
* https://cloud.google.com/storage/docs/json_api/v1/objects#resource
*/
private static XContentBuilder buildObjectResource(final String bucket, final String name, final byte[] bytes) throws IOException {
return buildObjectResource(jsonBuilder(), bucket, name, bytes);
}
/**
* Storage Object JSON representation as defined in
* https://cloud.google.com/storage/docs/json_api/v1/objects#resource
*/
private static XContentBuilder buildObjectResource(final XContentBuilder builder,
final String bucket,
final String name,
final byte[] bytes) throws IOException {
return builder.startObject()
.field("kind", "storage#object")
.field("id", String.join("/", bucket, name))
.field("name", name)
.field("bucket", bucket)
.field("size", String.valueOf(bytes.length))
.endObject();
}
}

View File

@ -1,663 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.gcs;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.path.PathTrie;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.RestUtils;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
/**
* {@link GoogleCloudStorageTestServer} emulates a Google Cloud Storage service through
* a {@link #handle(String, String, String, Map, byte[])} method that provides appropriate
* responses for specific requests like the real Google Cloud platform would do.
* It is largely based on official documentation available at https://cloud.google.com/storage/docs/json_api/v1/.
*/
public class GoogleCloudStorageTestServer {
private static final byte[] EMPTY_BYTE = new byte[0];
/** List of the buckets stored on this test server **/
private final Map<String, Bucket> buckets = ConcurrentCollections.newConcurrentMap();
/** Request handlers for the requests made by the Google Cloud Storage client **/
private final PathTrie<RequestHandler> handlers;
/** Server endpoint **/
private final String endpoint;
/**
* Creates a {@link GoogleCloudStorageTestServer} with a custom endpoint
*/
GoogleCloudStorageTestServer(final String endpoint) {
this.endpoint = Objects.requireNonNull(endpoint, "endpoint must not be null");
this.handlers = defaultHandlers(endpoint, buckets);
}
/** Creates a bucket in the test server **/
void createBucket(final String bucketName) {
buckets.put(bucketName, new Bucket(bucketName));
}
public String getEndpoint() {
return endpoint;
}
/**
* Returns a Google Cloud Storage response for the given request
*
* @param method the HTTP method of the request
* @param path the path of the URL of the request
* @param query the queryString of the URL of request
* @param headers the HTTP headers of the request
* @param body the HTTP request body
* @return a {@link Response}
* @throws IOException if something goes wrong
*/
public Response handle(final String method,
final String path,
final String query,
final Map<String, List<String>> headers,
byte[] body) throws IOException {
final Map<String, String> params = new HashMap<>();
if (query != null) {
RestUtils.decodeQueryString(query, 0, params);
}
final RequestHandler handler = handlers.retrieve(method + " " + path, params);
if (handler != null) {
return handler.execute(params, headers, body);
} else {
return newError(RestStatus.INTERNAL_SERVER_ERROR,
"No handler defined for request [method: " + method + ", path: " + path + "]");
}
}
@FunctionalInterface
interface RequestHandler {
/**
* Simulates the execution of a Storage request and returns a corresponding response.
*
* @param params the request's query string parameters
* @param headers the request's headers
* @param body the request body provided as a byte array
* @return the corresponding response
*
* @throws IOException if something goes wrong
*/
Response execute(Map<String, String> params, Map<String, List<String>> headers, byte[] body) throws IOException;
}
/** Builds the default request handlers **/
private static PathTrie<RequestHandler> defaultHandlers(final String endpoint, final Map<String, Bucket> buckets) {
final PathTrie<RequestHandler> handlers = new PathTrie<>(RestUtils.REST_DECODER);
// GET Bucket
//
// https://cloud.google.com/storage/docs/json_api/v1/buckets/get
handlers.insert("GET " + endpoint + "/storage/v1/b/{bucket}", (params, headers, body) -> {
final String name = params.get("bucket");
if (Strings.hasText(name) == false) {
return newError(RestStatus.INTERNAL_SERVER_ERROR, "bucket name is missing");
}
if (buckets.containsKey(name)) {
return newResponse(RestStatus.OK, emptyMap(), buildBucketResource(name));
} else {
return newError(RestStatus.NOT_FOUND, "bucket not found");
}
});
// GET Object
//
// https://cloud.google.com/storage/docs/json_api/v1/objects/get
handlers.insert("GET " + endpoint + "/storage/v1/b/{bucket}/o/{object}", (params, headers, body) -> {
final String objectName = params.get("object");
if (Strings.hasText(objectName) == false) {
return newError(RestStatus.INTERNAL_SERVER_ERROR, "object name is missing");
}
final Bucket bucket = buckets.get(params.get("bucket"));
if (bucket == null) {
return newError(RestStatus.NOT_FOUND, "bucket not found");
}
for (final Map.Entry<String, byte[]> object : bucket.objects.entrySet()) {
if (object.getKey().equals(objectName)) {
return newResponse(RestStatus.OK, emptyMap(), buildObjectResource(bucket.name, objectName, object.getValue()));
}
}
return newError(RestStatus.NOT_FOUND, "object not found");
});
// Delete Object
//
// https://cloud.google.com/storage/docs/json_api/v1/objects/delete
handlers.insert("DELETE " + endpoint + "/storage/v1/b/{bucket}/o/{object}", (params, headers, body) -> {
final String objectName = params.get("object");
if (Strings.hasText(objectName) == false) {
return newError(RestStatus.INTERNAL_SERVER_ERROR, "object name is missing");
}
final Bucket bucket = buckets.get(params.get("bucket"));
if (bucket == null) {
return newError(RestStatus.NOT_FOUND, "bucket not found");
}
final byte[] bytes = bucket.objects.remove(objectName);
if (bytes != null) {
return new Response(RestStatus.NO_CONTENT, emptyMap(), XContentType.JSON.mediaType(), EMPTY_BYTE);
}
return newError(RestStatus.NOT_FOUND, "object not found");
});
// Insert Object (initialization)
//
// https://cloud.google.com/storage/docs/json_api/v1/objects/insert
handlers.insert("POST " + endpoint + "/upload/storage/v1/b/{bucket}/o", (params, headers, body) -> {
final String uploadType = params.get("uploadType");
if ("resumable".equals(uploadType)) {
final String objectName = params.get("name");
if (Strings.hasText(objectName) == false) {
return newError(RestStatus.INTERNAL_SERVER_ERROR, "object name is missing");
}
final Bucket bucket = buckets.get(params.get("bucket"));
if (bucket == null) {
return newError(RestStatus.NOT_FOUND, "bucket not found");
}
if (bucket.objects.putIfAbsent(objectName, EMPTY_BYTE) == null) {
final String location = endpoint + "/upload/storage/v1/b/" + bucket.name + "/o?uploadType=resumable&upload_id="
+ objectName;
return new Response(RestStatus.CREATED, singletonMap("Location", location), XContentType.JSON.mediaType(), EMPTY_BYTE);
} else {
return newError(RestStatus.CONFLICT, "object already exist");
}
} else if ("multipart".equals(uploadType)) {
/*
* A multipart/related request body looks like this (note the binary dump inside a text blob! nice!):
* --__END_OF_PART__
* Content-Length: 135
* Content-Type: application/json; charset=UTF-8
* content-transfer-encoding: binary
*
* {"bucket":"bucket_test","crc32c":"7XacHQ==","md5Hash":"fVztGkklMlUamsSmJK7W+w==",
* "name":"tests-KEwE3bU4TuyetBgQIghmUw/master.dat-temp"}
* --__END_OF_PART__
* content-transfer-encoding: binary
*
* KEwE3bU4TuyetBgQIghmUw
* --__END_OF_PART__--
*/
String boundary = "__END_OF_PART__";
// Determine the multipart boundary
final List<String> contentTypes = headers.getOrDefault("Content-Type", headers.get("Content-type"));
if (contentTypes != null) {
final String contentType = contentTypes.get(0);
if ((contentType != null) && contentType.contains("multipart/related; boundary=")) {
boundary = contentType.replace("multipart/related; boundary=", "");
}
}
InputStream inputStreamBody = new ByteArrayInputStream(body);
final List<String> contentEncodings = headers.getOrDefault("Content-Encoding", headers.get("Content-encoding"));
if (contentEncodings != null) {
if (contentEncodings.stream().anyMatch(x -> "gzip".equalsIgnoreCase(x))) {
inputStreamBody = new GZIPInputStream(inputStreamBody);
}
}
// Read line by line ?both? parts of the multipart. Decoding headers as
// IS_8859_1 is safe.
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStreamBody, StandardCharsets.ISO_8859_1))) {
String line;
// read first part delimiter
line = reader.readLine();
if ((line == null) || (line.equals("--" + boundary) == false)) {
return newError(RestStatus.INTERNAL_SERVER_ERROR,
"Error parsing multipart request. Does not start with the part delimiter.");
}
final Map<String, List<String>> firstPartHeaders = new HashMap<>();
// Reads the first part's headers, if any
while ((line = reader.readLine()) != null) {
if (line.equals("\r\n") || (line.length() == 0)) {
// end of headers
break;
} else {
final String[] header = line.split(":", 2);
firstPartHeaders.put(header[0], singletonList(header[1]));
}
}
final List<String> firstPartContentTypes = firstPartHeaders.getOrDefault("Content-Type",
firstPartHeaders.get("Content-type"));
if ((firstPartContentTypes == null)
|| (firstPartContentTypes.stream().noneMatch(x -> x.contains("application/json")))) {
return newError(RestStatus.INTERNAL_SERVER_ERROR,
"Error parsing multipart request. Metadata part expected to have the \"application/json\" content type.");
}
// read metadata part, a single line
line = reader.readLine();
final byte[] metadata = line.getBytes(StandardCharsets.ISO_8859_1);
if ((firstPartContentTypes != null) && (firstPartContentTypes.stream().anyMatch((x -> x.contains("charset=utf-8"))))) {
// decode as utf-8
line = new String(metadata, StandardCharsets.UTF_8);
}
final Matcher objectNameMatcher = Pattern.compile("\"name\":\"([^\"]*)\"").matcher(line);
objectNameMatcher.find();
final String objectName = objectNameMatcher.group(1);
final Matcher bucketNameMatcher = Pattern.compile("\"bucket\":\"([^\"]*)\"").matcher(line);
bucketNameMatcher.find();
final String bucketName = bucketNameMatcher.group(1);
// read second part delimiter
line = reader.readLine();
if ((line == null) || (line.equals("--" + boundary) == false)) {
return newError(RestStatus.INTERNAL_SERVER_ERROR,
"Error parsing multipart request. Second part does not start with delimiter. "
+ "Is the metadata multi-line?");
}
final Map<String, List<String>> secondPartHeaders = new HashMap<>();
// Reads the second part's headers, if any
while ((line = reader.readLine()) != null) {
if (line.equals("\r\n") || (line.length() == 0)) {
// end of headers
break;
} else {
final String[] header = line.split(":", 2);
secondPartHeaders.put(header[0], singletonList(header[1]));
}
}
final List<String> secondPartTransferEncoding = secondPartHeaders.getOrDefault("Content-Transfer-Encoding",
secondPartHeaders.get("content-transfer-encoding"));
if ((secondPartTransferEncoding == null)
|| (secondPartTransferEncoding.stream().noneMatch(x -> x.contains("binary")))) {
return newError(RestStatus.INTERNAL_SERVER_ERROR,
"Error parsing multipart request. Data part expected to have the \"binary\" content transfer encoding.");
}
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
int c;
while ((c = reader.read()) != -1) {
// one char to one byte, because of the ISO_8859_1 encoding
baos.write(c);
}
final byte[] temp = baos.toByteArray();
final byte[] trailingEnding = ("\r\n--" + boundary + "--\r\n").getBytes(StandardCharsets.ISO_8859_1);
// check trailing
for (int i = trailingEnding.length - 1; i >= 0; i--) {
if (trailingEnding[i] != temp[(temp.length - trailingEnding.length) + i]) {
return newError(RestStatus.INTERNAL_SERVER_ERROR, "Error parsing multipart request.");
}
}
final Bucket bucket = buckets.get(bucketName);
if (bucket == null) {
return newError(RestStatus.NOT_FOUND, "bucket not found");
}
final byte[] objectData = Arrays.copyOf(temp, temp.length - trailingEnding.length);
if ((objectName != null) && (bucketName != null) && (objectData != null)) {
bucket.objects.put(objectName, objectData);
return new Response(RestStatus.OK, emptyMap(), XContentType.JSON.mediaType(), metadata);
} else {
return newError(RestStatus.INTERNAL_SERVER_ERROR, "error parsing multipart request");
}
}
} else {
return newError(RestStatus.INTERNAL_SERVER_ERROR, "upload type must be resumable or multipart");
}
});
// Insert Object (upload)
//
// https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
handlers.insert("PUT " + endpoint + "/upload/storage/v1/b/{bucket}/o", (params, headers, body) -> {
final String objectId = params.get("upload_id");
if (Strings.hasText(objectId) == false) {
return newError(RestStatus.INTERNAL_SERVER_ERROR, "upload id is missing");
}
final Bucket bucket = buckets.get(params.get("bucket"));
if (bucket == null) {
return newError(RestStatus.NOT_FOUND, "bucket not found");
}
if (bucket.objects.containsKey(objectId) == false) {
return newError(RestStatus.NOT_FOUND, "object name not found");
}
bucket.objects.put(objectId, body);
return newResponse(RestStatus.OK, emptyMap(), buildObjectResource(bucket.name, objectId, body));
});
// List Objects
//
// https://cloud.google.com/storage/docs/json_api/v1/objects/list
handlers.insert("GET " + endpoint + "/storage/v1/b/{bucket}/o", (params, headers, body) -> {
final Bucket bucket = buckets.get(params.get("bucket"));
if (bucket == null) {
return newError(RestStatus.NOT_FOUND, "bucket not found");
}
final XContentBuilder builder = jsonBuilder();
builder.startObject();
builder.field("kind", "storage#objects");
{
builder.startArray("items");
final String prefixParam = params.get("prefix");
for (final Map.Entry<String, byte[]> object : bucket.objects.entrySet()) {
if ((prefixParam != null) && (object.getKey().startsWith(prefixParam) == false)) {
continue;
}
buildObjectResource(builder, bucket.name, object.getKey(), object.getValue());
}
builder.endArray();
}
builder.endObject();
return newResponse(RestStatus.OK, emptyMap(), builder);
});
// Download Object
//
// https://cloud.google.com/storage/docs/request-body
handlers.insert("GET " + endpoint + "/download/storage/v1/b/{bucket}/o/{object}", (params, headers, body) -> {
final String object = params.get("object");
if (Strings.hasText(object) == false) {
return newError(RestStatus.INTERNAL_SERVER_ERROR, "object id is missing");
}
final Bucket bucket = buckets.get(params.get("bucket"));
if (bucket == null) {
return newError(RestStatus.NOT_FOUND, "bucket not found");
}
if (bucket.objects.containsKey(object) == false) {
return newError(RestStatus.NOT_FOUND, "object name not found");
}
return new Response(RestStatus.OK, emptyMap(), "application/octet-stream", bucket.objects.get(object));
});
// Batch
//
// https://cloud.google.com/storage/docs/json_api/v1/how-tos/batch
handlers.insert("POST " + endpoint + "/batch/storage/v1", (params, headers, body) -> {
final List<Response> batchedResponses = new ArrayList<>();
// A batch request body looks like this:
//
// --__END_OF_PART__
// Content-Length: 71
// Content-Type: application/http
// content-id: 1
// content-transfer-encoding: binary
//
// DELETE https://www.googleapis.com/storage/v1/b/ohifkgu/o/foo%2Ftest HTTP/1.1
//
//
// --__END_OF_PART__
// Content-Length: 71
// Content-Type: application/http
// content-id: 2
// content-transfer-encoding: binary
//
// DELETE https://www.googleapis.com/storage/v1/b/ohifkgu/o/bar%2Ftest HTTP/1.1
//
//
// --__END_OF_PART__--
// Default multipart boundary
String boundary = "__END_OF_PART__";
// Determine the multipart boundary
final List<String> contentTypes = headers.getOrDefault("Content-Type", headers.get("Content-type"));
if (contentTypes != null) {
final String contentType = contentTypes.get(0);
if ((contentType != null) && contentType.contains("multipart/mixed; boundary=")) {
boundary = contentType.replace("multipart/mixed; boundary=", "");
}
}
// Read line by line the batched requests
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(
new ByteArrayInputStream(body), StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
// Start of a batched request
if (line.equals("--" + boundary)) {
final Map<String, List<String>> batchedHeaders = new HashMap<>();
// Reads the headers, if any
while ((line = reader.readLine()) != null) {
if (line.equals("\r\n") || (line.length() == 0)) {
// end of headers
break;
} else {
final String[] header = line.split(":", 2);
batchedHeaders.put(header[0], singletonList(header[1]));
}
}
// Reads the method and URL
line = reader.readLine();
final String batchedUrl = line.substring(0, line.lastIndexOf(' '));
final Map<String, String> batchedParams = new HashMap<>();
final int questionMark = batchedUrl.indexOf('?');
if (questionMark != -1) {
RestUtils.decodeQueryString(batchedUrl.substring(questionMark + 1), 0, batchedParams);
}
// Reads the body
line = reader.readLine();
byte[] batchedBody = new byte[0];
if ((line != null) || (line.startsWith("--" + boundary) == false)) {
batchedBody = line.getBytes(StandardCharsets.UTF_8);
}
// Executes the batched request
final RequestHandler handler = handlers.retrieve(batchedUrl, batchedParams);
if (handler != null) {
try {
batchedResponses.add(handler.execute(batchedParams, batchedHeaders, batchedBody));
} catch (final IOException e) {
batchedResponses.add(newError(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
}
}
}
}
}
// Now we can build the response
final String sep = "--";
final String line = "\r\n";
final StringBuilder builder = new StringBuilder();
for (final Response response : batchedResponses) {
builder.append(sep).append(boundary).append(line);
builder.append("Content-Type: application/http").append(line);
builder.append(line);
builder.append("HTTP/1.1 ")
.append(response.status.getStatus())
.append(' ')
.append(response.status.toString())
.append(line);
builder.append("Content-Length: ").append(response.body.length).append(line);
builder.append("Content-Type: ").append(response.contentType).append(line);
response.headers.forEach((k, v) -> builder.append(k).append(": ").append(v).append(line));
builder.append(line);
builder.append(new String(response.body, StandardCharsets.UTF_8)).append(line);
builder.append(line);
}
builder.append(line);
builder.append(sep).append(boundary).append(sep);
final byte[] content = builder.toString().getBytes(StandardCharsets.UTF_8);
return new Response(RestStatus.OK, emptyMap(), "multipart/mixed; boundary=" + boundary, content);
});
// Fake refresh of an OAuth2 token
//
handlers.insert("POST " + endpoint + "/o/oauth2/token", (url, params, req) ->
newResponse(RestStatus.OK, emptyMap(), jsonBuilder()
.startObject()
.field("access_token", "unknown")
.field("token_type", "Bearer")
.field("expires_in", 3600)
.endObject())
);
return handlers;
}
/**
* Represents a Storage bucket as if it was created on Google Cloud Storage.
*/
static class Bucket {
/** Bucket name **/
final String name;
/** Blobs contained in the bucket **/
final Map<String, byte[]> objects;
Bucket(final String name) {
this.name = Objects.requireNonNull(name);
this.objects = ConcurrentCollections.newConcurrentMap();
}
}
/**
* Represents a Storage HTTP Response.
*/
static class Response {
final RestStatus status;
final Map<String, String> headers;
final String contentType;
final byte[] body;
Response(final RestStatus status, final Map<String, String> headers, final String contentType, final byte[] body) {
this.status = Objects.requireNonNull(status);
this.headers = Objects.requireNonNull(headers);
this.contentType = Objects.requireNonNull(contentType);
this.body = Objects.requireNonNull(body);
}
}
/**
* Builds a JSON response
*/
private static Response newResponse(final RestStatus status, final Map<String, String> headers, final XContentBuilder xContentBuilder) {
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
BytesReference.bytes(xContentBuilder).writeTo(out);
return new Response(status, headers, XContentType.JSON.mediaType(), out.toByteArray());
} catch (final IOException e) {
return newError(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage());
}
}
/**
* Storage Error JSON representation
*/
private static Response newError(final RestStatus status, final String message) {
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject()
.startObject("error")
.field("code", status.getStatus())
.field("message", message)
.startArray("errors")
.startObject()
.field("domain", "global")
.field("reason", status.toString())
.field("message", message)
.endObject()
.endArray()
.endObject()
.endObject();
BytesReference.bytes(builder).writeTo(out);
}
return new Response(status, emptyMap(), XContentType.JSON.mediaType(), out.toByteArray());
} catch (final IOException e) {
final byte[] bytes = (message != null ? message : "something went wrong").getBytes(StandardCharsets.UTF_8);
return new Response(RestStatus.INTERNAL_SERVER_ERROR, emptyMap(), " text/plain", bytes);
}
}
/**
* Storage Bucket JSON representation as defined in
* https://cloud.google.com/storage/docs/json_api/v1/bucket#resource
*/
private static XContentBuilder buildBucketResource(final String name) throws IOException {
return jsonBuilder().startObject()
.field("kind", "storage#bucket")
.field("name", name)
.field("id", name)
.endObject();
}
/**
* Storage Object JSON representation as defined in
* https://cloud.google.com/storage/docs/json_api/v1/objects#resource
*/
private static XContentBuilder buildObjectResource(final String bucket, final String name, final byte[] bytes) throws IOException {
return buildObjectResource(jsonBuilder(), bucket, name, bytes);
}
/**
* Storage Object JSON representation as defined in
* https://cloud.google.com/storage/docs/json_api/v1/objects#resource
*/
private static XContentBuilder buildObjectResource(final XContentBuilder builder,
final String bucket,
final String name,
final byte[] bytes) throws IOException {
return builder.startObject()
.field("kind", "storage#object")
.field("id", String.join("/", bucket, name))
.field("name", name)
.field("bucket", bucket)
.field("size", String.valueOf(bytes.length))
.endObject();
}
}

View File

@ -31,12 +31,6 @@ integTestCluster {
plugin ':plugins:repository-s3'
}
forbiddenApisTest {
// we are using jdk-internal instead of jdk-non-portable to allow for com.sun.net.httpserver.* usage
bundledSignatures -= 'jdk-non-portable'
bundledSignatures += 'jdk-internal'
}
boolean useFixture = false
String s3AccessKey = System.getenv("amazon_s3_access_key")
@ -54,7 +48,7 @@ if (!s3AccessKey && !s3SecretKey && !s3Bucket && !s3BasePath) {
/** A task to start the AmazonS3Fixture which emulates a S3 service **/
task s3Fixture(type: AntFixture) {
dependsOn compileTestJava
dependsOn testClasses
env 'CLASSPATH', "${ -> project.sourceSets.test.runtimeClasspath.asPath }"
executable = new File(project.runtimeJavaHome, 'bin/java')
args 'org.elasticsearch.repositories.s3.AmazonS3Fixture', baseDir, s3Bucket
@ -64,6 +58,7 @@ Map<String, Object> expansions = [
'bucket': s3Bucket,
'base_path': s3BasePath
]
processTestResources {
inputs.properties(expansions)
MavenFilteringHack.filter(it, expansions)

View File

@ -18,132 +18,423 @@
*/
package org.elasticsearch.repositories.s3;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.test.fixture.AbstractHttpFixture;
import com.amazonaws.util.DateUtils;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.mocksocket.MockHttpServer;
import org.elasticsearch.repositories.s3.AmazonS3TestServer.Response;
import org.elasticsearch.common.path.PathTrie;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.RestUtils;
import java.io.ByteArrayOutputStream;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
/**
* {@link AmazonS3Fixture} is a fixture that emulates a S3 service.
* <p>
* It starts an asynchronous socket server that binds to a random local port. The server parses
* HTTP requests and uses a {@link AmazonS3TestServer} to handle them before returning
* them to the client as HTTP responses.
* {@link AmazonS3Fixture} emulates an AWS S3 service
* .
* he implementation is based on official documentation available at https://docs.aws.amazon.com/AmazonS3/latest/API/.
*/
public class AmazonS3Fixture {
public class AmazonS3Fixture extends AbstractHttpFixture {
public static void main(String[] args) throws Exception {
/** List of the buckets stored on this test server **/
private final Map<String, Bucket> buckets = ConcurrentCollections.newConcurrentMap();
/** Request handlers for the requests made by the S3 client **/
private final PathTrie<RequestHandler> handlers;
/**
* Creates a {@link AmazonS3Fixture}
*/
private AmazonS3Fixture(final String workingDir, final String bucket) {
super(workingDir);
this.buckets.put(bucket, new Bucket(bucket));
this.handlers = defaultHandlers(buckets);
}
@Override
protected Response handle(final Request request) throws IOException {
final RequestHandler handler = handlers.retrieve(request.getMethod() + " " + request.getPath(), request.getParameters());
if (handler != null) {
final String authorization = request.getHeader("Authorization");
if (authorization == null
|| (authorization.length() > 0 && authorization.contains("s3_integration_test_access_key") == false)) {
return newError(request.getId(), RestStatus.FORBIDDEN, "AccessDenied", "Access Denied", "");
}
return handler.handle(request);
}
return null;
}
public static void main(final String[] args) throws Exception {
if (args == null || args.length != 2) {
throw new IllegalArgumentException("AmazonS3Fixture <working directory> <bucket>");
}
final InetSocketAddress socketAddress = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
final HttpServer httpServer = MockHttpServer.createHttp(socketAddress, 0);
try {
final Path workingDirectory = workingDir(args[0]);
/// Writes the PID of the current Java process in a `pid` file located in the working directory
writeFile(workingDirectory, "pid", ManagementFactory.getRuntimeMXBean().getName().split("@")[0]);
final String addressAndPort = addressToString(httpServer.getAddress());
// Writes the address and port of the http server in a `ports` file located in the working directory
writeFile(workingDirectory, "ports", addressAndPort);
// Emulates S3
final String storageUrl = "http://" + addressAndPort;
final AmazonS3TestServer storageTestServer = new AmazonS3TestServer(storageUrl);
storageTestServer.createBucket(args[1]);
httpServer.createContext("/", new ResponseHandler(storageTestServer));
httpServer.start();
// Wait to be killed
Thread.sleep(Long.MAX_VALUE);
} finally {
httpServer.stop(0);
}
final AmazonS3Fixture fixture = new AmazonS3Fixture(args[0], args[1]);
fixture.listen();
}
@SuppressForbidden(reason = "Paths#get is fine - we don't have environment here")
private static Path workingDir(final String dir) {
return Paths.get(dir);
}
/** Builds the default request handlers **/
private static PathTrie<RequestHandler> defaultHandlers(final Map<String, Bucket> buckets) {
final PathTrie<RequestHandler> handlers = new PathTrie<>(RestUtils.REST_DECODER);
private static void writeFile(final Path dir, final String fileName, final String content) throws IOException {
final Path tempPidFile = Files.createTempFile(dir, null, null);
Files.write(tempPidFile, singleton(content));
Files.move(tempPidFile, dir.resolve(fileName), StandardCopyOption.ATOMIC_MOVE);
}
// HEAD Object
//
// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectHEAD.html
objectsPaths("HEAD /{bucket}").forEach(path ->
handlers.insert(path, (request) -> {
final String bucketName = request.getParam("bucket");
private static String addressToString(final SocketAddress address) {
final InetSocketAddress inetSocketAddress = (InetSocketAddress) address;
if (inetSocketAddress.getAddress() instanceof Inet6Address) {
return "[" + inetSocketAddress.getHostString() + "]:" + inetSocketAddress.getPort();
} else {
return inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort();
}
}
final Bucket bucket = buckets.get(bucketName);
if (bucket == null) {
return newBucketNotFoundError(request.getId(), bucketName);
}
static class ResponseHandler implements HttpHandler {
final String objectName = objectName(request.getParameters());
for (Map.Entry<String, byte[]> object : bucket.objects.entrySet()) {
if (object.getKey().equals(objectName)) {
return new Response(RestStatus.OK.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE);
}
}
return newObjectNotFoundError(request.getId(), objectName);
})
);
private final AmazonS3TestServer storageServer;
// PUT Object
//
// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html
objectsPaths("PUT /{bucket}").forEach(path ->
handlers.insert(path, (request) -> {
final String destBucketName = request.getParam("bucket");
private ResponseHandler(final AmazonS3TestServer storageServer) {
this.storageServer = storageServer;
}
final Bucket destBucket = buckets.get(destBucketName);
if (destBucket == null) {
return newBucketNotFoundError(request.getId(), destBucketName);
}
@Override
public void handle(HttpExchange exchange) throws IOException {
String method = exchange.getRequestMethod();
String path = storageServer.getEndpoint() + exchange.getRequestURI().getRawPath();
String query = exchange.getRequestURI().getRawQuery();
Map<String, List<String>> headers = exchange.getRequestHeaders();
ByteArrayOutputStream out = new ByteArrayOutputStream();
Streams.copy(exchange.getRequestBody(), out);
final String destObjectName = objectName(request.getParameters());
Response storageResponse = null;
// This is a chunked upload request. We should have the header "Content-Encoding : aws-chunked,gzip"
// to detect it but it seems that the AWS SDK does not follow the S3 guidelines here.
//
// See https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html
//
String headerDecodedContentLength = request.getHeader("X-amz-decoded-content-length");
if (headerDecodedContentLength != null) {
int contentLength = Integer.valueOf(headerDecodedContentLength);
final String userAgent = exchange.getRequestHeaders().getFirst("User-Agent");
if (userAgent != null && userAgent.startsWith("Apache Ant")) {
// This is a request made by the AntFixture, just reply "OK"
storageResponse = new Response(RestStatus.OK, emptyMap(), "text/plain; charset=utf-8", "OK".getBytes(UTF_8));
// Chunked requests have a payload like this:
//
// 105;chunk-signature=01d0de6be013115a7f4794db8c4b9414e6ec71262cc33ae562a71f2eaed1efe8
// ... bytes of data ....
// 0;chunk-signature=f890420b1974c5469aaf2112e9e6f2e0334929fd45909e03c0eff7a84124f6a4
//
try (BufferedInputStream inputStream = new BufferedInputStream(new ByteArrayInputStream(request.getBody()))) {
int b;
// Moves to the end of the first signature line
while ((b = inputStream.read()) != -1) {
if (b == '\n') {
break;
}
}
final byte[] bytes = new byte[contentLength];
inputStream.read(bytes, 0, contentLength);
destBucket.objects.put(destObjectName, bytes);
return new Response(RestStatus.OK.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE);
}
}
return newInternalError(request.getId(), "Something is wrong with this PUT request");
})
);
// DELETE Object
//
// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectDELETE.html
objectsPaths("DELETE /{bucket}").forEach(path ->
handlers.insert(path, (request) -> {
final String bucketName = request.getParam("bucket");
final Bucket bucket = buckets.get(bucketName);
if (bucket == null) {
return newBucketNotFoundError(request.getId(), bucketName);
}
final String objectName = objectName(request.getParameters());
if (bucket.objects.remove(objectName) != null) {
return new Response(RestStatus.OK.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE);
}
return newObjectNotFoundError(request.getId(), objectName);
})
);
// GET Object
//
// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html
objectsPaths("GET /{bucket}").forEach(path ->
handlers.insert(path, (request) -> {
final String bucketName = request.getParam("bucket");
final Bucket bucket = buckets.get(bucketName);
if (bucket == null) {
return newBucketNotFoundError(request.getId(), bucketName);
}
final String objectName = objectName(request.getParameters());
if (bucket.objects.containsKey(objectName)) {
return new Response(RestStatus.OK.getStatus(), contentType("application/octet-stream"), bucket.objects.get(objectName));
}
return newObjectNotFoundError(request.getId(), objectName);
})
);
// HEAD Bucket
//
// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketHEAD.html
handlers.insert("HEAD /{bucket}", (request) -> {
String bucket = request.getParam("bucket");
if (Strings.hasText(bucket) && buckets.containsKey(bucket)) {
return new Response(RestStatus.OK.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE);
} else {
// Otherwise simulate a S3 response
storageResponse = storageServer.handle(method, path, query, headers, out.toByteArray());
return newBucketNotFoundError(request.getId(), bucket);
}
});
// GET Bucket (List Objects) Version 1
//
// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGET.html
handlers.insert("GET /{bucket}/", (request) -> {
final String bucketName = request.getParam("bucket");
final Bucket bucket = buckets.get(bucketName);
if (bucket == null) {
return newBucketNotFoundError(request.getId(), bucketName);
}
Map<String, List<String>> responseHeaders = exchange.getResponseHeaders();
responseHeaders.put("Content-Type", singletonList(storageResponse.contentType));
storageResponse.headers.forEach((k, v) -> responseHeaders.put(k, singletonList(v)));
exchange.sendResponseHeaders(storageResponse.status.getStatus(), storageResponse.body.length);
if (storageResponse.body.length > 0) {
exchange.getResponseBody().write(storageResponse.body);
String prefix = request.getParam("prefix");
if (prefix == null) {
prefix = request.getHeader("Prefix");
}
exchange.close();
return newListBucketResultResponse(request.getId(), bucket, prefix);
});
// Delete Multiple Objects
//
// https://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html
handlers.insert("POST /", (request) -> {
final List<String> deletes = new ArrayList<>();
final List<String> errors = new ArrayList<>();
if (request.getParam("delete") != null) {
// The request body is something like:
// <Delete><Object><Key>...</Key></Object><Object><Key>...</Key></Object></Delete>
String requestBody = Streams.copyToString(new InputStreamReader(new ByteArrayInputStream(request.getBody()), UTF_8));
if (requestBody.startsWith("<Delete>")) {
final String startMarker = "<Key>";
final String endMarker = "</Key>";
int offset = 0;
while (offset != -1) {
offset = requestBody.indexOf(startMarker, offset);
if (offset > 0) {
int closingOffset = requestBody.indexOf(endMarker, offset);
if (closingOffset != -1) {
offset = offset + startMarker.length();
final String objectName = requestBody.substring(offset, closingOffset);
boolean found = false;
for (Bucket bucket : buckets.values()) {
if (bucket.objects.remove(objectName) != null) {
found = true;
}
}
if (found) {
deletes.add(objectName);
} else {
errors.add(objectName);
}
}
}
}
return newDeleteResultResponse(request.getId(), deletes, errors);
}
}
return newInternalError(request.getId(), "Something is wrong with this POST multiple deletes request");
});
return handlers;
}
/**
* Represents a S3 bucket.
*/
static class Bucket {
/** Bucket name **/
final String name;
/** Blobs contained in the bucket **/
final Map<String, byte[]> objects;
Bucket(final String name) {
this.name = Objects.requireNonNull(name);
this.objects = ConcurrentCollections.newConcurrentMap();
}
}
/**
* Decline a path like "http://host:port/{bucket}" into 10 derived paths like:
* - http://host:port/{bucket}/{path0}
* - http://host:port/{bucket}/{path0}/{path1}
* - http://host:port/{bucket}/{path0}/{path1}/{path2}
* - etc
*/
private static List<String> objectsPaths(final String path) {
final List<String> paths = new ArrayList<>();
String p = path;
for (int i = 0; i < 10; i++) {
p = p + "/{path" + i + "}";
paths.add(p);
}
return paths;
}
/**
* Retrieves the object name from all derives paths named {pathX} where 0 <= X < 10.
*
* This is the counterpart of {@link #objectsPaths(String)}
*/
private static String objectName(final Map<String, String> params) {
final StringBuilder name = new StringBuilder();
for (int i = 0; i < 10; i++) {
String value = params.getOrDefault("path" + i, null);
if (value != null) {
if (name.length() > 0) {
name.append('/');
}
name.append(value);
}
}
return name.toString();
}
/**
* S3 ListBucketResult Response
*/
private static Response newListBucketResultResponse(final long requestId, final Bucket bucket, final String prefix) {
final String id = Long.toString(requestId);
final StringBuilder response = new StringBuilder();
response.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
response.append("<ListBucketResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">");
response.append("<Prefix>");
if (prefix != null) {
response.append(prefix);
}
response.append("</Prefix>");
response.append("<Marker/>");
response.append("<MaxKeys>1000</MaxKeys>");
response.append("<IsTruncated>false</IsTruncated>");
int count = 0;
for (Map.Entry<String, byte[]> object : bucket.objects.entrySet()) {
String objectName = object.getKey();
if (prefix == null || objectName.startsWith(prefix)) {
response.append("<Contents>");
response.append("<Key>").append(objectName).append("</Key>");
response.append("<LastModified>").append(DateUtils.formatISO8601Date(new Date())).append("</LastModified>");
response.append("<ETag>&quot;").append(count++).append("&quot;</ETag>");
response.append("<Size>").append(object.getValue().length).append("</Size>");
response.append("</Contents>");
}
}
response.append("</ListBucketResult>");
final Map<String, String> headers = new HashMap<>(contentType("application/xml"));
headers.put("x-amz-request-id", id);
return new Response(RestStatus.OK.getStatus(), headers, response.toString().getBytes(UTF_8));
}
/**
* S3 DeleteResult Response
*/
private static Response newDeleteResultResponse(final long requestId,
final List<String> deletedObjects,
final List<String> ignoredObjects) {
final String id = Long.toString(requestId);
final StringBuilder response = new StringBuilder();
response.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
response.append("<DeleteResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">");
for (String deletedObject : deletedObjects) {
response.append("<Deleted>");
response.append("<Key>").append(deletedObject).append("</Key>");
response.append("</Deleted>");
}
for (String ignoredObject : ignoredObjects) {
response.append("<Error>");
response.append("<Key>").append(ignoredObject).append("</Key>");
response.append("<Code>NoSuchKey</Code>");
response.append("</Error>");
}
response.append("</DeleteResult>");
final Map<String, String> headers = new HashMap<>(contentType("application/xml"));
headers.put("x-amz-request-id", id);
return new Response(RestStatus.OK.getStatus(), headers, response.toString().getBytes(UTF_8));
}
private static Response newBucketNotFoundError(final long requestId, final String bucket) {
return newError(requestId, RestStatus.NOT_FOUND, "NoSuchBucket", "The specified bucket does not exist", bucket);
}
private static Response newObjectNotFoundError(final long requestId, final String object) {
return newError(requestId, RestStatus.NOT_FOUND, "NoSuchKey", "The specified key does not exist", object);
}
private static Response newInternalError(final long requestId, final String resource) {
return newError(requestId, RestStatus.INTERNAL_SERVER_ERROR, "InternalError", "We encountered an internal error", resource);
}
/**
* S3 Error
*
* https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
*/
private static Response newError(final long requestId,
final RestStatus status,
final String code,
final String message,
final String resource) {
final String id = Long.toString(requestId);
final StringBuilder response = new StringBuilder();
response.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
response.append("<Error>");
response.append("<Code>").append(code).append("</Code>");
response.append("<Message>").append(message).append("</Message>");
response.append("<Resource>").append(resource).append("</Resource>");
response.append("<RequestId>").append(id).append("</RequestId>");
response.append("</Error>");
final Map<String, String> headers = new HashMap<>(contentType("application/xml"));
headers.put("x-amz-request-id", id);
return new Response(status.getStatus(), headers, response.toString().getBytes(UTF_8));
}
}

View File

@ -1,500 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.s3;
import com.amazonaws.util.DateUtils;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.path.PathTrie;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.RestUtils;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
/**
* {@link AmazonS3TestServer} emulates a S3 service through a {@link #handle(String, String, String, Map, byte[])}
* method that provides appropriate responses for specific requests like the real S3 platform would do.
* It is largely based on official documentation available at https://docs.aws.amazon.com/AmazonS3/latest/API/.
*/
public class AmazonS3TestServer {
private static byte[] EMPTY_BYTE = new byte[0];
/** List of the buckets stored on this test server **/
private final Map<String, Bucket> buckets = ConcurrentCollections.newConcurrentMap();
/** Request handlers for the requests made by the S3 client **/
private final PathTrie<RequestHandler> handlers;
/** Server endpoint **/
private final String endpoint;
/** Increments for the requests ids **/
private final AtomicLong requests = new AtomicLong(0);
/**
* Creates a {@link AmazonS3TestServer} with a custom endpoint
*/
AmazonS3TestServer(final String endpoint) {
this.endpoint = Objects.requireNonNull(endpoint, "endpoint must not be null");
this.handlers = defaultHandlers(endpoint, buckets);
}
/** Creates a bucket in the test server **/
void createBucket(final String bucketName) {
buckets.put(bucketName, new Bucket(bucketName));
}
public String getEndpoint() {
return endpoint;
}
/**
* Returns a response for the given request
*
* @param method the HTTP method of the request
* @param path the path of the URL of the request
* @param query the queryString of the URL of request
* @param headers the HTTP headers of the request
* @param body the HTTP request body
* @return a {@link Response}
* @throws IOException if something goes wrong
*/
public Response handle(final String method,
final String path,
final String query,
final Map<String, List<String>> headers,
byte[] body) throws IOException {
final long requestId = requests.incrementAndGet();
final Map<String, String> params = new HashMap<>();
if (query != null) {
RestUtils.decodeQueryString(query, 0, params);
}
final List<String> authorizations = headers.get("Authorization");
if (authorizations == null
|| (authorizations.isEmpty() == false & authorizations.get(0).contains("s3_integration_test_access_key") == false)) {
return newError(requestId, RestStatus.FORBIDDEN, "AccessDenied", "Access Denied", "");
}
final RequestHandler handler = handlers.retrieve(method + " " + path, params);
if (handler != null) {
return handler.execute(params, headers, body, requestId);
} else {
return newInternalError(requestId, "No handler defined for request [method: " + method + ", path: " + path + "]");
}
}
@FunctionalInterface
interface RequestHandler {
/**
* Simulates the execution of a S3 request and returns a corresponding response.
*
* @param params the request's query string parameters
* @param headers the request's headers
* @param body the request body provided as a byte array
* @param requestId a unique id for the incoming request
* @return the corresponding response
*
* @throws IOException if something goes wrong
*/
Response execute(Map<String, String> params, Map<String, List<String>> headers, byte[] body, long requestId) throws IOException;
}
/** Builds the default request handlers **/
private static PathTrie<RequestHandler> defaultHandlers(final String endpoint, final Map<String, Bucket> buckets) {
final PathTrie<RequestHandler> handlers = new PathTrie<>(RestUtils.REST_DECODER);
// HEAD Object
//
// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectHEAD.html
objectsPaths("HEAD " + endpoint + "/{bucket}").forEach(path ->
handlers.insert(path, (params, headers, body, id) -> {
final String bucketName = params.get("bucket");
final Bucket bucket = buckets.get(bucketName);
if (bucket == null) {
return newBucketNotFoundError(id, bucketName);
}
final String objectName = objectName(params);
for (Map.Entry<String, byte[]> object : bucket.objects.entrySet()) {
if (object.getKey().equals(objectName)) {
return new Response(RestStatus.OK, emptyMap(), "text/plain", EMPTY_BYTE);
}
}
return newObjectNotFoundError(id, objectName);
})
);
// PUT Object
//
// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html
objectsPaths("PUT " + endpoint + "/{bucket}").forEach(path ->
handlers.insert(path, (params, headers, body, id) -> {
final String destBucketName = params.get("bucket");
final Bucket destBucket = buckets.get(destBucketName);
if (destBucket == null) {
return newBucketNotFoundError(id, destBucketName);
}
final String destObjectName = objectName(params);
// This is a chunked upload request. We should have the header "Content-Encoding : aws-chunked,gzip"
// to detect it but it seems that the AWS SDK does not follow the S3 guidelines here.
//
// See https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html
//
List<String> headerDecodedContentLength = headers.getOrDefault("X-amz-decoded-content-length", emptyList());
if (headerDecodedContentLength.size() == 1) {
int contentLength = Integer.valueOf(headerDecodedContentLength.get(0));
// Chunked requests have a payload like this:
//
// 105;chunk-signature=01d0de6be013115a7f4794db8c4b9414e6ec71262cc33ae562a71f2eaed1efe8
// ... bytes of data ....
// 0;chunk-signature=f890420b1974c5469aaf2112e9e6f2e0334929fd45909e03c0eff7a84124f6a4
//
try (BufferedInputStream inputStream = new BufferedInputStream(new ByteArrayInputStream(body))) {
int b;
// Moves to the end of the first signature line
while ((b = inputStream.read()) != -1) {
if (b == '\n') {
break;
}
}
final byte[] bytes = new byte[contentLength];
inputStream.read(bytes, 0, contentLength);
destBucket.objects.put(destObjectName, bytes);
return new Response(RestStatus.OK, emptyMap(), "text/plain", EMPTY_BYTE);
}
}
return newInternalError(id, "Something is wrong with this PUT request");
})
);
// DELETE Object
//
// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectDELETE.html
objectsPaths("DELETE " + endpoint + "/{bucket}").forEach(path ->
handlers.insert(path, (params, headers, body, id) -> {
final String bucketName = params.get("bucket");
final Bucket bucket = buckets.get(bucketName);
if (bucket == null) {
return newBucketNotFoundError(id, bucketName);
}
final String objectName = objectName(params);
if (bucket.objects.remove(objectName) != null) {
return new Response(RestStatus.OK, emptyMap(), "text/plain", EMPTY_BYTE);
}
return newObjectNotFoundError(id, objectName);
})
);
// GET Object
//
// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html
objectsPaths("GET " + endpoint + "/{bucket}").forEach(path ->
handlers.insert(path, (params, headers, body, id) -> {
final String bucketName = params.get("bucket");
final Bucket bucket = buckets.get(bucketName);
if (bucket == null) {
return newBucketNotFoundError(id, bucketName);
}
final String objectName = objectName(params);
if (bucket.objects.containsKey(objectName)) {
return new Response(RestStatus.OK, emptyMap(), "application/octet-stream", bucket.objects.get(objectName));
}
return newObjectNotFoundError(id, objectName);
})
);
// HEAD Bucket
//
// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketHEAD.html
handlers.insert("HEAD " + endpoint + "/{bucket}", (params, headers, body, id) -> {
String bucket = params.get("bucket");
if (Strings.hasText(bucket) && buckets.containsKey(bucket)) {
return new Response(RestStatus.OK, emptyMap(), "text/plain", EMPTY_BYTE);
} else {
return newBucketNotFoundError(id, bucket);
}
});
// GET Bucket (List Objects) Version 1
//
// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGET.html
handlers.insert("GET " + endpoint + "/{bucket}/", (params, headers, body, id) -> {
final String bucketName = params.get("bucket");
final Bucket bucket = buckets.get(bucketName);
if (bucket == null) {
return newBucketNotFoundError(id, bucketName);
}
String prefix = params.get("prefix");
if (prefix == null) {
List<String> prefixes = headers.get("Prefix");
if (prefixes != null && prefixes.size() == 1) {
prefix = prefixes.get(0);
}
}
return newListBucketResultResponse(id, bucket, prefix);
});
// Delete Multiple Objects
//
// https://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html
handlers.insert("POST " + endpoint + "/", (params, headers, body, id) -> {
final List<String> deletes = new ArrayList<>();
final List<String> errors = new ArrayList<>();
if (params.containsKey("delete")) {
// The request body is something like:
// <Delete><Object><Key>...</Key></Object><Object><Key>...</Key></Object></Delete>
String request = Streams.copyToString(new InputStreamReader(new ByteArrayInputStream(body), StandardCharsets.UTF_8));
if (request.startsWith("<Delete>")) {
final String startMarker = "<Key>";
final String endMarker = "</Key>";
int offset = 0;
while (offset != -1) {
offset = request.indexOf(startMarker, offset);
if (offset > 0) {
int closingOffset = request.indexOf(endMarker, offset);
if (closingOffset != -1) {
offset = offset + startMarker.length();
final String objectName = request.substring(offset, closingOffset);
boolean found = false;
for (Bucket bucket : buckets.values()) {
if (bucket.objects.remove(objectName) != null) {
found = true;
}
}
if (found) {
deletes.add(objectName);
} else {
errors.add(objectName);
}
}
}
}
return newDeleteResultResponse(id, deletes, errors);
}
}
return newInternalError(id, "Something is wrong with this POST multiple deletes request");
});
return handlers;
}
/**
* Represents a S3 bucket.
*/
static class Bucket {
/** Bucket name **/
final String name;
/** Blobs contained in the bucket **/
final Map<String, byte[]> objects;
Bucket(final String name) {
this.name = Objects.requireNonNull(name);
this.objects = ConcurrentCollections.newConcurrentMap();
}
}
/**
* Represents a HTTP Response.
*/
static class Response {
final RestStatus status;
final Map<String, String> headers;
final String contentType;
final byte[] body;
Response(final RestStatus status, final Map<String, String> headers, final String contentType, final byte[] body) {
this.status = Objects.requireNonNull(status);
this.headers = Objects.requireNonNull(headers);
this.contentType = Objects.requireNonNull(contentType);
this.body = Objects.requireNonNull(body);
}
}
/**
* Decline a path like "http://host:port/{bucket}" into 10 derived paths like:
* - http://host:port/{bucket}/{path0}
* - http://host:port/{bucket}/{path0}/{path1}
* - http://host:port/{bucket}/{path0}/{path1}/{path2}
* - etc
*/
private static List<String> objectsPaths(final String path) {
final List<String> paths = new ArrayList<>();
String p = path;
for (int i = 0; i < 10; i++) {
p = p + "/{path" + i + "}";
paths.add(p);
}
return paths;
}
/**
* Retrieves the object name from all derives paths named {pathX} where 0 <= X < 10.
*
* This is the counterpart of {@link #objectsPaths(String)}
*/
private static String objectName(final Map<String, String> params) {
final StringBuilder name = new StringBuilder();
for (int i = 0; i < 10; i++) {
String value = params.getOrDefault("path" + i, null);
if (value != null) {
if (name.length() > 0) {
name.append('/');
}
name.append(value);
}
}
return name.toString();
}
/**
* S3 ListBucketResult Response
*/
private static Response newListBucketResultResponse(final long requestId, final Bucket bucket, final String prefix) {
final String id = Long.toString(requestId);
final StringBuilder response = new StringBuilder();
response.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
response.append("<ListBucketResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">");
response.append("<Prefix>");
if (prefix != null) {
response.append(prefix);
}
response.append("</Prefix>");
response.append("<Marker/>");
response.append("<MaxKeys>1000</MaxKeys>");
response.append("<IsTruncated>false</IsTruncated>");
int count = 0;
for (Map.Entry<String, byte[]> object : bucket.objects.entrySet()) {
String objectName = object.getKey();
if (prefix == null || objectName.startsWith(prefix)) {
response.append("<Contents>");
response.append("<Key>").append(objectName).append("</Key>");
response.append("<LastModified>").append(DateUtils.formatISO8601Date(new Date())).append("</LastModified>");
response.append("<ETag>&quot;").append(count++).append("&quot;</ETag>");
response.append("<Size>").append(object.getValue().length).append("</Size>");
response.append("</Contents>");
}
}
response.append("</ListBucketResult>");
return new Response(RestStatus.OK, singletonMap("x-amz-request-id", id), "application/xml", response.toString().getBytes(UTF_8));
}
/**
* S3 DeleteResult Response
*/
private static Response newDeleteResultResponse(final long requestId,
final List<String> deletedObjects,
final List<String> ignoredObjects) {
final String id = Long.toString(requestId);
final StringBuilder response = new StringBuilder();
response.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
response.append("<DeleteResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">");
for (String deletedObject : deletedObjects) {
response.append("<Deleted>");
response.append("<Key>").append(deletedObject).append("</Key>");
response.append("</Deleted>");
}
for (String ignoredObject : ignoredObjects) {
response.append("<Error>");
response.append("<Key>").append(ignoredObject).append("</Key>");
response.append("<Code>NoSuchKey</Code>");
response.append("</Error>");
}
response.append("</DeleteResult>");
return new Response(RestStatus.OK, singletonMap("x-amz-request-id", id), "application/xml", response.toString().getBytes(UTF_8));
}
private static Response newBucketNotFoundError(final long requestId, final String bucket) {
return newError(requestId, RestStatus.NOT_FOUND, "NoSuchBucket", "The specified bucket does not exist", bucket);
}
private static Response newObjectNotFoundError(final long requestId, final String object) {
return newError(requestId, RestStatus.NOT_FOUND, "NoSuchKey", "The specified key does not exist", object);
}
private static Response newInternalError(final long requestId, final String resource) {
return newError(requestId, RestStatus.INTERNAL_SERVER_ERROR, "InternalError", "We encountered an internal error", resource);
}
/**
* S3 Error
*
* https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
*/
private static Response newError(final long requestId,
final RestStatus status,
final String code,
final String message,
final String resource) {
final String id = Long.toString(requestId);
final StringBuilder response = new StringBuilder();
response.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
response.append("<Error>");
response.append("<Code>").append(code).append("</Code>");
response.append("<Message>").append(message).append("</Message>");
response.append("<Resource>").append(resource).append("</Resource>");
response.append("<RequestId>").append(id).append("</RequestId>");
response.append("</Error>");
return new Response(status, singletonMap("x-amz-request-id", id), "application/xml", response.toString().getBytes(UTF_8));
}
}

View File

@ -34,7 +34,6 @@ List projects = [
'server',
'server:cli',
'test:framework',
'test:fixtures:example-fixture',
'test:fixtures:hdfs-fixture',
'test:fixtures:krb5kdc-fixture',
'test:fixtures:old-elasticsearch',

View File

@ -1,26 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
apply plugin: 'elasticsearch.build'
test.enabled = false
// Not published so no need to assemble
tasks.remove(assemble)
build.dependsOn.remove('assemble')
dependenciesInfo.enabled = false

View File

@ -1,78 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package example;
import com.sun.net.httpserver.HttpServer;
import java.lang.management.ManagementFactory;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Collections;
/** Crappy example test fixture that responds with TEST and closes the connection */
public class ExampleTestFixture {
public static void main(String args[]) throws Exception {
if (args.length != 1) {
throw new IllegalArgumentException("ExampleTestFixture <logDirectory>");
}
Path dir = Paths.get(args[0]);
final InetSocketAddress socketAddress = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
final HttpServer httpServer = HttpServer.create(socketAddress, 0);
// write pid file
Path tmp = Files.createTempFile(dir, null, null);
String pid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
Files.write(tmp, Collections.singleton(pid));
Files.move(tmp, dir.resolve("pid"), StandardCopyOption.ATOMIC_MOVE);
// write port file
tmp = Files.createTempFile(dir, null, null);
InetSocketAddress bound = httpServer.getAddress();
if (bound.getAddress() instanceof Inet6Address) {
Files.write(tmp, Collections.singleton("[" + bound.getHostString() + "]:" + bound.getPort()));
} else {
Files.write(tmp, Collections.singleton(bound.getHostString() + ":" + bound.getPort()));
}
Files.move(tmp, dir.resolve("ports"), StandardCopyOption.ATOMIC_MOVE);
final byte[] response = "TEST\n".getBytes(StandardCharsets.UTF_8);
// go time
httpServer.createContext("/", exchange -> {
try {
exchange.sendResponseHeaders(200, response.length);
exchange.getResponseBody().write(response);
} finally {
exchange.close();
}
});
httpServer.start();
// wait forever, until you kill me
Thread.sleep(Long.MAX_VALUE);
}
}

View File

@ -0,0 +1,312 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.fixture;
import com.sun.net.httpserver.HttpServer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.management.ManagementFactory;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
/**
* Base class for test fixtures that requires a {@link HttpServer} to work.
*/
public abstract class AbstractHttpFixture {
protected static final Map<String, String> TEXT_PLAIN_CONTENT_TYPE = contentType("text/plain; charset=utf-8");
protected static final Map<String, String> JSON_CONTENT_TYPE = contentType("application/json; charset=utf-8");
protected static final byte[] EMPTY_BYTE = new byte[0];
/** Increments for the requests ids **/
private final AtomicLong requests = new AtomicLong(0);
/** Current working directory of the fixture **/
private final Path workingDirectory;
protected AbstractHttpFixture(final String workingDir) {
this.workingDirectory = Paths.get(Objects.requireNonNull(workingDir));
}
/**
* Opens a {@link HttpServer} and start listening on a random port.
*/
public final void listen() throws IOException, InterruptedException {
final InetSocketAddress socketAddress = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
final HttpServer httpServer = HttpServer.create(socketAddress, 0);
try {
/// Writes the PID of the current Java process in a `pid` file located in the working directory
writeFile(workingDirectory, "pid", ManagementFactory.getRuntimeMXBean().getName().split("@")[0]);
final String addressAndPort = addressToString(httpServer.getAddress());
// Writes the address and port of the http server in a `ports` file located in the working directory
writeFile(workingDirectory, "ports", addressAndPort);
httpServer.createContext("/", exchange -> {
try {
Response response;
// Check if this is a request made by the AntFixture
final String userAgent = exchange.getRequestHeaders().getFirst("User-Agent");
if (userAgent != null
&& userAgent.startsWith("Apache Ant")
&& "GET".equals(exchange.getRequestMethod())
&& "/".equals(exchange.getRequestURI().getPath())) {
response = new Response(200, TEXT_PLAIN_CONTENT_TYPE, "OK".getBytes(UTF_8));
} else {
try {
final long requestId = requests.getAndIncrement();
final String method = exchange.getRequestMethod();
final Map<String, String> headers = new HashMap<>();
for (Map.Entry<String, List<String>> header : exchange.getRequestHeaders().entrySet()) {
headers.put(header.getKey(), exchange.getRequestHeaders().getFirst(header.getKey()));
}
final ByteArrayOutputStream body = new ByteArrayOutputStream();
try (InputStream requestBody = exchange.getRequestBody()) {
final byte[] buffer = new byte[1024];
int i;
while ((i = requestBody.read(buffer, 0, buffer.length)) != -1) {
body.write(buffer, 0, i);
}
body.flush();
}
final Request request = new Request(requestId, method, exchange.getRequestURI(), headers, body.toByteArray());
response = handle(request);
} catch (Exception e) {
final String error = e.getMessage() != null ? e.getMessage() : "Exception when processing the request";
response = new Response(500, singletonMap("Content-Type", "text/plain; charset=utf-8"), error.getBytes(UTF_8));
}
}
if (response == null) {
response = new Response(400, TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE);
}
response.headers.forEach((k, v) -> exchange.getResponseHeaders().put(k, singletonList(v)));
if (response.body.length > 0) {
exchange.sendResponseHeaders(response.status, response.body.length);
exchange.getResponseBody().write(response.body);
} else {
exchange.sendResponseHeaders(response.status, -1);
}
} finally {
exchange.close();
}
});
httpServer.start();
// Wait to be killed
Thread.sleep(Long.MAX_VALUE);
} finally {
httpServer.stop(0);
}
}
protected abstract Response handle(Request request) throws IOException;
@FunctionalInterface
public interface RequestHandler {
Response handle(Request request) throws IOException;
}
/**
* Represents a HTTP Response.
*/
protected static class Response {
private final int status;
private final Map<String, String> headers;
private final byte[] body;
public Response(final int status, final Map<String, String> headers, final byte[] body) {
this.status = status;
this.headers = Objects.requireNonNull(headers);
this.body = Objects.requireNonNull(body);
}
public int getStatus() {
return status;
}
public Map<String, String> getHeaders() {
return headers;
}
public byte[] getBody() {
return body;
}
public String getContentType() {
for (String header : headers.keySet()) {
if (header.equalsIgnoreCase("Content-Type")) {
return headers.get(header);
}
}
return null;
}
@Override
public String toString() {
return "Response{" +
"status=" + status +
", headers=" + headers +
", body=" + new String(body, UTF_8) +
'}';
}
}
/**
* Represents a HTTP Request.
*/
protected static class Request {
private final long id;
private final String method;
private final URI uri;
private final Map<String, String> parameters;
private final Map<String, String> headers;
private final byte[] body;
public Request(final long id, final String method, final URI uri, final Map<String, String> headers, final byte[] body) {
this.id = id;
this.method = Objects.requireNonNull(method);
this.uri = Objects.requireNonNull(uri);
this.headers = Objects.requireNonNull(headers);
this.body = Objects.requireNonNull(body);
final Map<String, String> params = new HashMap<>();
if (uri.getQuery() != null && uri.getQuery().length() > 0) {
for (String param : uri.getQuery().split("&")) {
int i = param.indexOf("=");
if (i > 0) {
params.put(param.substring(0, i), param.substring(i + 1));
} else {
params.put(param, "");
}
}
}
this.parameters = params;
}
public long getId() {
return id;
}
public String getMethod() {
return method;
}
public Map<String, String> getHeaders() {
return headers;
}
public String getHeader(final String headerName) {
for (String header : headers.keySet()) {
if (header.equalsIgnoreCase(headerName)) {
return headers.get(header);
}
}
return null;
}
public byte[] getBody() {
return body;
}
public String getPath() {
return uri.getRawPath();
}
public Map<String, String> getParameters() {
return parameters;
}
public String getParam(final String paramName) {
for (String param : parameters.keySet()) {
if (param.equals(paramName)) {
return parameters.get(param);
}
}
return null;
}
public String getContentType() {
return getHeader("Content-Type");
}
@Override
public String toString() {
return "Request{" +
"method='" + method + '\'' +
", uri=" + uri +
", parameters=" + parameters +
", headers=" + headers +
", body=" + body +
'}';
}
}
private static void writeFile(final Path dir, final String fileName, final String content) throws IOException {
final Path tempPidFile = Files.createTempFile(dir, null, null);
Files.write(tempPidFile, singleton(content));
Files.move(tempPidFile, dir.resolve(fileName), StandardCopyOption.ATOMIC_MOVE);
}
private static String addressToString(final SocketAddress address) {
final InetSocketAddress inetSocketAddress = (InetSocketAddress) address;
if (inetSocketAddress.getAddress() instanceof Inet6Address) {
return "[" + inetSocketAddress.getHostString() + "]:" + inetSocketAddress.getPort();
} else {
return inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort();
}
}
protected static Map<String, String> contentType(final String contentType) {
return singletonMap("Content-Type", contentType);
}
}