Add docker-compose fixtures for S3 integration tests (#49107) (#49229)

Similarly to what has been done for Azure (#48636) and GCS (#48762),
this committ removes the existing Ant fixture that emulates a S3 storage
service in favor of multiple docker-compose based fixtures.

The goals here are multiple: be able to reuse a s3-fixture outside of the
repository-s3 plugin; allow parallel execution of integration tests; removes
the existing AmazonS3Fixture that has evolved in a weird beast in
dedicated, more maintainable fixtures.

The server side logic that emulates S3 mostly comes from the latest
HttpHandler made for S3 blob store repository tests, with additional
features extracted from the (now removed) AmazonS3Fixture:
authentication checks, session token checks and improved response
errors. Chunked upload request support for S3 object has been added
too.

The server side logic of all tests now reside in a single S3HttpHandler class.

Whereas AmazonS3Fixture contained logic for basic tests, session token
tests, EC2 tests or ECS tests, the S3 fixtures are now dedicated to each
kind of test. Fixtures are inheriting from each other, making things easier
to maintain.
This commit is contained in:
Tanguy Leroux 2019-11-18 05:56:59 -05:00 committed by GitHub
parent fcde1e752f
commit ca4f55f2e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 814 additions and 848 deletions

View File

@ -1,6 +1,5 @@
import org.elasticsearch.gradle.MavenFilteringHack
import org.elasticsearch.gradle.info.BuildParams
import org.elasticsearch.gradle.test.AntFixture
import org.elasticsearch.gradle.test.RestIntegTestTask
import static org.elasticsearch.gradle.PropertyNormalization.IGNORE_VALUE
@ -23,6 +22,7 @@ import static org.elasticsearch.gradle.PropertyNormalization.IGNORE_VALUE
* specific language governing permissions and limitations
* under the License.
*/
apply plugin: 'elasticsearch.test.fixtures'
esplugin {
description 'The S3 repository plugin adds S3 repositories'
@ -52,7 +52,7 @@ dependencies {
// and whitelist this hack in JarHell
compile 'javax.xml.bind:jaxb-api:2.2.2'
testCompile project(':test:fixtures:minio-fixture')
testCompile project(':test:fixtures:s3-fixture')
}
dependencyLicenses {
@ -111,7 +111,7 @@ if (!s3PermanentAccessKey && !s3PermanentSecretKey && !s3PermanentBucket && !s3P
s3PermanentAccessKey = 'access_key'
s3PermanentSecretKey = 'secret_key'
s3PermanentBucket = 'bucket'
s3PermanentBasePath = ''
s3PermanentBasePath = 'base_path'
useFixture = true
@ -120,21 +120,21 @@ if (!s3PermanentAccessKey && !s3PermanentSecretKey && !s3PermanentBucket && !s3P
}
if (!s3TemporaryAccessKey && !s3TemporarySecretKey && !s3TemporaryBucket && !s3TemporaryBasePath && !s3TemporarySessionToken) {
s3TemporaryAccessKey = 's3_integration_test_temporary_access_key'
s3TemporarySecretKey = 's3_integration_test_temporary_secret_key'
s3TemporaryBucket = 'temporary-bucket-test'
s3TemporaryBasePath = 'integration_test'
s3TemporarySessionToken = 's3_integration_test_temporary_session_token'
s3TemporaryAccessKey = 'session_token_access_key'
s3TemporarySecretKey = 'session_token_secret_key'
s3TemporaryBucket = 'session_token_bucket'
s3TemporaryBasePath = 'session_token_base_path'
s3TemporarySessionToken = 'session_token'
} else if (!s3TemporaryAccessKey || !s3TemporarySecretKey || !s3TemporaryBucket || !s3TemporaryBasePath || !s3TemporarySessionToken) {
throw new IllegalArgumentException("not all options specified to run against external S3 service as temporary credentials are present")
}
if (!s3EC2Bucket && !s3EC2BasePath && !s3ECSBucket && !s3ECSBasePath) {
s3EC2Bucket = 'ec2-bucket-test'
s3EC2BasePath = 'integration_test'
s3ECSBucket = 'ecs-bucket-test'
s3ECSBasePath = 'integration_test'
s3EC2Bucket = 'ec2_bucket'
s3EC2BasePath = 'ec2_base_path'
s3ECSBucket = 'ecs_bucket'
s3ECSBasePath = 'ecs_base_path'
} else if (!s3EC2Bucket || !s3EC2BasePath || !s3ECSBucket || !s3ECSBasePath) {
throw new IllegalArgumentException("not all options specified to run EC2/ECS tests are present")
}
@ -148,8 +148,6 @@ task thirdPartyTest(type: Test) {
}
if (useFixture) {
apply plugin: 'elasticsearch.test.fixtures'
testFixtures.useFixture(':test:fixtures:minio-fixture')
def minioAddress = {
@ -207,39 +205,6 @@ if (useFixture) {
check.dependsOn(thirdPartyTest)
File parentFixtures = new File(project.buildDir, "fixtures")
File s3FixtureFile = new File(parentFixtures, 's3Fixture.properties')
task s3FixtureProperties {
outputs.file(s3FixtureFile)
def s3FixtureOptions = [
"tests.seed": BuildParams.testSeed,
"s3Fixture.permanent_bucket_name": s3PermanentBucket,
"s3Fixture.permanent_key": s3PermanentAccessKey,
"s3Fixture.temporary_bucket_name": s3TemporaryBucket,
"s3Fixture.temporary_key": s3TemporaryAccessKey,
"s3Fixture.temporary_session_token": s3TemporarySessionToken,
"s3Fixture.ec2_bucket_name": s3EC2Bucket,
"s3Fixture.ecs_bucket_name": s3ECSBucket,
"s3Fixture.disableChunkedEncoding": s3DisableChunkedEncoding
]
doLast {
file(s3FixtureFile).text = s3FixtureOptions.collect { k, v -> "$k = $v" }.join("\n")
}
}
/** A task to start the AmazonS3Fixture which emulates an S3 service **/
task s3Fixture(type: AntFixture) {
dependsOn testClasses
dependsOn s3FixtureProperties
inputs.file(s3FixtureFile)
env 'CLASSPATH', "${-> project.sourceSets.test.runtimeClasspath.asPath}"
executable = "${BuildParams.runtimeJavaHome}/bin/java"
args 'org.elasticsearch.repositories.s3.AmazonS3Fixture', baseDir, s3FixtureFile.getAbsolutePath()
}
processTestResources {
Map<String, Object> expansions = [
'permanent_bucket': s3PermanentBucket,
@ -256,8 +221,13 @@ processTestResources {
MavenFilteringHack.filter(it, expansions)
}
integTest {
dependsOn s3Fixture
testFixtures.useFixture(':test:fixtures:s3-fixture')
def fixtureAddress = { fixture ->
assert useFixture: 'closure should not be used without a fixture'
int ephemeralPort = project(':test:fixtures:s3-fixture').postProcessFixture.ext."test.fixtures.${fixture}.tcp.80"
assert ephemeralPort > 0
'http://127.0.0.1:' + ephemeralPort
}
testClusters.integTest {
@ -269,12 +239,12 @@ testClusters.integTest {
keystore 's3.client.integration_test_temporary.session_token', s3TemporarySessionToken
if (useFixture) {
setting 's3.client.integration_test_permanent.endpoint', { "http://${s3Fixture.addressAndPort}" }, IGNORE_VALUE
setting 's3.client.integration_test_temporary.endpoint', { "http://${s3Fixture.addressAndPort}" }, IGNORE_VALUE
setting 's3.client.integration_test_ec2.endpoint', { "http://${s3Fixture.addressAndPort}" }, IGNORE_VALUE
setting 's3.client.integration_test_permanent.endpoint', { "${-> fixtureAddress('s3-fixture')}" }, IGNORE_VALUE
setting 's3.client.integration_test_temporary.endpoint', { "${-> fixtureAddress('s3-fixture-with-session-token')}" }, IGNORE_VALUE
setting 's3.client.integration_test_ec2.endpoint', { "${-> fixtureAddress('s3-fixture-with-ec2')}" }, IGNORE_VALUE
// to redirect InstanceProfileCredentialsProvider to custom auth point
systemProperty "com.amazonaws.sdk.ec2MetadataServiceEndpointOverride", { "http://${s3Fixture.addressAndPort}" }, IGNORE_VALUE
systemProperty "com.amazonaws.sdk.ec2MetadataServiceEndpointOverride", { "${-> fixtureAddress('s3-fixture-with-ec2')}" }, IGNORE_VALUE
} else {
println "Using an external service to test the repository-s3 plugin"
}
@ -287,7 +257,7 @@ task s3ThirdPartyTests {
if (useFixture) {
task integTestECS(type: RestIntegTestTask.class) {
description = "Runs tests using the ECS repository."
dependsOn(project.s3Fixture)
dependsOn('bundlePlugin')
runner {
systemProperty 'tests.rest.blacklist', [
'repository_s3/10_basic/*',
@ -300,9 +270,9 @@ if (useFixture) {
check.dependsOn(integTestECS)
testClusters.integTestECS {
setting 's3.client.integration_test_ecs.endpoint', { "http://${s3Fixture.addressAndPort}" }, IGNORE_VALUE
setting 's3.client.integration_test_ecs.endpoint', { "${-> fixtureAddress('s3-fixture-with-ecs')}" }, IGNORE_VALUE
plugin file(tasks.bundlePlugin.archiveFile)
environment 'AWS_CONTAINER_CREDENTIALS_FULL_URI', { "http://${s3Fixture.addressAndPort}/ecs_credentials_endpoint" }, IGNORE_VALUE
environment 'AWS_CONTAINER_CREDENTIALS_FULL_URI', { "${-> fixtureAddress('s3-fixture-with-ecs')}/ecs_credentials_endpoint" }, IGNORE_VALUE
}
gradle.taskGraph.whenReady {

View File

@ -1,615 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.s3;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.elasticsearch.common.TriFunction;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.test.fixture.AbstractHttpFixture;
import com.amazonaws.util.DateUtils;
import com.amazonaws.util.IOUtils;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.path.PathTrie;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.RestUtils;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import static com.carrotsearch.randomizedtesting.generators.RandomStrings.randomAsciiAlphanumOfLength;
import static com.carrotsearch.randomizedtesting.generators.RandomStrings.randomAsciiAlphanumOfLengthBetween;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;
/**
* {@link AmazonS3Fixture} emulates an AWS S3 service
* .
* he implementation is based on official documentation available at https://docs.aws.amazon.com/AmazonS3/latest/API/.
*/
public class AmazonS3Fixture extends AbstractHttpFixture {
private static final String AUTH = "AUTH";
private static final String NON_AUTH = "NON_AUTH";
private static final String EC2_PROFILE = "ec2Profile";
private final Properties properties;
private final Random random;
/** List of the buckets stored on this test server **/
private final Map<String, Bucket> buckets = ConcurrentCollections.newConcurrentMap();
/** Request handlers for the requests made by the S3 client **/
private final PathTrie<RequestHandler> handlers;
private final boolean disableChunkedEncoding;
/**
* Creates a {@link AmazonS3Fixture}
*/
private AmazonS3Fixture(final String workingDir, Properties properties) {
super(workingDir);
this.properties = properties;
this.random = new Random(Long.parseUnsignedLong(requireNonNull(properties.getProperty("tests.seed")), 16));
new Bucket("s3Fixture.permanent", false);
new Bucket("s3Fixture.temporary", true);
final Bucket ec2Bucket = new Bucket("s3Fixture.ec2",
randomAsciiAlphanumOfLength(random, 10), randomAsciiAlphanumOfLength(random, 10));
final Bucket ecsBucket = new Bucket("s3Fixture.ecs",
randomAsciiAlphanumOfLength(random, 10), randomAsciiAlphanumOfLength(random, 10));
this.handlers = defaultHandlers(buckets, ec2Bucket, ecsBucket);
this.disableChunkedEncoding = Boolean.parseBoolean(prop(properties, "s3Fixture.disableChunkedEncoding"));
}
private static String nonAuthPath(Request request) {
return nonAuthPath(request.getMethod(), request.getPath());
}
private static String nonAuthPath(String method, String path) {
return NON_AUTH + " " + method + " " + path;
}
private static String authPath(Request request) {
return authPath(request.getMethod(), request.getPath());
}
private static String authPath(String method, String path) {
return AUTH + " " + method + " " + path;
}
@Override
protected Response handle(final Request request) throws IOException {
final String nonAuthorizedPath = nonAuthPath(request);
final RequestHandler nonAuthorizedHandler = handlers.retrieve(nonAuthorizedPath, request.getParameters());
if (nonAuthorizedHandler != null) {
return nonAuthorizedHandler.handle(request);
}
final String authorizedPath = authPath(request);
final RequestHandler handler = handlers.retrieve(authorizedPath, request.getParameters());
if (handler != null) {
final String bucketName = request.getParam("bucket");
if (bucketName == null) {
return newError(request.getId(), RestStatus.FORBIDDEN, "AccessDenied", "Bad access key", "");
}
final Bucket bucket = buckets.get(bucketName);
if (bucket == null) {
return newBucketNotFoundError(request.getId(), bucketName);
}
final Response authResponse = authenticateBucket(request, bucket);
if (authResponse != null) {
return authResponse;
}
return handler.handle(request);
} else {
return newInternalError(request.getId(), "No handler defined for request [" + request + "]");
}
}
private Response authenticateBucket(Request request, Bucket bucket) {
final String authorization = request.getHeader("Authorization");
if (authorization == null) {
return newError(request.getId(), RestStatus.FORBIDDEN, "AccessDenied", "Bad access key", "");
}
if (authorization.contains(bucket.key)) {
final String sessionToken = request.getHeader("x-amz-security-token");
if (bucket.token == null) {
if (sessionToken != null) {
return newError(request.getId(), RestStatus.FORBIDDEN, "AccessDenied", "Unexpected session token", "");
}
} else {
if (sessionToken == null) {
return newError(request.getId(), RestStatus.FORBIDDEN, "AccessDenied", "No session token", "");
}
if (sessionToken.equals(bucket.token) == false) {
return newError(request.getId(), RestStatus.FORBIDDEN, "AccessDenied", "Bad session token", "");
}
}
}
return null;
}
public static void main(final String[] args) throws Exception {
if (args == null || args.length != 2) {
throw new IllegalArgumentException("AmazonS3Fixture <working directory> <property file>");
}
final Properties properties = new Properties();
try (InputStream is = Files.newInputStream(PathUtils.get(args[1]))) {
properties.load(is);
}
final AmazonS3Fixture fixture = new AmazonS3Fixture(args[0], properties);
fixture.listen();
}
/** Builds the default request handlers **/
private PathTrie<RequestHandler> defaultHandlers(final Map<String, Bucket> buckets, final Bucket ec2Bucket, final Bucket ecsBucket) {
final PathTrie<RequestHandler> handlers = new PathTrie<>(RestUtils.REST_DECODER);
// HEAD Object
//
// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectHEAD.html
objectsPaths(authPath(HttpHead.METHOD_NAME, "/{bucket}")).forEach(path ->
handlers.insert(path, (request) -> {
final String bucketName = request.getParam("bucket");
final Bucket bucket = buckets.get(bucketName);
if (bucket == null) {
return newBucketNotFoundError(request.getId(), bucketName);
}
final String objectName = objectName(request.getParameters());
for (Map.Entry<String, byte[]> object : bucket.objects.entrySet()) {
if (object.getKey().equals(objectName)) {
return new Response(RestStatus.OK.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE);
}
}
return newObjectNotFoundError(request.getId(), objectName);
})
);
// PUT Object
//
// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html
objectsPaths(authPath(HttpPut.METHOD_NAME, "/{bucket}")).forEach(path ->
handlers.insert(path, (request) -> {
final String destBucketName = request.getParam("bucket");
final Bucket destBucket = buckets.get(destBucketName);
if (destBucket == null) {
return newBucketNotFoundError(request.getId(), destBucketName);
}
final String destObjectName = objectName(request.getParameters());
String headerDecodedContentLength = request.getHeader("X-amz-decoded-content-length");
if (headerDecodedContentLength != null) {
if (disableChunkedEncoding) {
return newInternalError(request.getId(), "Something is wrong with this PUT request");
}
// This is a chunked upload request. We should have the header "Content-Encoding : aws-chunked,gzip"
// to detect it but it seems that the AWS SDK does not follow the S3 guidelines here.
//
// See https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html
//
int contentLength = Integer.valueOf(headerDecodedContentLength);
// Chunked requests have a payload like this:
//
// 105;chunk-signature=01d0de6be013115a7f4794db8c4b9414e6ec71262cc33ae562a71f2eaed1efe8
// ... bytes of data ....
// 0;chunk-signature=f890420b1974c5469aaf2112e9e6f2e0334929fd45909e03c0eff7a84124f6a4
//
try (BufferedInputStream inputStream = new BufferedInputStream(new ByteArrayInputStream(request.getBody()))) {
int b;
// Moves to the end of the first signature line
while ((b = inputStream.read()) != -1) {
if (b == '\n') {
break;
}
}
final byte[] bytes = new byte[contentLength];
inputStream.read(bytes, 0, contentLength);
destBucket.objects.put(destObjectName, bytes);
return new Response(RestStatus.OK.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE);
}
} else {
if (disableChunkedEncoding == false) {
return newInternalError(request.getId(), "Something is wrong with this PUT request");
}
// Read from body directly
try (BufferedInputStream inputStream = new BufferedInputStream(new ByteArrayInputStream(request.getBody()))) {
byte[] bytes = IOUtils.toByteArray(inputStream);
destBucket.objects.put(destObjectName, bytes);
return new Response(RestStatus.OK.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE);
}
}
})
);
// DELETE Object
//
// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectDELETE.html
objectsPaths(authPath(HttpDelete.METHOD_NAME, "/{bucket}")).forEach(path ->
handlers.insert(path, (request) -> {
final String bucketName = request.getParam("bucket");
final Bucket bucket = buckets.get(bucketName);
if (bucket == null) {
return newBucketNotFoundError(request.getId(), bucketName);
}
final String objectName = objectName(request.getParameters());
bucket.objects.remove(objectName);
return new Response(RestStatus.NO_CONTENT.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE);
})
);
// GET Object
//
// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html
objectsPaths(authPath(HttpGet.METHOD_NAME, "/{bucket}")).forEach(path ->
handlers.insert(path, (request) -> {
final String bucketName = request.getParam("bucket");
final Bucket bucket = buckets.get(bucketName);
if (bucket == null) {
return newBucketNotFoundError(request.getId(), bucketName);
}
final String objectName = objectName(request.getParameters());
if (bucket.objects.containsKey(objectName)) {
return new Response(RestStatus.OK.getStatus(), contentType("application/octet-stream"), bucket.objects.get(objectName));
}
return newObjectNotFoundError(request.getId(), objectName);
})
);
// HEAD Bucket
//
// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketHEAD.html
handlers.insert(authPath(HttpHead.METHOD_NAME, "/{bucket}"), (request) -> {
String bucket = request.getParam("bucket");
if (Strings.hasText(bucket) && buckets.containsKey(bucket)) {
return new Response(RestStatus.OK.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE);
} else {
return newBucketNotFoundError(request.getId(), bucket);
}
});
// GET Bucket (List Objects) Version 1
//
// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGET.html
handlers.insert(authPath(HttpGet.METHOD_NAME, "/{bucket}/"), (request) -> {
final String bucketName = request.getParam("bucket");
final Bucket bucket = buckets.get(bucketName);
if (bucket == null) {
return newBucketNotFoundError(request.getId(), bucketName);
}
String prefix = request.getParam("prefix");
if (prefix == null) {
prefix = request.getHeader("Prefix");
}
return newListBucketResultResponse(request.getId(), bucket, prefix);
});
// Delete Multiple Objects
//
// https://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html
final RequestHandler bulkDeleteHandler = request -> {
final List<String> deletes = new ArrayList<>();
final List<String> errors = new ArrayList<>();
if (request.getParam("delete") != null) {
// The request body is something like:
// <Delete><Object><Key>...</Key></Object><Object><Key>...</Key></Object></Delete>
String requestBody = Streams.copyToString(new InputStreamReader(new ByteArrayInputStream(request.getBody()), UTF_8));
if (requestBody.startsWith("<Delete>")) {
final String startMarker = "<Key>";
final String endMarker = "</Key>";
int offset = 0;
while (offset != -1) {
offset = requestBody.indexOf(startMarker, offset);
if (offset > 0) {
int closingOffset = requestBody.indexOf(endMarker, offset);
if (closingOffset != -1) {
offset = offset + startMarker.length();
final String objectName = requestBody.substring(offset, closingOffset);
boolean found = false;
for (Bucket bucket : buckets.values()) {
if (bucket.objects.containsKey(objectName)) {
final Response authResponse = authenticateBucket(request, bucket);
if (authResponse != null) {
return authResponse;
}
bucket.objects.remove(objectName);
found = true;
}
}
if (found) {
deletes.add(objectName);
} else {
errors.add(objectName);
}
}
}
}
return newDeleteResultResponse(request.getId(), deletes, errors);
}
}
return newInternalError(request.getId(), "Something is wrong with this POST multiple deletes request");
};
handlers.insert(nonAuthPath(HttpPost.METHOD_NAME, "/"), bulkDeleteHandler);
handlers.insert(nonAuthPath(HttpPost.METHOD_NAME, "/{bucket}"), bulkDeleteHandler);
// non-authorized requests
TriFunction<String, String, String, Response> credentialResponseFunction = (profileName, key, token) -> {
final Date expiration = new Date(new Date().getTime() + TimeUnit.DAYS.toMillis(1));
final String response = "{"
+ "\"AccessKeyId\": \"" + key + "\","
+ "\"Expiration\": \"" + DateUtils.formatISO8601Date(expiration) + "\","
+ "\"RoleArn\": \"" + randomAsciiAlphanumOfLengthBetween(random, 1, 20) + "\","
+ "\"SecretAccessKey\": \"" + randomAsciiAlphanumOfLengthBetween(random, 1, 20) + "\","
+ "\"Token\": \"" + token + "\""
+ "}";
final Map<String, String> headers = new HashMap<>(contentType("application/json"));
return new Response(RestStatus.OK.getStatus(), headers, response.getBytes(UTF_8));
};
// GET
//
// http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html
handlers.insert(nonAuthPath(HttpGet.METHOD_NAME, "/latest/meta-data/iam/security-credentials/"), (request) -> {
final String response = EC2_PROFILE;
final Map<String, String> headers = new HashMap<>(contentType("text/plain"));
return new Response(RestStatus.OK.getStatus(), headers, response.getBytes(UTF_8));
});
// GET
//
// http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html
handlers.insert(nonAuthPath(HttpGet.METHOD_NAME, "/latest/meta-data/iam/security-credentials/{profileName}"), (request) -> {
final String profileName = request.getParam("profileName");
if (EC2_PROFILE.equals(profileName) == false) {
return new Response(RestStatus.NOT_FOUND.getStatus(), new HashMap<>(), "unknown profile".getBytes(UTF_8));
}
return credentialResponseFunction.apply(profileName, ec2Bucket.key, ec2Bucket.token);
});
// GET
//
// https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html
handlers.insert(nonAuthPath(HttpGet.METHOD_NAME, "/ecs_credentials_endpoint"),
(request) -> credentialResponseFunction.apply("CPV_ECS", ecsBucket.key, ecsBucket.token));
return handlers;
}
private static String prop(Properties properties, String propertyName) {
return requireNonNull(properties.getProperty(propertyName),
"property '" + propertyName + "' is missing");
}
/**
* Represents a S3 bucket.
*/
class Bucket {
/** Bucket name **/
final String name;
final String key;
final String token;
/** Blobs contained in the bucket **/
final Map<String, byte[]> objects;
private Bucket(final String prefix, final boolean tokenRequired) {
this(prefix, prop(properties, prefix + "_key"),
tokenRequired ? prop(properties, prefix + "_session_token") : null);
}
private Bucket(final String prefix, final String key, final String token) {
this.name = prop(properties, prefix + "_bucket_name");
this.key = key;
this.token = token;
this.objects = ConcurrentCollections.newConcurrentMap();
if (buckets.put(name, this) != null) {
throw new IllegalArgumentException("bucket " + name + " is already registered");
}
}
}
/**
* Decline a path like "http://host:port/{bucket}" into 10 derived paths like:
* - http://host:port/{bucket}/{path0}
* - http://host:port/{bucket}/{path0}/{path1}
* - http://host:port/{bucket}/{path0}/{path1}/{path2}
* - etc
*/
private static List<String> objectsPaths(final String path) {
final List<String> paths = new ArrayList<>();
String p = path;
for (int i = 0; i < 10; i++) {
p = p + "/{path" + i + "}";
paths.add(p);
}
return paths;
}
/**
* Retrieves the object name from all derives paths named {pathX} where 0 &lt;= X &lt; 10.
*
* This is the counterpart of {@link #objectsPaths(String)}
*/
private static String objectName(final Map<String, String> params) {
final StringBuilder name = new StringBuilder();
for (int i = 0; i < 10; i++) {
String value = params.getOrDefault("path" + i, null);
if (value != null) {
if (name.length() > 0) {
name.append('/');
}
name.append(value);
}
}
return name.toString();
}
/**
* S3 ListBucketResult Response
*/
private static Response newListBucketResultResponse(final long requestId, final Bucket bucket, final String prefix) {
final String id = Long.toString(requestId);
final StringBuilder response = new StringBuilder();
response.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
response.append("<ListBucketResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">");
response.append("<Prefix>");
if (prefix != null) {
response.append(prefix);
}
response.append("</Prefix>");
response.append("<Marker/>");
response.append("<MaxKeys>1000</MaxKeys>");
response.append("<IsTruncated>false</IsTruncated>");
int count = 0;
for (Map.Entry<String, byte[]> object : bucket.objects.entrySet()) {
String objectName = object.getKey();
if (prefix == null || objectName.startsWith(prefix)) {
response.append("<Contents>");
response.append("<Key>").append(objectName).append("</Key>");
response.append("<LastModified>").append(DateUtils.formatISO8601Date(new Date())).append("</LastModified>");
response.append("<ETag>&quot;").append(count++).append("&quot;</ETag>");
response.append("<Size>").append(object.getValue().length).append("</Size>");
response.append("</Contents>");
}
}
response.append("</ListBucketResult>");
final Map<String, String> headers = new HashMap<>(contentType("application/xml"));
headers.put("x-amz-request-id", id);
return new Response(RestStatus.OK.getStatus(), headers, response.toString().getBytes(UTF_8));
}
/**
* S3 DeleteResult Response
*/
private static Response newDeleteResultResponse(final long requestId,
final List<String> deletedObjects,
final List<String> ignoredObjects) {
final String id = Long.toString(requestId);
final StringBuilder response = new StringBuilder();
response.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
response.append("<DeleteResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">");
for (String deletedObject : deletedObjects) {
response.append("<Deleted>");
response.append("<Key>").append(deletedObject).append("</Key>");
response.append("</Deleted>");
}
for (String ignoredObject : ignoredObjects) {
response.append("<Error>");
response.append("<Key>").append(ignoredObject).append("</Key>");
response.append("<Code>NoSuchKey</Code>");
response.append("</Error>");
}
response.append("</DeleteResult>");
final Map<String, String> headers = new HashMap<>(contentType("application/xml"));
headers.put("x-amz-request-id", id);
return new Response(RestStatus.OK.getStatus(), headers, response.toString().getBytes(UTF_8));
}
private static Response newBucketNotFoundError(final long requestId, final String bucket) {
return newError(requestId, RestStatus.NOT_FOUND, "NoSuchBucket", "The specified bucket does not exist", bucket);
}
private static Response newObjectNotFoundError(final long requestId, final String object) {
return newError(requestId, RestStatus.NOT_FOUND, "NoSuchKey", "The specified key does not exist", object);
}
private static Response newInternalError(final long requestId, final String resource) {
return newError(requestId, RestStatus.INTERNAL_SERVER_ERROR, "InternalError", "We encountered an internal error", resource);
}
/**
* S3 Error
*
* https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
*/
private static Response newError(final long requestId,
final RestStatus status,
final String code,
final String message,
final String resource) {
final String id = Long.toString(requestId);
final StringBuilder response = new StringBuilder();
response.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
response.append("<Error>");
response.append("<Code>").append(code).append("</Code>");
response.append("<Message>").append(message).append("</Message>");
response.append("<Resource>").append(resource).append("</Resource>");
response.append("<RequestId>").append(id).append("</RequestId>");
response.append("</Error>");
final Map<String, String> headers = new HashMap<>(contentType("application/xml"));
headers.put("x-amz-request-id", id);
return new Response(status.getStatus(), headers, response.toString().getBytes(UTF_8));
}
}

View File

@ -19,20 +19,14 @@
package org.elasticsearch.repositories.s3;
import com.amazonaws.http.AmazonHttpClient;
import com.amazonaws.services.s3.internal.MD5DigestCalculatingInputStream;
import com.amazonaws.util.Base16;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import fixture.s3.S3HttpHandler;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
@ -40,27 +34,14 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.snapshots.mockstore.BlobStoreWrapper;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.Matchers.nullValue;
@SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase {
@ -86,7 +67,7 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
@Override
protected Map<String, HttpHandler> createHttpHandlers() {
return Collections.singletonMap("/bucket", new InternalHttpHandler());
return Collections.singletonMap("/bucket", new S3HttpHandler("bucket"));
}
@Override
@ -153,162 +134,6 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
}
}
/**
* Minimal HTTP handler that acts as a S3 compliant server
*/
@SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
private static class InternalHttpHandler implements HttpHandler {
private final ConcurrentMap<String, BytesReference> blobs = new ConcurrentHashMap<>();
@Override
public void handle(final HttpExchange exchange) throws IOException {
final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString();
try {
if (Regex.simpleMatch("POST /bucket/*?uploads", request)) {
final String uploadId = UUIDs.randomBase64UUID();
byte[] response = ("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
"<InitiateMultipartUploadResult>\n" +
" <Bucket>bucket</Bucket>\n" +
" <Key>" + exchange.getRequestURI().getPath() + "</Key>\n" +
" <UploadId>" + uploadId + "</UploadId>\n" +
"</InitiateMultipartUploadResult>").getBytes(StandardCharsets.UTF_8);
blobs.put(multipartKey(uploadId, 0), BytesArray.EMPTY);
exchange.getResponseHeaders().add("Content-Type", "application/xml");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
} else if (Regex.simpleMatch("PUT /bucket/*?uploadId=*&partNumber=*", request)) {
final Map<String, String> params = new HashMap<>();
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
final String uploadId = params.get("uploadId");
if (blobs.containsKey(multipartKey(uploadId, 0))) {
final int partNumber = Integer.parseInt(params.get("partNumber"));
MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody());
blobs.put(multipartKey(uploadId, partNumber), Streams.readFully(md5));
exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest()));
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
} else {
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
}
} else if (Regex.simpleMatch("POST /bucket/*?uploadId=*", request)) {
drainInputStream(exchange.getRequestBody());
final Map<String, String> params = new HashMap<>();
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
final String uploadId = params.get("uploadId");
final int nbParts = blobs.keySet().stream()
.filter(blobName -> blobName.startsWith(uploadId))
.map(blobName -> blobName.replaceFirst(uploadId + '\n', ""))
.mapToInt(Integer::parseInt)
.max()
.orElse(0);
final ByteArrayOutputStream blob = new ByteArrayOutputStream();
for (int partNumber = 0; partNumber <= nbParts; partNumber++) {
BytesReference part = blobs.remove(multipartKey(uploadId, partNumber));
assertNotNull(part);
part.writeTo(blob);
}
blobs.put(exchange.getRequestURI().getPath(), new BytesArray(blob.toByteArray()));
byte[] response = ("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
"<CompleteMultipartUploadResult>\n" +
" <Bucket>bucket</Bucket>\n" +
" <Key>" + exchange.getRequestURI().getPath() + "</Key>\n" +
"</CompleteMultipartUploadResult>").getBytes(StandardCharsets.UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/xml");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
}else if (Regex.simpleMatch("PUT /bucket/*", request)) {
blobs.put(exchange.getRequestURI().toString(), Streams.readFully(exchange.getRequestBody()));
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
} else if (Regex.simpleMatch("GET /bucket/?prefix=*", request)) {
final Map<String, String> params = new HashMap<>();
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
assertThat("Test must be adapted for GET Bucket (List Objects) Version 2", params.get("list-type"), nullValue());
final StringBuilder list = new StringBuilder();
list.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
list.append("<ListBucketResult>");
final String prefix = params.get("prefix");
if (prefix != null) {
list.append("<Prefix>").append(prefix).append("</Prefix>");
}
for (Map.Entry<String, BytesReference> blob : blobs.entrySet()) {
if (prefix == null || blob.getKey().startsWith("/bucket/" + prefix)) {
list.append("<Contents>");
list.append("<Key>").append(blob.getKey().replace("/bucket/", "")).append("</Key>");
list.append("<Size>").append(blob.getValue().length()).append("</Size>");
list.append("</Contents>");
}
}
list.append("</ListBucketResult>");
byte[] response = list.toString().getBytes(StandardCharsets.UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/xml");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
} else if (Regex.simpleMatch("GET /bucket/*", request)) {
final BytesReference blob = blobs.get(exchange.getRequestURI().toString());
if (blob != null) {
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), blob.length());
blob.writeTo(exchange.getResponseBody());
} else {
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
}
} else if (Regex.simpleMatch("DELETE /bucket/*", request)) {
int deletions = 0;
for (Iterator<Map.Entry<String, BytesReference>> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<String, BytesReference> blob = iterator.next();
if (blob.getKey().startsWith(exchange.getRequestURI().toString())) {
iterator.remove();
deletions++;
}
}
exchange.sendResponseHeaders((deletions > 0 ? RestStatus.OK : RestStatus.NO_CONTENT).getStatus(), -1);
} else if (Regex.simpleMatch("POST /bucket/?delete", request)) {
final String requestBody = Streams.copyToString(new InputStreamReader(exchange.getRequestBody(), UTF_8));
final StringBuilder deletes = new StringBuilder();
deletes.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
deletes.append("<DeleteResult>");
for (Iterator<Map.Entry<String, BytesReference>> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<String, BytesReference> blob = iterator.next();
String key = blob.getKey().replace("/bucket/", "");
if (requestBody.contains("<Key>" + key + "</Key>")) {
deletes.append("<Deleted><Key>").append(key).append("</Key></Deleted>");
iterator.remove();
}
}
deletes.append("</DeleteResult>");
byte[] response = deletes.toString().getBytes(StandardCharsets.UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/xml");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
} else {
exchange.sendResponseHeaders(RestStatus.INTERNAL_SERVER_ERROR.getStatus(), -1);
}
} finally {
exchange.close();
}
}
private static String multipartKey(final String uploadId, int partNumber) {
return uploadId + "\n" + partNumber;
}
}
/**
* HTTP handler that injects random S3 service errors
*

View File

@ -58,6 +58,7 @@ List projects = [
'test:fixtures:krb5kdc-fixture',
'test:fixtures:minio-fixture',
'test:fixtures:old-elasticsearch',
'test:fixtures:s3-fixture',
'test:logger-usage'
]

23
test/fixtures/s3-fixture/Dockerfile vendored Normal file
View File

@ -0,0 +1,23 @@
FROM ubuntu:19.04
RUN apt-get update -qqy
RUN apt-get install -qqy openjdk-12-jre-headless
ARG fixtureClass
ARG port
ARG bucket
ARG basePath
ARG accessKey
ARG sessionToken
ENV S3_FIXTURE_CLASS=${fixtureClass}
ENV S3_FIXTURE_PORT=${port}
ENV S3_FIXTURE_BUCKET=${bucket}
ENV S3_FIXTURE_BASE_PATH=${basePath}
ENV S3_FIXTURE_ACCESS_KEY=${accessKey}
ENV S3_FIXTURE_SESSION_TOKEN=${sessionToken}
ENTRYPOINT exec java -classpath "/fixture/shared/*" \
$S3_FIXTURE_CLASS 0.0.0.0 "$S3_FIXTURE_PORT" "$S3_FIXTURE_BUCKET" "$S3_FIXTURE_BASE_PATH" "$S3_FIXTURE_ACCESS_KEY" "$S3_FIXTURE_SESSION_TOKEN"
EXPOSE $port

39
test/fixtures/s3-fixture/build.gradle vendored Normal file
View File

@ -0,0 +1,39 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
apply plugin: 'elasticsearch.build'
apply plugin: 'elasticsearch.test.fixtures'
description = 'Fixture for S3 Storage service'
test.enabled = false
dependencies {
compile project(':server')
}
preProcessFixture {
dependsOn jar
doLast {
file("${testFixturesDir}/shared").mkdirs()
project.copy {
from jar
from configurations.runtimeClasspath
into "${testFixturesDir}/shared"
}
}
}

View File

@ -0,0 +1,64 @@
version: '3'
services:
s3-fixture:
build:
context: .
args:
fixtureClass: fixture.s3.S3HttpFixture
port: 80
bucket: "bucket"
basePath: "base_path_integration_tests"
accessKey: "access_key"
dockerfile: Dockerfile
volumes:
- ./testfixtures_shared/shared:/fixture/shared
ports:
- "80"
s3-fixture-with-session-token:
build:
context: .
args:
fixtureClass: fixture.s3.S3HttpFixtureWithSessionToken
port: 80
bucket: "session_token_bucket"
basePath: "session_token_base_path_integration_tests"
accessKey: "session_token_access_key"
sessionToken: "session_token"
dockerfile: Dockerfile
volumes:
- ./testfixtures_shared/shared:/fixture/shared
ports:
- "80"
s3-fixture-with-ec2:
build:
context: .
args:
fixtureClass: fixture.s3.S3HttpFixtureWithEC2
port: 80
bucket: "ec2_bucket"
basePath: "ec2_base_path"
accessKey: "ec2_access_key"
sessionToken: "ec2_session_token"
dockerfile: Dockerfile
volumes:
- ./testfixtures_shared/shared:/fixture/shared
ports:
- "80"
s3-fixture-with-ecs:
build:
context: .
args:
fixtureClass: fixture.s3.S3HttpFixtureWithECS
port: 80
bucket: "ecs_bucket"
basePath: "ecs_base_path"
accessKey: "ecs_access_key"
sessionToken: "ecs_session_token"
dockerfile: Dockerfile
volumes:
- ./testfixtures_shared/shared:/fixture/shared
ports:
- "80"

View File

@ -0,0 +1,75 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package fixture.s3;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Objects;
public class S3HttpFixture {
private final HttpServer server;
S3HttpFixture(final String[] args) throws Exception {
this.server = HttpServer.create(new InetSocketAddress(InetAddress.getByName(args[0]), Integer.parseInt(args[1])), 0);
this.server.createContext("/", Objects.requireNonNull(createHandler(args)));
}
final void start() throws Exception {
try {
server.start();
// wait to be killed
Thread.sleep(Long.MAX_VALUE);
} finally {
server.stop(0);
}
}
protected HttpHandler createHandler(final String[] args) {
final String bucket = Objects.requireNonNull(args[2]);
final String basePath = args[3];
final String accessKey = Objects.requireNonNull(args[4]);
return new S3HttpHandler(bucket, basePath) {
@Override
public void handle(final HttpExchange exchange) throws IOException {
final String authorization = exchange.getRequestHeaders().getFirst("Authorization");
if (authorization == null || authorization.contains(accessKey) == false) {
sendError(exchange, RestStatus.FORBIDDEN, "AccessDenied", "Bad access key");
return;
}
super.handle(exchange);
}
};
}
public static void main(final String[] args) throws Exception {
if (args == null || args.length < 5) {
throw new IllegalArgumentException("S3HttpFixture expects 5 arguments [address, port, bucket, base path, access key]");
}
final S3HttpFixture fixture = new S3HttpFixture(args);
fixture.start();
}
}

View File

@ -0,0 +1,95 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package fixture.s3;
import com.sun.net.httpserver.HttpHandler;
import org.elasticsearch.rest.RestStatus;
import java.nio.charset.StandardCharsets;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Objects;
public class S3HttpFixtureWithEC2 extends S3HttpFixtureWithSessionToken {
private static final String EC2_PATH = "/latest/meta-data/iam/security-credentials/";
private static final String EC2_PROFILE = "ec2Profile";
S3HttpFixtureWithEC2(final String[] args) throws Exception {
super(args);
}
@Override
protected HttpHandler createHandler(final String[] args) {
final String ec2AccessKey = Objects.requireNonNull(args[4]);
final String ec2SessionToken = Objects.requireNonNull(args[5], "session token is missing");
final HttpHandler delegate = super.createHandler(args);
return exchange -> {
final String path = exchange.getRequestURI().getPath();
// http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html
if ("GET".equals(exchange.getRequestMethod()) && path.startsWith(EC2_PATH)) {
if (path.equals(EC2_PATH)) {
final byte[] response = EC2_PROFILE.getBytes(StandardCharsets.UTF_8);
exchange.getResponseHeaders().add("Content-Type", "text/plain");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
exchange.close();
return;
} else if (path.equals(EC2_PATH + EC2_PROFILE)) {
final byte[] response = buildCredentialResponse(ec2AccessKey, ec2SessionToken).getBytes(StandardCharsets.UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/json");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
exchange.close();
return;
}
final byte[] response = "unknown profile".getBytes(StandardCharsets.UTF_8);
exchange.getResponseHeaders().add("Content-Type", "text/plain");
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), response.length);
exchange.getResponseBody().write(response);
exchange.close();
return;
}
delegate.handle(exchange);
};
}
protected String buildCredentialResponse(final String ec2AccessKey, final String ec2SessionToken) {
return "{"
+ "\"AccessKeyId\": \"" + ec2AccessKey + "\","
+ "\"Expiration\": \"" + ZonedDateTime.now().plusDays(1L).format(DateTimeFormatter.ISO_DATE_TIME) + "\","
+ "\"RoleArn\": \"arn\","
+ "\"SecretAccessKey\": \"secret\","
+ "\"Token\": \"" + ec2SessionToken + "\""
+ "}";
}
public static void main(final String[] args) throws Exception {
if (args == null || args.length < 6) {
throw new IllegalArgumentException("S3HttpFixtureWithEC2 expects 6 arguments " +
"[address, port, bucket, base path, ec2 access id, ec2 session token]");
}
final S3HttpFixtureWithEC2 fixture = new S3HttpFixtureWithEC2(args);
fixture.start();
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package fixture.s3;
import com.sun.net.httpserver.HttpHandler;
import org.elasticsearch.rest.RestStatus;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
public class S3HttpFixtureWithECS extends S3HttpFixtureWithEC2 {
private S3HttpFixtureWithECS(final String[] args) throws Exception {
super(args);
}
@Override
protected HttpHandler createHandler(final String[] args) {
final String ecsAccessKey = Objects.requireNonNull(args[4]);
final String ecsSessionToken = Objects.requireNonNull(args[5], "session token is missing");
final HttpHandler delegate = super.createHandler(args);
return exchange -> {
// https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html
if ("GET".equals(exchange.getRequestMethod()) && exchange.getRequestURI().getPath().equals("/ecs_credentials_endpoint")) {
final byte[] response = buildCredentialResponse(ecsAccessKey, ecsSessionToken).getBytes(StandardCharsets.UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/json");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
exchange.close();
return;
}
delegate.handle(exchange);
};
}
public static void main(final String[] args) throws Exception {
if (args == null || args.length < 6) {
throw new IllegalArgumentException("S3HttpFixtureWithECS expects 6 arguments " +
"[address, port, bucket, base path, ecs access id, ecs session token]");
}
final S3HttpFixtureWithECS fixture = new S3HttpFixtureWithECS(args);
fixture.start();
}
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package fixture.s3;
import com.sun.net.httpserver.HttpHandler;
import org.elasticsearch.rest.RestStatus;
import java.util.Objects;
import static fixture.s3.S3HttpHandler.sendError;
public class S3HttpFixtureWithSessionToken extends S3HttpFixture {
S3HttpFixtureWithSessionToken(final String[] args) throws Exception {
super(args);
}
@Override
protected HttpHandler createHandler(final String[] args) {
final String sessionToken = Objects.requireNonNull(args[5], "session token is missing");
final HttpHandler delegate = super.createHandler(args);
return exchange -> {
final String securityToken = exchange.getRequestHeaders().getFirst("x-amz-security-token");
if (securityToken == null) {
sendError(exchange, RestStatus.FORBIDDEN, "AccessDenied", "No session token");
return;
}
if (securityToken.equals(sessionToken) == false) {
sendError(exchange, RestStatus.FORBIDDEN, "AccessDenied", "Bad session token");
return;
}
delegate.handle(exchange);
};
}
public static void main(final String[] args) throws Exception {
if (args == null || args.length < 6) {
throw new IllegalArgumentException("S3HttpFixtureWithSessionToken expects 6 arguments " +
"[address, port, bucket, base path, access key, session token]");
}
final S3HttpFixtureWithSessionToken fixture = new S3HttpFixtureWithSessionToken(args);
fixture.start();
}
}

View File

@ -0,0 +1,368 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package fixture.s3;
import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.hash.MessageDigests;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.RestUtils;
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.CheckedInputStream;
import java.util.zip.Checksum;
import static java.nio.charset.StandardCharsets.UTF_8;
/**
* Minimal HTTP handler that acts as a S3 compliant server
*/
@SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
public class S3HttpHandler implements HttpHandler {
private final String bucket;
private final String path;
private final ConcurrentMap<String, BytesReference> blobs = new ConcurrentHashMap<>();
public S3HttpHandler(final String bucket) {
this(bucket, null);
}
public S3HttpHandler(final String bucket, @Nullable final String basePath) {
this.bucket = Objects.requireNonNull(bucket);
this.path = bucket + (basePath != null && basePath.isEmpty() == false ? "/" + basePath : "");
}
@Override
public void handle(final HttpExchange exchange) throws IOException {
final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString();
try {
if (Regex.simpleMatch("POST /" + path + "/*?uploads", request)) {
final String uploadId = UUIDs.randomBase64UUID();
byte[] response = ("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
"<InitiateMultipartUploadResult>\n" +
" <Bucket>" + bucket + "</Bucket>\n" +
" <Key>" + exchange.getRequestURI().getPath() + "</Key>\n" +
" <UploadId>" + uploadId + "</UploadId>\n" +
"</InitiateMultipartUploadResult>").getBytes(StandardCharsets.UTF_8);
blobs.put(multipartKey(uploadId, 0), BytesArray.EMPTY);
exchange.getResponseHeaders().add("Content-Type", "application/xml");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
} else if (Regex.simpleMatch("PUT /" + path + "/*?uploadId=*&partNumber=*", request)) {
final Map<String, String> params = new HashMap<>();
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
final String uploadId = params.get("uploadId");
if (blobs.containsKey(multipartKey(uploadId, 0))) {
final Tuple<String, BytesReference> blob = parseRequestBody(exchange);
final int partNumber = Integer.parseInt(params.get("partNumber"));
blobs.put(multipartKey(uploadId, partNumber), blob.v2());
exchange.getResponseHeaders().add("ETag", blob.v1());
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
} else {
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
}
} else if (Regex.simpleMatch("POST /" + path + "/*?uploadId=*", request)) {
Streams.readFully(exchange.getRequestBody());
final Map<String, String> params = new HashMap<>();
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
final String uploadId = params.get("uploadId");
final int nbParts = blobs.keySet().stream()
.filter(blobName -> blobName.startsWith(uploadId))
.map(blobName -> blobName.replaceFirst(uploadId + '\n', ""))
.mapToInt(Integer::parseInt)
.max()
.orElse(0);
final ByteArrayOutputStream blob = new ByteArrayOutputStream();
for (int partNumber = 0; partNumber <= nbParts; partNumber++) {
BytesReference part = blobs.remove(multipartKey(uploadId, partNumber));
if (part == null) {
throw new AssertionError("Upload part is null");
}
part.writeTo(blob);
}
blobs.put(exchange.getRequestURI().getPath(), new BytesArray(blob.toByteArray()));
byte[] response = ("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
"<CompleteMultipartUploadResult>\n" +
" <Bucket>" + bucket + "</Bucket>\n" +
" <Key>" + exchange.getRequestURI().getPath() + "</Key>\n" +
"</CompleteMultipartUploadResult>").getBytes(StandardCharsets.UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/xml");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
} else if (Regex.simpleMatch("PUT /" + path + "/*", request)) {
final Tuple<String, BytesReference> blob = parseRequestBody(exchange);
blobs.put(exchange.getRequestURI().toString(), blob.v2());
exchange.getResponseHeaders().add("ETag", blob.v1());
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
} else if (Regex.simpleMatch("GET /" + bucket + "/?prefix=*", request)) {
final Map<String, String> params = new HashMap<>();
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
if (params.get("list-type") != null) {
throw new AssertionError("Test must be adapted for GET Bucket (List Objects) Version 2");
}
final StringBuilder list = new StringBuilder();
list.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
list.append("<ListBucketResult>");
final String prefix = params.get("prefix");
if (prefix != null) {
list.append("<Prefix>").append(prefix).append("</Prefix>");
}
for (Map.Entry<String, BytesReference> blob : blobs.entrySet()) {
if (prefix == null || blob.getKey().startsWith("/" + bucket + "/" + prefix)) {
list.append("<Contents>");
list.append("<Key>").append(blob.getKey().replace("/" + bucket + "/", "")).append("</Key>");
list.append("<Size>").append(blob.getValue().length()).append("</Size>");
list.append("</Contents>");
}
}
list.append("</ListBucketResult>");
byte[] response = list.toString().getBytes(StandardCharsets.UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/xml");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
} else if (Regex.simpleMatch("GET /" + path + "/*", request)) {
final BytesReference blob = blobs.get(exchange.getRequestURI().toString());
if (blob != null) {
final String range = exchange.getRequestHeaders().getFirst("Range");
if (range == null) {
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), blob.length());
blob.writeTo(exchange.getResponseBody());
} else {
final Matcher matcher = Pattern.compile("^bytes=([0-9]+)-([0-9]+)$").matcher(range);
if (matcher.matches() == false) {
throw new AssertionError("Bytes range does not match expected pattern: " + range);
}
final int start = Integer.parseInt(matcher.group(1));
final int end = Integer.parseInt(matcher.group(2));
final int length = end - start;
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
exchange.getResponseHeaders().add("Content-Range",
String.format(Locale.ROOT, "bytes=%d-%d/%d", start, end, blob.length()));
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), length);
exchange.getResponseBody().write(BytesReference.toBytes(blob), start, length);
}
} else {
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
}
} else if (Regex.simpleMatch("DELETE /" + path + "/*", request)) {
int deletions = 0;
for (Iterator<Map.Entry<String, BytesReference>> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<String, BytesReference> blob = iterator.next();
if (blob.getKey().startsWith(exchange.getRequestURI().toString())) {
iterator.remove();
deletions++;
}
}
exchange.sendResponseHeaders((deletions > 0 ? RestStatus.OK : RestStatus.NO_CONTENT).getStatus(), -1);
} else if (Regex.simpleMatch("POST /" + bucket + "/?delete", request)) {
final String requestBody = Streams.copyToString(new InputStreamReader(exchange.getRequestBody(), UTF_8));
final StringBuilder deletes = new StringBuilder();
deletes.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
deletes.append("<DeleteResult>");
for (Iterator<Map.Entry<String, BytesReference>> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<String, BytesReference> blob = iterator.next();
String key = blob.getKey().replace("/" + path + "/", "");
if (requestBody.contains("<Key>" + key + "</Key>")) {
deletes.append("<Deleted><Key>").append(key).append("</Key></Deleted>");
iterator.remove();
}
}
deletes.append("</DeleteResult>");
byte[] response = deletes.toString().getBytes(StandardCharsets.UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/xml");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
} else {
exchange.sendResponseHeaders(RestStatus.INTERNAL_SERVER_ERROR.getStatus(), -1);
}
} finally {
exchange.close();
}
}
private static String multipartKey(final String uploadId, int partNumber) {
return uploadId + "\n" + partNumber;
}
private static CheckedInputStream createCheckedInputStream(final InputStream inputStream, final MessageDigest digest) {
return new CheckedInputStream(inputStream, new Checksum() {
@Override
public void update(int b) {
digest.update((byte) b);
}
@Override
public void update(byte[] b, int off, int len) {
digest.update(b, off, len);
}
@Override
public long getValue() {
throw new UnsupportedOperationException();
}
@Override
public void reset() {
digest.reset();
}
});
}
private static final Pattern chunkSignaturePattern = Pattern.compile("^([0-9a-z]+);chunk-signature=([^\\r\\n]*)$");
private static Tuple<String, BytesReference> parseRequestBody(final HttpExchange exchange) throws IOException {
final BytesReference bytesReference;
final String headerDecodedContentLength = exchange.getRequestHeaders().getFirst("x-amz-decoded-content-length");
if (headerDecodedContentLength == null) {
bytesReference = Streams.readFully(exchange.getRequestBody());
} else {
BytesReference cc = Streams.readFully(exchange.getRequestBody());
final ByteArrayOutputStream blob = new ByteArrayOutputStream();
try (BufferedInputStream in = new BufferedInputStream(cc.streamInput())) {
int chunkSize = 0;
int read;
while ((read = in.read()) != -1) {
boolean markAndContinue = false;
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
do { // search next consecutive {carriage return, new line} chars and stop
if ((char) read == '\r') {
int next = in.read();
if (next != -1) {
if (next == '\n') {
break;
}
out.write(read);
out.write(next);
continue;
}
}
out.write(read);
} while ((read = in.read()) != -1);
final String line = new String(out.toByteArray(), UTF_8);
if (line.length() == 0 || line.equals("\r\n")) {
markAndContinue = true;
} else {
Matcher matcher = chunkSignaturePattern.matcher(line);
if (matcher.find()) {
markAndContinue = true;
chunkSize = Integer.parseUnsignedInt(matcher.group(1), 16);
}
}
if (markAndContinue) {
in.mark(Integer.MAX_VALUE);
continue;
}
}
if (chunkSize > 0) {
in.reset();
final byte[] buffer = new byte[chunkSize];
in.read(buffer, 0, buffer.length);
blob.write(buffer);
blob.flush();
chunkSize = 0;
}
}
}
if (blob.size() != Integer.parseInt(headerDecodedContentLength)) {
throw new IllegalStateException("Something went wrong when parsing the chunked request " +
"[bytes read=" + blob.size() + ", expected=" + headerDecodedContentLength + "]");
}
bytesReference = new BytesArray(blob.toByteArray());
}
final MessageDigest digest = MessageDigests.md5();
Streams.readFully(createCheckedInputStream(bytesReference.streamInput(), digest));
return Tuple.tuple(MessageDigests.toHexString(digest.digest()), bytesReference);
}
public static void sendError(final HttpExchange exchange,
final RestStatus status,
final String errorCode,
final String message) throws IOException {
final Headers headers = exchange.getResponseHeaders();
headers.add("Content-Type", "application/xml");
final String requestId = exchange.getRequestHeaders().getFirst("x-amz-request-id");
if (requestId != null) {
headers.add("x-amz-request-id", requestId);
}
if (errorCode == null || "HEAD".equals(exchange.getRequestMethod())) {
exchange.sendResponseHeaders(status.getStatus(), -1L);
exchange.close();
} else {
final byte[] response = ("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Error>" +
"<Code>" + errorCode + "</Code>" +
"<Message>" + message + "</Message>"
+ "<RequestId>" + requestId + "</RequestId>"
+ "</Error>").getBytes(StandardCharsets.UTF_8);
exchange.sendResponseHeaders(status.getStatus(), response.length);
exchange.getResponseBody().write(response);
exchange.close();
}
}
}