Use fixture to test repository-s3 plugin (#29296)
This commit adds a new fixture that emulates an S3 service in order to improve the existing integration tests. It is very similar to what was done for Google Cloud Storage in #28788, and such tests would have helped catch bugs like #22534. The AmazonS3Fixture is brittle and implements only the bare minimum needed for the S3 repository to work, but it works and can be adapted to the needs of specific tests.
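For orientation (not part of the diff): the fixture only has to cover the handful of S3 REST calls that the repository plugin issues through the AWS SDK. A minimal sketch of how a client under test ends up talking to such a local endpoint; the address, region and credentials below are illustrative, and only the access key matches what the fixture checks:

    import com.amazonaws.auth.AWSStaticCredentialsProvider;
    import com.amazonaws.auth.BasicAWSCredentials;
    import com.amazonaws.client.builder.AwsClientBuilder;
    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3ClientBuilder;

    public class S3FixtureClientSketch {
        public static void main(String[] args) {
            // The endpoint would normally be read from the fixture's 'ports' file;
            // hardcoded here purely for illustration.
            AmazonS3 client = AmazonS3ClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(
                    new BasicAWSCredentials("s3_integration_test_access_key", "s3_integration_test_secret_key")))
                .withEndpointConfiguration(
                    new AwsClientBuilder.EndpointConfiguration("http://127.0.0.1:4444", "us-east-1"))
                .withPathStyleAccessEnabled(true) // bucket in the path, as the fixture's routing expects
                .build();
            client.putObject("bucket_test", "snapshot/blob", "hello");
            System.out.println(client.getObjectAsString("bucket_test", "snapshot/blob"));
        }
    }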
parent 2b07f63bd5, commit 989e465964
[repository-gcs build.gradle]
@@ -58,6 +58,12 @@ thirdPartyAudit.excludes = [
   'org.apache.log.Logger',
 ]
 
+forbiddenApisTest {
+  // we are using jdk-internal instead of jdk-non-portable to allow for com.sun.net.httpserver.* usage
+  bundledSignatures -= 'jdk-non-portable'
+  bundledSignatures += 'jdk-internal'
+}
+
 /** A task to start the GoogleCloudStorageFixture which emulates a Google Cloud Storage service **/
 task googleCloudStorageFixture(type: AntFixture) {
   dependsOn compileTestJava
[GoogleCloudStorageFixture.java]
@@ -52,17 +52,16 @@ import static java.util.Collections.singletonList;
  */
 public class GoogleCloudStorageFixture {
 
-    @SuppressForbidden(reason = "PathUtils#get is fine - we don't have environment here")
     public static void main(String[] args) throws Exception {
         if (args == null || args.length != 2) {
             throw new IllegalArgumentException("GoogleCloudStorageFixture <working directory> <bucket>");
         }
 
-        final InetSocketAddress socketAddress = new InetSocketAddress(InetAddress.getLoopbackAddress(), 43635);
+        final InetSocketAddress socketAddress = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
         final HttpServer httpServer = MockHttpServer.createHttp(socketAddress, 0);
 
         try {
-            final Path workingDirectory = Paths.get(args[0]);
+            final Path workingDirectory = workingDir(args[0]);
             // Writes the PID of the current Java process in a `pid` file located in the working directory
             writeFile(workingDirectory, "pid", ManagementFactory.getRuntimeMXBean().getName().split("@")[0]);
 
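The `pid` and `ports` files written above are the contract between a fixture process and the Gradle AntFixture task, which polls for them to learn that the process is up and where it listens. A distilled, runnable sketch of that convention (not code from this diff; the address is illustrative):

    import java.lang.management.ManagementFactory;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    import static java.util.Collections.singleton;

    public class FixtureFilesSketch {
        public static void main(String[] args) throws Exception {
            final Path workingDirectory = Paths.get(args[0]);
            // RuntimeMXBean names look like "12345@hostname"; the PID is the part before '@'.
            final String pid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
            Files.write(workingDirectory.resolve("pid"), singleton(pid));
            // Once this file exists, the build knows which address and port the fixture bound to.
            Files.write(workingDirectory.resolve("ports"), singleton("127.0.0.1:4444"));
        }
    }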
@@ -86,6 +85,11 @@ public class GoogleCloudStorageFixture {
         }
     }
 
+    @SuppressForbidden(reason = "Paths#get is fine - we don't have environment here")
+    private static Path workingDir(final String dir) {
+        return Paths.get(dir);
+    }
+
     private static void writeFile(final Path dir, final String fileName, final String content) throws IOException {
         final Path tempPidFile = Files.createTempFile(dir, null, null);
         Files.write(tempPidFile, singleton(content));
@@ -101,7 +105,6 @@ public class GoogleCloudStorageFixture {
         }
     }
 
-    @SuppressForbidden(reason = "Use a http server")
     static class ResponseHandler implements HttpHandler {
 
         private final GoogleCloudStorageTestServer storageServer;
[repository-gcs YAML REST tests]
@@ -13,9 +13,6 @@
   - match: { nodes.$master.plugins.0.name: repository-gcs }
 ---
 "Snapshot/Restore with repository-gcs":
-  - skip:
-      version: " - 6.3.0"
-      reason: repository-gcs was not testable through YAML tests until 6.3.0
 
   # Register repository
   - do:
@@ -28,7 +25,15 @@
             client: "integration_test"
 
   - match: { acknowledged: true }
 
+  # Get repository
+  - do:
+      snapshot.get_repository:
+        repository: repository
+
+  - match: {repository.settings.bucket : "bucket_test"}
+  - match: {repository.settings.client : "integration_test"}
+
   # Index documents
   - do:
       bulk:
@@ -180,7 +185,3 @@
   - do:
       snapshot.delete_repository:
         repository: repository
-
-
-
-
[repository-s3 build.gradle]
@@ -1,3 +1,5 @@
+import org.elasticsearch.gradle.test.AntFixture
+
 /*
  * Licensed to Elasticsearch under one or more contributor
  * license agreements. See the NOTICE file distributed with
@@ -64,9 +66,28 @@ test {
   exclude '**/*CredentialsTests.class'
 }
 
+forbiddenApisTest {
+  // we are using jdk-internal instead of jdk-non-portable to allow for com.sun.net.httpserver.* usage
+  bundledSignatures -= 'jdk-non-portable'
+  bundledSignatures += 'jdk-internal'
+}
+
+/** A task to start the AmazonS3Fixture which emulates a S3 service **/
+task s3Fixture(type: AntFixture) {
+  dependsOn compileTestJava
+  env 'CLASSPATH', "${ -> project.sourceSets.test.runtimeClasspath.asPath }"
+  executable = new File(project.runtimeJavaHome, 'bin/java')
+  args 'org.elasticsearch.repositories.s3.AmazonS3Fixture', baseDir, 'bucket_test'
+}
+
 integTestCluster {
-  keystoreSetting 's3.client.default.access_key', 'myaccesskey'
-  keystoreSetting 's3.client.default.secret_key', 'mysecretkey'
+  dependsOn s3Fixture
+
+  keystoreSetting 's3.client.integration_test.access_key', "s3_integration_test_access_key"
+  keystoreSetting 's3.client.integration_test.secret_key', "s3_integration_test_secret_key"
+
+  /* Use a closure on the string to delay evaluation until tests are executed */
+  setting 's3.client.integration_test.endpoint', "http://${ -> s3Fixture.addressAndPort }"
 }
 
 thirdPartyAudit.excludes = [
[new file: AmazonS3Fixture.java]
@@ -0,0 +1,137 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.elasticsearch.repositories.s3;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.mocksocket.MockHttpServer;
import org.elasticsearch.repositories.s3.AmazonS3TestServer.Response;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.Map;

import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;

/**
 * {@link AmazonS3Fixture} is a fixture that emulates a S3 service.
 * <p>
 * It starts an asynchronous socket server that binds to a random local port. The server parses
 * HTTP requests and uses a {@link AmazonS3TestServer} to handle them before returning
 * them to the client as HTTP responses.
 */
public class AmazonS3Fixture {

    public static void main(String[] args) throws Exception {
        if (args == null || args.length != 2) {
            throw new IllegalArgumentException("AmazonS3Fixture <working directory> <bucket>");
        }

        final InetSocketAddress socketAddress = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
        final HttpServer httpServer = MockHttpServer.createHttp(socketAddress, 0);

        try {
            final Path workingDirectory = workingDir(args[0]);
            // Writes the PID of the current Java process in a `pid` file located in the working directory
            writeFile(workingDirectory, "pid", ManagementFactory.getRuntimeMXBean().getName().split("@")[0]);

            final String addressAndPort = addressToString(httpServer.getAddress());
            // Writes the address and port of the http server in a `ports` file located in the working directory
            writeFile(workingDirectory, "ports", addressAndPort);

            // Emulates S3
            final String storageUrl = "http://" + addressAndPort;
            final AmazonS3TestServer storageTestServer = new AmazonS3TestServer(storageUrl);
            storageTestServer.createBucket(args[1]);

            httpServer.createContext("/", new ResponseHandler(storageTestServer));
            httpServer.start();

            // Wait to be killed
            Thread.sleep(Long.MAX_VALUE);

        } finally {
            httpServer.stop(0);
        }
    }

    @SuppressForbidden(reason = "Paths#get is fine - we don't have environment here")
    private static Path workingDir(final String dir) {
        return Paths.get(dir);
    }

    private static void writeFile(final Path dir, final String fileName, final String content) throws IOException {
        final Path tempPidFile = Files.createTempFile(dir, null, null);
        Files.write(tempPidFile, singleton(content));
        Files.move(tempPidFile, dir.resolve(fileName), StandardCopyOption.ATOMIC_MOVE);
    }

    private static String addressToString(final SocketAddress address) {
        final InetSocketAddress inetSocketAddress = (InetSocketAddress) address;
        if (inetSocketAddress.getAddress() instanceof Inet6Address) {
            return "[" + inetSocketAddress.getHostString() + "]:" + inetSocketAddress.getPort();
        } else {
            return inetSocketAddress.getHostString() + ":" + inetSocketAddress.getPort();
        }
    }

    static class ResponseHandler implements HttpHandler {

        private final AmazonS3TestServer storageServer;

        private ResponseHandler(final AmazonS3TestServer storageServer) {
            this.storageServer = storageServer;
        }

        @Override
        public void handle(HttpExchange exchange) throws IOException {
            String method = exchange.getRequestMethod();
            String path = storageServer.getEndpoint() + exchange.getRequestURI().getRawPath();
            String query = exchange.getRequestURI().getRawQuery();
            Map<String, List<String>> headers = exchange.getRequestHeaders();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            Streams.copy(exchange.getRequestBody(), out);

            final Response storageResponse = storageServer.handle(method, path, query, headers, out.toByteArray());

            Map<String, List<String>> responseHeaders = exchange.getResponseHeaders();
            responseHeaders.put("Content-Type", singletonList(storageResponse.contentType));
            storageResponse.headers.forEach((k, v) -> responseHeaders.put(k, singletonList(v)));
            exchange.sendResponseHeaders(storageResponse.status.getStatus(), storageResponse.body.length);
            if (storageResponse.body.length > 0) {
                exchange.getResponseBody().write(storageResponse.body);
            }
            exchange.close();
        }
    }
}
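Not part of the commit: a quick way to convince yourself the fixture behaves is a raw HTTP request against it. A hypothetical smoke test, with the address hardcoded for illustration (in the build it comes from the `ports` file); the only authentication the test server performs is checking that the Authorization header mentions the expected access key:

    import java.net.HttpURLConnection;
    import java.net.URL;

    public class FixtureSmokeTest {
        public static void main(String[] args) throws Exception {
            final URL url = new URL("http://127.0.0.1:4444/bucket_test/");
            final HttpURLConnection connection = (HttpURLConnection) url.openConnection();
            connection.setRequestMethod("GET");
            // AmazonS3TestServer only checks that this header contains the expected access key.
            connection.setRequestProperty("Authorization",
                    "AWS4-HMAC-SHA256 Credential=s3_integration_test_access_key/20180330/us-east-1/s3/aws4_request");
            // Expect 200 and an XML ListBucketResult body for the pre-created bucket.
            System.out.println(connection.getResponseCode());
        }
    }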
[new file: AmazonS3TestServer.java]
@@ -0,0 +1,542 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.elasticsearch.repositories.s3;

import com.amazonaws.util.DateUtils;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.path.PathTrie;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.RestUtils;

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;

/**
 * {@link AmazonS3TestServer} emulates a S3 service through a {@link #handle(String, String, String, Map, byte[])}
 * method that provides appropriate responses for specific requests like the real S3 platform would do.
 * It is largely based on official documentation available at https://docs.aws.amazon.com/AmazonS3/latest/API/.
 */
public class AmazonS3TestServer {

    private static final byte[] EMPTY_BYTE = new byte[0];

    /** List of the buckets stored on this test server **/
    private final Map<String, Bucket> buckets = ConcurrentCollections.newConcurrentMap();

    /** Request handlers for the requests made by the S3 client **/
    private final PathTrie<RequestHandler> handlers;

    /** Server endpoint **/
    private final String endpoint;

    /** Increments for the requests ids **/
    private final AtomicLong requests = new AtomicLong(0);

    /**
     * Creates a {@link AmazonS3TestServer} with a custom endpoint
     */
    AmazonS3TestServer(final String endpoint) {
        this.endpoint = Objects.requireNonNull(endpoint, "endpoint must not be null");
        this.handlers = defaultHandlers(endpoint, buckets);
    }

    /** Creates a bucket in the test server **/
    void createBucket(final String bucketName) {
        buckets.put(bucketName, new Bucket(bucketName));
    }

    public String getEndpoint() {
        return endpoint;
    }

    /**
     * Returns a response for the given request
     *
     * @param method  the HTTP method of the request
     * @param path    the path of the URL of the request
     * @param query   the queryString of the URL of request
     * @param headers the HTTP headers of the request
     * @param body    the HTTP request body
     * @return a {@link Response}
     * @throws IOException if something goes wrong
     */
    public Response handle(final String method,
                           final String path,
                           final String query,
                           final Map<String, List<String>> headers,
                           byte[] body) throws IOException {

        final long requestId = requests.incrementAndGet();

        final Map<String, String> params = new HashMap<>();
        if (query != null) {
            RestUtils.decodeQueryString(query, 0, params);
        }
        final List<String> authorizations = headers.get("Authorization");
        // The check must short-circuit (&&) so that get(0) is never called on an empty header list
        if (authorizations == null
            || (authorizations.isEmpty() == false && authorizations.get(0).contains("s3_integration_test_access_key") == false)) {
            return newError(requestId, RestStatus.FORBIDDEN, "AccessDenied", "Access Denied", "");
        }

        final RequestHandler handler = handlers.retrieve(method + " " + path, params);
        if (handler != null) {
            return handler.execute(params, headers, body, requestId);
        } else {
            return newInternalError(requestId, "No handler defined for request [method: " + method + ", path: " + path + "]");
        }
    }

    @FunctionalInterface
    interface RequestHandler {

        /**
         * Simulates the execution of a S3 request and returns a corresponding response.
         *
         * @param params    the request's query string parameters
         * @param headers   the request's headers
         * @param body      the request body provided as a byte array
         * @param requestId a unique id for the incoming request
         * @return the corresponding response
         *
         * @throws IOException if something goes wrong
         */
        Response execute(Map<String, String> params, Map<String, List<String>> headers, byte[] body, long requestId) throws IOException;
    }

    /** Builds the default request handlers **/
    private static PathTrie<RequestHandler> defaultHandlers(final String endpoint, final Map<String, Bucket> buckets) {
        final PathTrie<RequestHandler> handlers = new PathTrie<>(RestUtils.REST_DECODER);

        // HEAD Object
        //
        // https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectHEAD.html
        objectsPaths("HEAD " + endpoint + "/{bucket}").forEach(path ->
            handlers.insert(path, (params, headers, body, id) -> {
                final String bucketName = params.get("bucket");

                final Bucket bucket = buckets.get(bucketName);
                if (bucket == null) {
                    return newBucketNotFoundError(id, bucketName);
                }

                final String objectName = objectName(params);
                for (Map.Entry<String, byte[]> object : bucket.objects.entrySet()) {
                    if (object.getKey().equals(objectName)) {
                        return new Response(RestStatus.OK, emptyMap(), "text/plain", EMPTY_BYTE);
                    }
                }
                return newObjectNotFoundError(id, objectName);
            })
        );

        // PUT Object & PUT Object Copy
        //
        // https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html
        // https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectCOPY.html
        objectsPaths("PUT " + endpoint + "/{bucket}").forEach(path ->
            handlers.insert(path, (params, headers, body, id) -> {
                final String destBucketName = params.get("bucket");

                final Bucket destBucket = buckets.get(destBucketName);
                if (destBucket == null) {
                    return newBucketNotFoundError(id, destBucketName);
                }

                final String destObjectName = objectName(params);

                // Request is a copy request
                List<String> headerCopySource = headers.getOrDefault("x-amz-copy-source", emptyList());
                if (headerCopySource.isEmpty() == false) {
                    String srcObjectName = headerCopySource.get(0);

                    Bucket srcBucket = null;
                    for (Bucket bucket : buckets.values()) {
                        String prefix = "/" + bucket.name + "/";
                        if (srcObjectName.startsWith(prefix)) {
                            srcObjectName = srcObjectName.replaceFirst(prefix, "");
                            srcBucket = bucket;
                            break;
                        }
                    }

                    if (srcBucket == null || srcBucket.objects.containsKey(srcObjectName) == false) {
                        return newObjectNotFoundError(id, srcObjectName);
                    }

                    byte[] bytes = srcBucket.objects.get(srcObjectName);
                    if (bytes != null) {
                        destBucket.objects.put(destObjectName, bytes);
                        return newCopyResultResponse(id);
                    } else {
                        return newObjectNotFoundError(id, srcObjectName);
                    }
                } else {
                    // This is a chunked upload request. We should have the header "Content-Encoding : aws-chunked,gzip"
                    // to detect it but it seems that the AWS SDK does not follow the S3 guidelines here.
                    //
                    // See https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html
                    //
                    List<String> headerDecodedContentLength = headers.getOrDefault("X-amz-decoded-content-length", emptyList());
                    if (headerDecodedContentLength.size() == 1) {
                        int contentLength = Integer.valueOf(headerDecodedContentLength.get(0));

                        // Chunked requests have a payload like this:
                        //
                        // 105;chunk-signature=01d0de6be013115a7f4794db8c4b9414e6ec71262cc33ae562a71f2eaed1efe8
                        // ... bytes of data ....
                        // 0;chunk-signature=f890420b1974c5469aaf2112e9e6f2e0334929fd45909e03c0eff7a84124f6a4
                        //
                        try (BufferedInputStream inputStream = new BufferedInputStream(new ByteArrayInputStream(body))) {
                            int b;
                            // Moves to the end of the first signature line
                            while ((b = inputStream.read()) != -1) {
                                if (b == '\n') {
                                    break;
                                }
                            }

                            final byte[] bytes = new byte[contentLength];
                            inputStream.read(bytes, 0, contentLength);

                            destBucket.objects.put(destObjectName, bytes);
                            return new Response(RestStatus.OK, emptyMap(), "text/plain", EMPTY_BYTE);
                        }
                    }
                }
                return newInternalError(id, "Something is wrong with this PUT request");
            })
        );

        // DELETE Object
        //
        // https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectDELETE.html
        objectsPaths("DELETE " + endpoint + "/{bucket}").forEach(path ->
            handlers.insert(path, (params, headers, body, id) -> {
                final String bucketName = params.get("bucket");

                final Bucket bucket = buckets.get(bucketName);
                if (bucket == null) {
                    return newBucketNotFoundError(id, bucketName);
                }

                final String objectName = objectName(params);
                if (bucket.objects.remove(objectName) != null) {
                    return new Response(RestStatus.OK, emptyMap(), "text/plain", EMPTY_BYTE);
                }
                return newObjectNotFoundError(id, objectName);
            })
        );

        // GET Object
        //
        // https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html
        objectsPaths("GET " + endpoint + "/{bucket}").forEach(path ->
            handlers.insert(path, (params, headers, body, id) -> {
                final String bucketName = params.get("bucket");

                final Bucket bucket = buckets.get(bucketName);
                if (bucket == null) {
                    return newBucketNotFoundError(id, bucketName);
                }

                final String objectName = objectName(params);
                if (bucket.objects.containsKey(objectName)) {
                    return new Response(RestStatus.OK, emptyMap(), "application/octet-stream", bucket.objects.get(objectName));
                }
                return newObjectNotFoundError(id, objectName);
            })
        );

        // HEAD Bucket
        //
        // https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketHEAD.html
        handlers.insert("HEAD " + endpoint + "/{bucket}", (params, headers, body, id) -> {
            String bucket = params.get("bucket");
            if (Strings.hasText(bucket) && buckets.containsKey(bucket)) {
                return new Response(RestStatus.OK, emptyMap(), "text/plain", EMPTY_BYTE);
            } else {
                return newBucketNotFoundError(id, bucket);
            }
        });

        // GET Bucket (List Objects) Version 1
        //
        // https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGET.html
        handlers.insert("GET " + endpoint + "/{bucket}/", (params, headers, body, id) -> {
            final String bucketName = params.get("bucket");

            final Bucket bucket = buckets.get(bucketName);
            if (bucket == null) {
                return newBucketNotFoundError(id, bucketName);
            }

            String prefix = params.get("prefix");
            if (prefix == null) {
                List<String> prefixes = headers.get("Prefix");
                if (prefixes != null && prefixes.size() == 1) {
                    prefix = prefixes.get(0);
                }
            }
            return newListBucketResultResponse(id, bucket, prefix);
        });

        // Delete Multiple Objects
        //
        // https://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html
        handlers.insert("POST " + endpoint + "/", (params, headers, body, id) -> {
            final List<String> deletes = new ArrayList<>();
            final List<String> errors = new ArrayList<>();

            if (params.containsKey("delete")) {
                // The request body is something like:
                // <Delete><Object><Key>...</Key></Object><Object><Key>...</Key></Object></Delete>
                String request = Streams.copyToString(new InputStreamReader(new ByteArrayInputStream(body), StandardCharsets.UTF_8));
                if (request.startsWith("<Delete>")) {
                    final String startMarker = "<Key>";
                    final String endMarker = "</Key>";

                    int offset = 0;
                    while (offset != -1) {
                        offset = request.indexOf(startMarker, offset);
                        if (offset > 0) {
                            int closingOffset = request.indexOf(endMarker, offset);
                            if (closingOffset != -1) {
                                offset = offset + startMarker.length();
                                final String objectName = request.substring(offset, closingOffset);

                                boolean found = false;
                                for (Bucket bucket : buckets.values()) {
                                    if (bucket.objects.remove(objectName) != null) {
                                        found = true;
                                    }
                                }

                                if (found) {
                                    deletes.add(objectName);
                                } else {
                                    errors.add(objectName);
                                }
                            }
                        }
                    }
                    return newDeleteResultResponse(id, deletes, errors);
                }
            }
            return newInternalError(id, "Something is wrong with this POST multiple deletes request");
        });

        return handlers;
    }

    /**
     * Represents a S3 bucket.
     */
    static class Bucket {

        /** Bucket name **/
        final String name;

        /** Blobs contained in the bucket **/
        final Map<String, byte[]> objects;

        Bucket(final String name) {
            this.name = Objects.requireNonNull(name);
            this.objects = ConcurrentCollections.newConcurrentMap();
        }
    }

    /**
     * Represents a HTTP Response.
     */
    static class Response {

        final RestStatus status;
        final Map<String, String> headers;
        final String contentType;
        final byte[] body;

        Response(final RestStatus status, final Map<String, String> headers, final String contentType, final byte[] body) {
            this.status = Objects.requireNonNull(status);
            this.headers = Objects.requireNonNull(headers);
            this.contentType = Objects.requireNonNull(contentType);
            this.body = Objects.requireNonNull(body);
        }
    }

    /**
     * Expands a path like "http://host:port/{bucket}" into 10 derived paths like:
     * - http://host:port/{bucket}/{path0}
     * - http://host:port/{bucket}/{path0}/{path1}
     * - http://host:port/{bucket}/{path0}/{path1}/{path2}
     * - etc
     */
    private static List<String> objectsPaths(final String path) {
        final List<String> paths = new ArrayList<>();
        String p = path;
        for (int i = 0; i < 10; i++) {
            p = p + "/{path" + i + "}";
            paths.add(p);
        }
        return paths;
    }

    /**
     * Retrieves the object name from all derived paths named {pathX} where 0 <= X < 10.
     *
     * This is the counterpart of {@link #objectsPaths(String)}
     */
    private static String objectName(final Map<String, String> params) {
        final StringBuilder name = new StringBuilder();
        for (int i = 0; i < 10; i++) {
            String value = params.getOrDefault("path" + i, null);
            if (value != null) {
                if (name.length() > 0) {
                    name.append('/');
                }
                name.append(value);
            }
        }
        return name.toString();
    }

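    // Illustration (not part of the original change): with the handlers registered above,
    // a request "PUT http://127.0.0.1:4444/bucket_test/foo/bar" matches the derived pattern
    // ".../{bucket}/{path0}/{path1}", the PathTrie extracts
    // {bucket=bucket_test, path0=foo, path1=bar}, and objectName(params) rebuilds the
    // blob key "foo/bar" (the address and port here are hypothetical).
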
    /**
     * S3 ListBucketResult Response
     */
    private static Response newListBucketResultResponse(final long requestId, final Bucket bucket, final String prefix) {
        final String id = Long.toString(requestId);
        final StringBuilder response = new StringBuilder();
        response.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
        response.append("<ListBucketResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">");
        response.append("<Prefix>");
        if (prefix != null) {
            response.append(prefix);
        }
        response.append("</Prefix>");
        response.append("<Marker/>");
        response.append("<MaxKeys>1000</MaxKeys>");
        response.append("<IsTruncated>false</IsTruncated>");

        int count = 0;
        for (Map.Entry<String, byte[]> object : bucket.objects.entrySet()) {
            String objectName = object.getKey();
            if (prefix == null || objectName.startsWith(prefix)) {
                response.append("<Contents>");
                response.append("<Key>").append(objectName).append("</Key>");
                response.append("<LastModified>").append(DateUtils.formatISO8601Date(new Date())).append("</LastModified>");
response.append("<ETag>"").append(count++).append(""</ETag>");
|
||||
response.append("<Size>").append(object.getValue().length).append("</Size>");
|
||||
response.append("</Contents>");
|
||||
}
|
||||
}
|
||||
response.append("</ListBucketResult>");
|
||||
return new Response(RestStatus.OK, singletonMap("x-amz-request-id", id), "application/xml", response.toString().getBytes(UTF_8));
|
||||
}
|
||||
|
||||
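    // Example output (illustrative, not from the original commit): for a bucket holding
    // a single 8-byte blob under the prefix "tests-", the XML assembled above would look like:
    //   <?xml version="1.0" encoding="UTF-8"?>
    //   <ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
    //     <Prefix>tests-</Prefix><Marker/><MaxKeys>1000</MaxKeys><IsTruncated>false</IsTruncated>
    //     <Contents><Key>tests-0/master.dat</Key><LastModified>2018-03-30T12:00:00Z</LastModified>
    //       <ETag>"0"</ETag><Size>8</Size></Contents>
    //   </ListBucketResult>
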
    /**
     * S3 Copy Result Response
     */
    private static Response newCopyResultResponse(final long requestId) {
        final String id = Long.toString(requestId);
        final StringBuilder response = new StringBuilder();
        response.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
        response.append("<CopyObjectResult>");
        response.append("<LastModified>").append(DateUtils.formatISO8601Date(new Date())).append("</LastModified>");
        response.append("<ETag>").append(requestId).append("</ETag>");
        response.append("</CopyObjectResult>");
        return new Response(RestStatus.OK, singletonMap("x-amz-request-id", id), "application/xml", response.toString().getBytes(UTF_8));
    }

    /**
     * S3 DeleteResult Response
     */
    private static Response newDeleteResultResponse(final long requestId,
                                                    final List<String> deletedObjects,
                                                    final List<String> ignoredObjects) {
        final String id = Long.toString(requestId);

        final StringBuilder response = new StringBuilder();
        response.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
        response.append("<DeleteResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">");
        for (String deletedObject : deletedObjects) {
            response.append("<Deleted>");
            response.append("<Key>").append(deletedObject).append("</Key>");
            response.append("</Deleted>");
        }
        for (String ignoredObject : ignoredObjects) {
            response.append("<Error>");
            response.append("<Key>").append(ignoredObject).append("</Key>");
            response.append("<Code>NoSuchKey</Code>");
            response.append("</Error>");
        }
        response.append("</DeleteResult>");
        return new Response(RestStatus.OK, singletonMap("x-amz-request-id", id), "application/xml", response.toString().getBytes(UTF_8));
    }

    private static Response newBucketNotFoundError(final long requestId, final String bucket) {
        return newError(requestId, RestStatus.NOT_FOUND, "NoSuchBucket", "The specified bucket does not exist", bucket);
    }

    private static Response newObjectNotFoundError(final long requestId, final String object) {
        return newError(requestId, RestStatus.NOT_FOUND, "NoSuchKey", "The specified key does not exist", object);
    }

    private static Response newInternalError(final long requestId, final String resource) {
        return newError(requestId, RestStatus.INTERNAL_SERVER_ERROR, "InternalError", "We encountered an internal error", resource);
    }

    /**
     * S3 Error
     *
     * https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
     */
    private static Response newError(final long requestId,
                                     final RestStatus status,
                                     final String code,
                                     final String message,
                                     final String resource) {
        final String id = Long.toString(requestId);
        final StringBuilder response = new StringBuilder();
        response.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
        response.append("<Error>");
        response.append("<Code>").append(code).append("</Code>");
        response.append("<Message>").append(message).append("</Message>");
        response.append("<Resource>").append(resource).append("</Resource>");
        response.append("<RequestId>").append(id).append("</RequestId>");
        response.append("</Error>");
        return new Response(status, singletonMap("x-amz-request-id", id), "application/xml", response.toString().getBytes(UTF_8));
    }
}
[repository-s3 YAML REST tests]
@@ -1,6 +1,6 @@
-# Integration tests for Repository S3 component
+# Integration tests for repository-s3
 #
-"Repository S3 loaded":
+"Plugin repository-s3 is loaded":
   - do:
       cluster.state: {}
 
@@ -11,3 +11,183 @@
       nodes.info: {}
 
   - match: { nodes.$master.plugins.0.name: repository-s3 }
+---
+"Snapshot/Restore with repository-s3":
+
+  # Register repository
+  - do:
+      snapshot.create_repository:
+        repository: repository
+        body:
+          type: s3
+          settings:
+            bucket: "bucket_test"
+            client: "integration_test"
+            canned_acl: "public-read"
+            storage_class: "standard"
+
+  - match: { acknowledged: true }
+
+  # Get repository
+  - do:
+      snapshot.get_repository:
+        repository: repository
+
+  - match: {repository.settings.bucket : "bucket_test"}
+  - match: {repository.settings.client : "integration_test"}
+  - match: {repository.settings.canned_acl : "public-read"}
+  - match: {repository.settings.storage_class : "standard"}
+  - is_false: repository.settings.access_key
+  - is_false: repository.settings.secret_key
+
+  # Index documents
+  - do:
+      bulk:
+        refresh: true
+        body:
+          - index:
+              _index: docs
+              _type: doc
+              _id: 1
+          - snapshot: one
+          - index:
+              _index: docs
+              _type: doc
+              _id: 2
+          - snapshot: one
+          - index:
+              _index: docs
+              _type: doc
+              _id: 3
+          - snapshot: one
+
+  - do:
+      count:
+        index: docs
+
+  - match: {count: 3}
+
+  # Create a first snapshot
+  - do:
+      snapshot.create:
+        repository: repository
+        snapshot: snapshot-one
+        wait_for_completion: true
+
+  - match: { snapshot.snapshot: snapshot-one }
+  - match: { snapshot.state : SUCCESS }
+  - match: { snapshot.include_global_state: true }
+  - match: { snapshot.shards.failed : 0 }
+
+  - do:
+      snapshot.status:
+        repository: repository
+        snapshot: snapshot-one
+
+  - is_true: snapshots
+  - match: { snapshots.0.snapshot: snapshot-one }
+  - match: { snapshots.0.state : SUCCESS }
+
+  # Index more documents
+  - do:
+      bulk:
+        refresh: true
+        body:
+          - index:
+              _index: docs
+              _type: doc
+              _id: 4
+          - snapshot: two
+          - index:
+              _index: docs
+              _type: doc
+              _id: 5
+          - snapshot: two
+          - index:
+              _index: docs
+              _type: doc
+              _id: 6
+          - snapshot: two
+          - index:
+              _index: docs
+              _type: doc
+              _id: 7
+          - snapshot: two
+
+  - do:
+      count:
+        index: docs
+
+  - match: {count: 7}
+
+  # Create a second snapshot
+  - do:
+      snapshot.create:
+        repository: repository
+        snapshot: snapshot-two
+        wait_for_completion: true
+
+  - match: { snapshot.snapshot: snapshot-two }
+  - match: { snapshot.state : SUCCESS }
+  - match: { snapshot.shards.failed : 0 }
+
+  - do:
+      snapshot.get:
+        repository: repository
+        snapshot: snapshot-one,snapshot-two
+
+  - is_true: snapshots
+  - match: { snapshots.0.state : SUCCESS }
+  - match: { snapshots.1.state : SUCCESS }
+
+  # Delete the index
+  - do:
+      indices.delete:
+        index: docs
+
+  # Restore the second snapshot
+  - do:
+      snapshot.restore:
+        repository: repository
+        snapshot: snapshot-two
+        wait_for_completion: true
+
+  - do:
+      count:
+        index: docs
+
+  - match: {count: 7}
+
+  # Delete the index again
+  - do:
+      indices.delete:
+        index: docs
+
+  # Restore the first snapshot
+  - do:
+      snapshot.restore:
+        repository: repository
+        snapshot: snapshot-one
+        wait_for_completion: true
+
+  - do:
+      count:
+        index: docs
+
+  - match: {count: 3}
+
+  # Remove the snapshots
+  - do:
+      snapshot.delete:
+        repository: repository
+        snapshot: snapshot-two
+
+  - do:
+      snapshot.delete:
+        repository: repository
+        snapshot: snapshot-one
+
+  # Remove our repository
+  - do:
+      snapshot.delete_repository:
+        repository: repository
[deleted: repository-s3 YAML REST test]
@@ -1,24 +0,0 @@
-# Integration tests for Repository S3 component
-#
-"S3 repository can be registered":
-  - do:
-      snapshot.create_repository:
-        repository: test_repo_s3_1
-        verify: false
-        body:
-          type: s3
-          settings:
-            bucket: "my_bucket_name"
-            canned_acl: "public-read"
-            storage_class: "standard"
-
-  # Get repository
-  - do:
-      snapshot.get_repository:
-        repository: test_repo_s3_1
-
-  - is_true: test_repo_s3_1
-  - is_true: test_repo_s3_1.settings.bucket
-  - is_false: test_repo_s3_1.settings.access_key
-  - is_false: test_repo_s3_1.settings.secret_key
-  - match: {test_repo_s3_1.settings.canned_acl : "public-read"}