diff --git a/plugins/repository-s3/build.gradle b/plugins/repository-s3/build.gradle
index d4f1c07fd48..6845cb50c75 100644
--- a/plugins/repository-s3/build.gradle
+++ b/plugins/repository-s3/build.gradle
@@ -1,6 +1,5 @@
 import org.elasticsearch.gradle.MavenFilteringHack
 import org.elasticsearch.gradle.info.BuildParams
-import org.elasticsearch.gradle.test.AntFixture
 import org.elasticsearch.gradle.test.RestIntegTestTask
 
 import static org.elasticsearch.gradle.PropertyNormalization.IGNORE_VALUE
@@ -23,6 +22,7 @@ import static org.elasticsearch.gradle.PropertyNormalization.IGNORE_VALUE
  * specific language governing permissions and limitations
  * under the License.
  */
+apply plugin: 'elasticsearch.test.fixtures'
 
 esplugin {
   description 'The S3 repository plugin adds S3 repositories'
@@ -52,7 +52,7 @@ dependencies {
   // and whitelist this hack in JarHell
   compile 'javax.xml.bind:jaxb-api:2.2.2'
 
-  testCompile project(':test:fixtures:minio-fixture')
+  testCompile project(':test:fixtures:s3-fixture')
 }
 
 dependencyLicenses {
@@ -111,7 +111,7 @@ if (!s3PermanentAccessKey && !s3PermanentSecretKey && !s3PermanentBucket && !s3P
   s3PermanentAccessKey = 'access_key'
   s3PermanentSecretKey = 'secret_key'
   s3PermanentBucket = 'bucket'
-  s3PermanentBasePath = ''
+  s3PermanentBasePath = 'base_path'
 
   useFixture = true
 
@@ -120,21 +120,21 @@ if (!s3PermanentAccessKey && !s3PermanentSecretKey && !s3PermanentBucket && !s3P
 }
 
 if (!s3TemporaryAccessKey && !s3TemporarySecretKey && !s3TemporaryBucket && !s3TemporaryBasePath && !s3TemporarySessionToken) {
-  s3TemporaryAccessKey = 's3_integration_test_temporary_access_key'
-  s3TemporarySecretKey = 's3_integration_test_temporary_secret_key'
-  s3TemporaryBucket = 'temporary-bucket-test'
-  s3TemporaryBasePath = 'integration_test'
-  s3TemporarySessionToken = 's3_integration_test_temporary_session_token'
+  s3TemporaryAccessKey = 'session_token_access_key'
+  s3TemporarySecretKey = 'session_token_secret_key'
+  s3TemporaryBucket = 'session_token_bucket'
+  s3TemporaryBasePath = 'session_token_base_path'
+  s3TemporarySessionToken = 'session_token'
 } else if (!s3TemporaryAccessKey || !s3TemporarySecretKey || !s3TemporaryBucket || !s3TemporaryBasePath || !s3TemporarySessionToken) {
   throw new IllegalArgumentException("not all options specified to run against external S3 service as temporary credentials are present")
 }
 
 if (!s3EC2Bucket && !s3EC2BasePath && !s3ECSBucket && !s3ECSBasePath) {
-  s3EC2Bucket = 'ec2-bucket-test'
-  s3EC2BasePath = 'integration_test'
-  s3ECSBucket = 'ecs-bucket-test'
-  s3ECSBasePath = 'integration_test'
+  s3EC2Bucket = 'ec2_bucket'
+  s3EC2BasePath = 'ec2_base_path'
+  s3ECSBucket = 'ecs_bucket'
+  s3ECSBasePath = 'ecs_base_path'
 } else if (!s3EC2Bucket || !s3EC2BasePath || !s3ECSBucket || !s3ECSBasePath) {
   throw new IllegalArgumentException("not all options specified to run EC2/ECS tests are present")
 }
@@ -148,8 +148,6 @@ task thirdPartyTest(type: Test) {
 }
 
 if (useFixture) {
-  apply plugin: 'elasticsearch.test.fixtures'
-  testFixtures.useFixture(':test:fixtures:minio-fixture')
 
   def minioAddress = {
@@ -207,39 +205,6 @@ if (useFixture) {
 
 check.dependsOn(thirdPartyTest)
 
-File parentFixtures = new File(project.buildDir, "fixtures")
-File s3FixtureFile = new File(parentFixtures, 's3Fixture.properties')
-
-task s3FixtureProperties {
-  outputs.file(s3FixtureFile)
-  def s3FixtureOptions = [
-    "tests.seed": BuildParams.testSeed,
-    "s3Fixture.permanent_bucket_name": s3PermanentBucket,
-    "s3Fixture.permanent_key": s3PermanentAccessKey,
-    "s3Fixture.temporary_bucket_name": s3TemporaryBucket,
-    "s3Fixture.temporary_key": s3TemporaryAccessKey,
-    "s3Fixture.temporary_session_token": s3TemporarySessionToken,
-    "s3Fixture.ec2_bucket_name": s3EC2Bucket,
-    "s3Fixture.ecs_bucket_name": s3ECSBucket,
-    "s3Fixture.disableChunkedEncoding": s3DisableChunkedEncoding
-  ]
-
-  doLast {
-    file(s3FixtureFile).text = s3FixtureOptions.collect { k, v -> "$k = $v" }.join("\n")
-  }
-}
-
-/** A task to start the AmazonS3Fixture which emulates an S3 service **/
-task s3Fixture(type: AntFixture) {
-  dependsOn testClasses
-  dependsOn s3FixtureProperties
-  inputs.file(s3FixtureFile)
-
-  env 'CLASSPATH', "${-> project.sourceSets.test.runtimeClasspath.asPath}"
-  executable = "${BuildParams.runtimeJavaHome}/bin/java"
-  args 'org.elasticsearch.repositories.s3.AmazonS3Fixture', baseDir, s3FixtureFile.getAbsolutePath()
-}
-
 processTestResources {
   Map expansions = [
     'permanent_bucket': s3PermanentBucket,
@@ -256,8 +221,13 @@ processTestResources {
   MavenFilteringHack.filter(it, expansions)
 }
 
-integTest {
-  dependsOn s3Fixture
+testFixtures.useFixture(':test:fixtures:s3-fixture')
+
+def fixtureAddress = { fixture ->
+  assert useFixture: 'closure should not be used without a fixture'
+  int ephemeralPort = project(':test:fixtures:s3-fixture').postProcessFixture.ext."test.fixtures.${fixture}.tcp.80"
+  assert ephemeralPort > 0
+  'http://127.0.0.1:' + ephemeralPort
 }
 
 testClusters.integTest {
@@ -269,12 +239,12 @@ testClusters.integTest {
   keystore 's3.client.integration_test_temporary.session_token', s3TemporarySessionToken
 
   if (useFixture) {
-    setting 's3.client.integration_test_permanent.endpoint', { "http://${s3Fixture.addressAndPort}" }, IGNORE_VALUE
-    setting 's3.client.integration_test_temporary.endpoint', { "http://${s3Fixture.addressAndPort}" }, IGNORE_VALUE
-    setting 's3.client.integration_test_ec2.endpoint', { "http://${s3Fixture.addressAndPort}" }, IGNORE_VALUE
+    setting 's3.client.integration_test_permanent.endpoint', { "${-> fixtureAddress('s3-fixture')}" }, IGNORE_VALUE
+    setting 's3.client.integration_test_temporary.endpoint', { "${-> fixtureAddress('s3-fixture-with-session-token')}" }, IGNORE_VALUE
+    setting 's3.client.integration_test_ec2.endpoint', { "${-> fixtureAddress('s3-fixture-with-ec2')}" }, IGNORE_VALUE
 
     // to redirect InstanceProfileCredentialsProvider to custom auth point
-    systemProperty "com.amazonaws.sdk.ec2MetadataServiceEndpointOverride", { "http://${s3Fixture.addressAndPort}" }, IGNORE_VALUE
+    systemProperty "com.amazonaws.sdk.ec2MetadataServiceEndpointOverride", { "${-> fixtureAddress('s3-fixture-with-ec2')}" }, IGNORE_VALUE
   } else {
     println "Using an external service to test the repository-s3 plugin"
   }
@@ -287,7 +257,7 @@ task s3ThirdPartyTests {
 
 if (useFixture) {
   task integTestECS(type: RestIntegTestTask.class) {
     description = "Runs tests using the ECS repository."
-    dependsOn(project.s3Fixture)
+    dependsOn('bundlePlugin')
     runner {
       systemProperty 'tests.rest.blacklist', [
         'repository_s3/10_basic/*',
@@ -300,9 +270,9 @@ if (useFixture) {
   check.dependsOn(integTestECS)
 
   testClusters.integTestECS {
-    setting 's3.client.integration_test_ecs.endpoint', { "http://${s3Fixture.addressAndPort}" }, IGNORE_VALUE
+    setting 's3.client.integration_test_ecs.endpoint', { "${-> fixtureAddress('s3-fixture-with-ecs')}" }, IGNORE_VALUE
     plugin file(tasks.bundlePlugin.archiveFile)
-    environment 'AWS_CONTAINER_CREDENTIALS_FULL_URI', { "http://${s3Fixture.addressAndPort}/ecs_credentials_endpoint" }, IGNORE_VALUE
+    environment 'AWS_CONTAINER_CREDENTIALS_FULL_URI', { "${-> fixtureAddress('s3-fixture-with-ecs')}/ecs_credentials_endpoint" }, IGNORE_VALUE
   }
 
   gradle.taskGraph.whenReady {
diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/AmazonS3Fixture.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/AmazonS3Fixture.java
deleted file mode 100644
index e0434d1e50f..00000000000
--- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/AmazonS3Fixture.java
+++ /dev/null
@@ -1,615 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */ -package org.elasticsearch.repositories.s3; - -import org.apache.http.client.methods.HttpDelete; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.methods.HttpHead; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.client.methods.HttpPut; -import org.elasticsearch.common.TriFunction; -import org.elasticsearch.common.io.PathUtils; -import org.elasticsearch.test.fixture.AbstractHttpFixture; -import com.amazonaws.util.DateUtils; -import com.amazonaws.util.IOUtils; - -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.io.Streams; -import org.elasticsearch.common.path.PathTrie; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.rest.RestUtils; - -import java.io.BufferedInputStream; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.nio.file.Files; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Random; -import java.util.concurrent.TimeUnit; - -import static com.carrotsearch.randomizedtesting.generators.RandomStrings.randomAsciiAlphanumOfLength; -import static com.carrotsearch.randomizedtesting.generators.RandomStrings.randomAsciiAlphanumOfLengthBetween; -import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.Objects.requireNonNull; - -/** - * {@link AmazonS3Fixture} emulates an AWS S3 service - * . - * he implementation is based on official documentation available at https://docs.aws.amazon.com/AmazonS3/latest/API/. - */ -public class AmazonS3Fixture extends AbstractHttpFixture { - private static final String AUTH = "AUTH"; - private static final String NON_AUTH = "NON_AUTH"; - - private static final String EC2_PROFILE = "ec2Profile"; - - private final Properties properties; - private final Random random; - - /** List of the buckets stored on this test server **/ - private final Map buckets = ConcurrentCollections.newConcurrentMap(); - - /** Request handlers for the requests made by the S3 client **/ - private final PathTrie handlers; - - private final boolean disableChunkedEncoding; - /** - * Creates a {@link AmazonS3Fixture} - */ - private AmazonS3Fixture(final String workingDir, Properties properties) { - super(workingDir); - this.properties = properties; - this.random = new Random(Long.parseUnsignedLong(requireNonNull(properties.getProperty("tests.seed")), 16)); - - new Bucket("s3Fixture.permanent", false); - new Bucket("s3Fixture.temporary", true); - final Bucket ec2Bucket = new Bucket("s3Fixture.ec2", - randomAsciiAlphanumOfLength(random, 10), randomAsciiAlphanumOfLength(random, 10)); - - final Bucket ecsBucket = new Bucket("s3Fixture.ecs", - randomAsciiAlphanumOfLength(random, 10), randomAsciiAlphanumOfLength(random, 10)); - - this.handlers = defaultHandlers(buckets, ec2Bucket, ecsBucket); - - this.disableChunkedEncoding = Boolean.parseBoolean(prop(properties, "s3Fixture.disableChunkedEncoding")); - } - - private static String nonAuthPath(Request request) { - return nonAuthPath(request.getMethod(), request.getPath()); - } - - private static String nonAuthPath(String method, String path) { - return NON_AUTH + " " + method + " " + path; - } - - private static String authPath(Request request) { - return authPath(request.getMethod(), request.getPath()); - } - - private static String 
authPath(String method, String path) { - return AUTH + " " + method + " " + path; - } - - @Override - protected Response handle(final Request request) throws IOException { - final String nonAuthorizedPath = nonAuthPath(request); - final RequestHandler nonAuthorizedHandler = handlers.retrieve(nonAuthorizedPath, request.getParameters()); - if (nonAuthorizedHandler != null) { - return nonAuthorizedHandler.handle(request); - } - - final String authorizedPath = authPath(request); - final RequestHandler handler = handlers.retrieve(authorizedPath, request.getParameters()); - if (handler != null) { - final String bucketName = request.getParam("bucket"); - if (bucketName == null) { - return newError(request.getId(), RestStatus.FORBIDDEN, "AccessDenied", "Bad access key", ""); - } - final Bucket bucket = buckets.get(bucketName); - if (bucket == null) { - return newBucketNotFoundError(request.getId(), bucketName); - } - final Response authResponse = authenticateBucket(request, bucket); - if (authResponse != null) { - return authResponse; - } - - return handler.handle(request); - - } else { - return newInternalError(request.getId(), "No handler defined for request [" + request + "]"); - } - } - - private Response authenticateBucket(Request request, Bucket bucket) { - final String authorization = request.getHeader("Authorization"); - if (authorization == null) { - return newError(request.getId(), RestStatus.FORBIDDEN, "AccessDenied", "Bad access key", ""); - } - if (authorization.contains(bucket.key)) { - final String sessionToken = request.getHeader("x-amz-security-token"); - if (bucket.token == null) { - if (sessionToken != null) { - return newError(request.getId(), RestStatus.FORBIDDEN, "AccessDenied", "Unexpected session token", ""); - } - } else { - if (sessionToken == null) { - return newError(request.getId(), RestStatus.FORBIDDEN, "AccessDenied", "No session token", ""); - } - if (sessionToken.equals(bucket.token) == false) { - return newError(request.getId(), RestStatus.FORBIDDEN, "AccessDenied", "Bad session token", ""); - } - } - } - return null; - } - - public static void main(final String[] args) throws Exception { - if (args == null || args.length != 2) { - throw new IllegalArgumentException("AmazonS3Fixture "); - } - final Properties properties = new Properties(); - try (InputStream is = Files.newInputStream(PathUtils.get(args[1]))) { - properties.load(is); - } - final AmazonS3Fixture fixture = new AmazonS3Fixture(args[0], properties); - fixture.listen(); - } - - /** Builds the default request handlers **/ - private PathTrie defaultHandlers(final Map buckets, final Bucket ec2Bucket, final Bucket ecsBucket) { - final PathTrie handlers = new PathTrie<>(RestUtils.REST_DECODER); - - // HEAD Object - // - // https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectHEAD.html - objectsPaths(authPath(HttpHead.METHOD_NAME, "/{bucket}")).forEach(path -> - handlers.insert(path, (request) -> { - final String bucketName = request.getParam("bucket"); - - final Bucket bucket = buckets.get(bucketName); - if (bucket == null) { - return newBucketNotFoundError(request.getId(), bucketName); - } - - final String objectName = objectName(request.getParameters()); - for (Map.Entry object : bucket.objects.entrySet()) { - if (object.getKey().equals(objectName)) { - return new Response(RestStatus.OK.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE); - } - } - return newObjectNotFoundError(request.getId(), objectName); - }) - ); - - // PUT Object - // - // 
https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html - objectsPaths(authPath(HttpPut.METHOD_NAME, "/{bucket}")).forEach(path -> - handlers.insert(path, (request) -> { - final String destBucketName = request.getParam("bucket"); - - final Bucket destBucket = buckets.get(destBucketName); - if (destBucket == null) { - return newBucketNotFoundError(request.getId(), destBucketName); - } - - final String destObjectName = objectName(request.getParameters()); - - String headerDecodedContentLength = request.getHeader("X-amz-decoded-content-length"); - if (headerDecodedContentLength != null) { - if (disableChunkedEncoding) { - return newInternalError(request.getId(), "Something is wrong with this PUT request"); - } - // This is a chunked upload request. We should have the header "Content-Encoding : aws-chunked,gzip" - // to detect it but it seems that the AWS SDK does not follow the S3 guidelines here. - // - // See https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html - // - int contentLength = Integer.valueOf(headerDecodedContentLength); - - // Chunked requests have a payload like this: - // - // 105;chunk-signature=01d0de6be013115a7f4794db8c4b9414e6ec71262cc33ae562a71f2eaed1efe8 - // ... bytes of data .... - // 0;chunk-signature=f890420b1974c5469aaf2112e9e6f2e0334929fd45909e03c0eff7a84124f6a4 - // - try (BufferedInputStream inputStream = new BufferedInputStream(new ByteArrayInputStream(request.getBody()))) { - int b; - // Moves to the end of the first signature line - while ((b = inputStream.read()) != -1) { - if (b == '\n') { - break; - } - } - - final byte[] bytes = new byte[contentLength]; - inputStream.read(bytes, 0, contentLength); - - destBucket.objects.put(destObjectName, bytes); - return new Response(RestStatus.OK.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE); - } - } else { - if (disableChunkedEncoding == false) { - return newInternalError(request.getId(), "Something is wrong with this PUT request"); - } - // Read from body directly - try (BufferedInputStream inputStream = new BufferedInputStream(new ByteArrayInputStream(request.getBody()))) { - byte[] bytes = IOUtils.toByteArray(inputStream); - - destBucket.objects.put(destObjectName, bytes); - return new Response(RestStatus.OK.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE); - } - } - }) - ); - - // DELETE Object - // - // https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectDELETE.html - objectsPaths(authPath(HttpDelete.METHOD_NAME, "/{bucket}")).forEach(path -> - handlers.insert(path, (request) -> { - final String bucketName = request.getParam("bucket"); - - final Bucket bucket = buckets.get(bucketName); - if (bucket == null) { - return newBucketNotFoundError(request.getId(), bucketName); - } - - final String objectName = objectName(request.getParameters()); - bucket.objects.remove(objectName); - return new Response(RestStatus.NO_CONTENT.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE); - }) - ); - - // GET Object - // - // https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html - objectsPaths(authPath(HttpGet.METHOD_NAME, "/{bucket}")).forEach(path -> - handlers.insert(path, (request) -> { - final String bucketName = request.getParam("bucket"); - - final Bucket bucket = buckets.get(bucketName); - if (bucket == null) { - return newBucketNotFoundError(request.getId(), bucketName); - } - - final String objectName = objectName(request.getParameters()); - if (bucket.objects.containsKey(objectName)) { - return new Response(RestStatus.OK.getStatus(), 
contentType("application/octet-stream"), bucket.objects.get(objectName)); - - } - return newObjectNotFoundError(request.getId(), objectName); - }) - ); - - // HEAD Bucket - // - // https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketHEAD.html - handlers.insert(authPath(HttpHead.METHOD_NAME, "/{bucket}"), (request) -> { - String bucket = request.getParam("bucket"); - if (Strings.hasText(bucket) && buckets.containsKey(bucket)) { - return new Response(RestStatus.OK.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE); - } else { - return newBucketNotFoundError(request.getId(), bucket); - } - }); - - // GET Bucket (List Objects) Version 1 - // - // https://docs.aws.amazon.com/AmazonS3/latest/API/RESTBucketGET.html - handlers.insert(authPath(HttpGet.METHOD_NAME, "/{bucket}/"), (request) -> { - final String bucketName = request.getParam("bucket"); - - final Bucket bucket = buckets.get(bucketName); - if (bucket == null) { - return newBucketNotFoundError(request.getId(), bucketName); - } - - String prefix = request.getParam("prefix"); - if (prefix == null) { - prefix = request.getHeader("Prefix"); - } - return newListBucketResultResponse(request.getId(), bucket, prefix); - }); - - // Delete Multiple Objects - // - // https://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html - final RequestHandler bulkDeleteHandler = request -> { - final List deletes = new ArrayList<>(); - final List errors = new ArrayList<>(); - - if (request.getParam("delete") != null) { - // The request body is something like: - // ...... - String requestBody = Streams.copyToString(new InputStreamReader(new ByteArrayInputStream(request.getBody()), UTF_8)); - if (requestBody.startsWith("")) { - final String startMarker = ""; - final String endMarker = ""; - - int offset = 0; - while (offset != -1) { - offset = requestBody.indexOf(startMarker, offset); - if (offset > 0) { - int closingOffset = requestBody.indexOf(endMarker, offset); - if (closingOffset != -1) { - offset = offset + startMarker.length(); - final String objectName = requestBody.substring(offset, closingOffset); - boolean found = false; - for (Bucket bucket : buckets.values()) { - if (bucket.objects.containsKey(objectName)) { - final Response authResponse = authenticateBucket(request, bucket); - if (authResponse != null) { - return authResponse; - } - bucket.objects.remove(objectName); - found = true; - } - } - - if (found) { - deletes.add(objectName); - } else { - errors.add(objectName); - } - } - } - } - return newDeleteResultResponse(request.getId(), deletes, errors); - } - } - return newInternalError(request.getId(), "Something is wrong with this POST multiple deletes request"); - }; - handlers.insert(nonAuthPath(HttpPost.METHOD_NAME, "/"), bulkDeleteHandler); - handlers.insert(nonAuthPath(HttpPost.METHOD_NAME, "/{bucket}"), bulkDeleteHandler); - - // non-authorized requests - - TriFunction credentialResponseFunction = (profileName, key, token) -> { - final Date expiration = new Date(new Date().getTime() + TimeUnit.DAYS.toMillis(1)); - final String response = "{" - + "\"AccessKeyId\": \"" + key + "\"," - + "\"Expiration\": \"" + DateUtils.formatISO8601Date(expiration) + "\"," - + "\"RoleArn\": \"" + randomAsciiAlphanumOfLengthBetween(random, 1, 20) + "\"," - + "\"SecretAccessKey\": \"" + randomAsciiAlphanumOfLengthBetween(random, 1, 20) + "\"," - + "\"Token\": \"" + token + "\"" - + "}"; - - final Map headers = new HashMap<>(contentType("application/json")); - return new Response(RestStatus.OK.getStatus(), headers, response.getBytes(UTF_8)); - 
}; - - // GET - // - // http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html - handlers.insert(nonAuthPath(HttpGet.METHOD_NAME, "/latest/meta-data/iam/security-credentials/"), (request) -> { - final String response = EC2_PROFILE; - - final Map headers = new HashMap<>(contentType("text/plain")); - return new Response(RestStatus.OK.getStatus(), headers, response.getBytes(UTF_8)); - }); - - // GET - // - // http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html - handlers.insert(nonAuthPath(HttpGet.METHOD_NAME, "/latest/meta-data/iam/security-credentials/{profileName}"), (request) -> { - final String profileName = request.getParam("profileName"); - if (EC2_PROFILE.equals(profileName) == false) { - return new Response(RestStatus.NOT_FOUND.getStatus(), new HashMap<>(), "unknown profile".getBytes(UTF_8)); - } - return credentialResponseFunction.apply(profileName, ec2Bucket.key, ec2Bucket.token); - }); - - // GET - // - // https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html - handlers.insert(nonAuthPath(HttpGet.METHOD_NAME, "/ecs_credentials_endpoint"), - (request) -> credentialResponseFunction.apply("CPV_ECS", ecsBucket.key, ecsBucket.token)); - - - return handlers; - } - - private static String prop(Properties properties, String propertyName) { - return requireNonNull(properties.getProperty(propertyName), - "property '" + propertyName + "' is missing"); - } - - /** - * Represents a S3 bucket. - */ - class Bucket { - - /** Bucket name **/ - final String name; - - final String key; - - final String token; - - /** Blobs contained in the bucket **/ - final Map objects; - - private Bucket(final String prefix, final boolean tokenRequired) { - this(prefix, prop(properties, prefix + "_key"), - tokenRequired ? prop(properties, prefix + "_session_token") : null); - } - - private Bucket(final String prefix, final String key, final String token) { - this.name = prop(properties, prefix + "_bucket_name"); - this.key = key; - this.token = token; - - this.objects = ConcurrentCollections.newConcurrentMap(); - if (buckets.put(name, this) != null) { - throw new IllegalArgumentException("bucket " + name + " is already registered"); - } - } - } - - /** - * Decline a path like "http://host:port/{bucket}" into 10 derived paths like: - * - http://host:port/{bucket}/{path0} - * - http://host:port/{bucket}/{path0}/{path1} - * - http://host:port/{bucket}/{path0}/{path1}/{path2} - * - etc - */ - private static List objectsPaths(final String path) { - final List paths = new ArrayList<>(); - String p = path; - for (int i = 0; i < 10; i++) { - p = p + "/{path" + i + "}"; - paths.add(p); - } - return paths; - } - - /** - * Retrieves the object name from all derives paths named {pathX} where 0 <= X < 10. 
- * - * This is the counterpart of {@link #objectsPaths(String)} - */ - private static String objectName(final Map params) { - final StringBuilder name = new StringBuilder(); - for (int i = 0; i < 10; i++) { - String value = params.getOrDefault("path" + i, null); - if (value != null) { - if (name.length() > 0) { - name.append('/'); - } - name.append(value); - } - } - return name.toString(); - } - - /** - * S3 ListBucketResult Response - */ - private static Response newListBucketResultResponse(final long requestId, final Bucket bucket, final String prefix) { - final String id = Long.toString(requestId); - final StringBuilder response = new StringBuilder(); - response.append(""); - response.append(""); - response.append(""); - if (prefix != null) { - response.append(prefix); - } - response.append(""); - response.append(""); - response.append("1000"); - response.append("false"); - - int count = 0; - for (Map.Entry object : bucket.objects.entrySet()) { - String objectName = object.getKey(); - if (prefix == null || objectName.startsWith(prefix)) { - response.append(""); - response.append("").append(objectName).append(""); - response.append("").append(DateUtils.formatISO8601Date(new Date())).append(""); - response.append(""").append(count++).append("""); - response.append("").append(object.getValue().length).append(""); - response.append(""); - } - } - response.append(""); - - final Map headers = new HashMap<>(contentType("application/xml")); - headers.put("x-amz-request-id", id); - - return new Response(RestStatus.OK.getStatus(), headers, response.toString().getBytes(UTF_8)); - } - - /** - * S3 DeleteResult Response - */ - private static Response newDeleteResultResponse(final long requestId, - final List deletedObjects, - final List ignoredObjects) { - final String id = Long.toString(requestId); - - final StringBuilder response = new StringBuilder(); - response.append(""); - response.append(""); - for (String deletedObject : deletedObjects) { - response.append(""); - response.append("").append(deletedObject).append(""); - response.append(""); - } - for (String ignoredObject : ignoredObjects) { - response.append(""); - response.append("").append(ignoredObject).append(""); - response.append("NoSuchKey"); - response.append(""); - } - response.append(""); - - final Map headers = new HashMap<>(contentType("application/xml")); - headers.put("x-amz-request-id", id); - - return new Response(RestStatus.OK.getStatus(), headers, response.toString().getBytes(UTF_8)); - } - - private static Response newBucketNotFoundError(final long requestId, final String bucket) { - return newError(requestId, RestStatus.NOT_FOUND, "NoSuchBucket", "The specified bucket does not exist", bucket); - } - - private static Response newObjectNotFoundError(final long requestId, final String object) { - return newError(requestId, RestStatus.NOT_FOUND, "NoSuchKey", "The specified key does not exist", object); - } - - private static Response newInternalError(final long requestId, final String resource) { - return newError(requestId, RestStatus.INTERNAL_SERVER_ERROR, "InternalError", "We encountered an internal error", resource); - } - - /** - * S3 Error - * - * https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html - */ - private static Response newError(final long requestId, - final RestStatus status, - final String code, - final String message, - final String resource) { - final String id = Long.toString(requestId); - final StringBuilder response = new StringBuilder(); - response.append(""); - response.append(""); - 
response.append("").append(code).append(""); - response.append("").append(message).append(""); - response.append("").append(resource).append(""); - response.append("").append(id).append(""); - response.append(""); - - final Map headers = new HashMap<>(contentType("application/xml")); - headers.put("x-amz-request-id", id); - - return new Response(status.getStatus(), headers, response.toString().getBytes(UTF_8)); - } -} diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index 69b9c453480..5fb1ec8e3eb 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -19,20 +19,14 @@ package org.elasticsearch.repositories.s3; import com.amazonaws.http.AmazonHttpClient; -import com.amazonaws.services.s3.internal.MD5DigestCalculatingInputStream; -import com.amazonaws.util.Base16; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; +import fixture.s3.S3HttpHandler; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.common.SuppressForbidden; -import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.io.Streams; -import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -40,27 +34,14 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase; -import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.rest.RestUtils; import org.elasticsearch.snapshots.mockstore.BlobStoreWrapper; import org.elasticsearch.threadpool.ThreadPool; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.hamcrest.Matchers.nullValue; @SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint") public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase { @@ -86,7 +67,7 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes @Override protected Map createHttpHandlers() { - return Collections.singletonMap("/bucket", new InternalHttpHandler()); + return Collections.singletonMap("/bucket", new S3HttpHandler("bucket")); } @Override @@ -153,162 +134,6 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes } } - /** - * Minimal HTTP handler that acts as a S3 compliant server - */ 
- @SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint") - private static class InternalHttpHandler implements HttpHandler { - - private final ConcurrentMap blobs = new ConcurrentHashMap<>(); - - @Override - public void handle(final HttpExchange exchange) throws IOException { - final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString(); - try { - if (Regex.simpleMatch("POST /bucket/*?uploads", request)) { - final String uploadId = UUIDs.randomBase64UUID(); - byte[] response = ("\n" + - "\n" + - " bucket\n" + - " " + exchange.getRequestURI().getPath() + "\n" + - " " + uploadId + "\n" + - "").getBytes(StandardCharsets.UTF_8); - blobs.put(multipartKey(uploadId, 0), BytesArray.EMPTY); - exchange.getResponseHeaders().add("Content-Type", "application/xml"); - exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); - exchange.getResponseBody().write(response); - - } else if (Regex.simpleMatch("PUT /bucket/*?uploadId=*&partNumber=*", request)) { - final Map params = new HashMap<>(); - RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params); - - final String uploadId = params.get("uploadId"); - if (blobs.containsKey(multipartKey(uploadId, 0))) { - final int partNumber = Integer.parseInt(params.get("partNumber")); - MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody()); - blobs.put(multipartKey(uploadId, partNumber), Streams.readFully(md5)); - exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest())); - exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1); - } else { - exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1); - } - - } else if (Regex.simpleMatch("POST /bucket/*?uploadId=*", request)) { - drainInputStream(exchange.getRequestBody()); - final Map params = new HashMap<>(); - RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params); - final String uploadId = params.get("uploadId"); - - final int nbParts = blobs.keySet().stream() - .filter(blobName -> blobName.startsWith(uploadId)) - .map(blobName -> blobName.replaceFirst(uploadId + '\n', "")) - .mapToInt(Integer::parseInt) - .max() - .orElse(0); - - final ByteArrayOutputStream blob = new ByteArrayOutputStream(); - for (int partNumber = 0; partNumber <= nbParts; partNumber++) { - BytesReference part = blobs.remove(multipartKey(uploadId, partNumber)); - assertNotNull(part); - part.writeTo(blob); - } - blobs.put(exchange.getRequestURI().getPath(), new BytesArray(blob.toByteArray())); - - byte[] response = ("\n" + - "\n" + - " bucket\n" + - " " + exchange.getRequestURI().getPath() + "\n" + - "").getBytes(StandardCharsets.UTF_8); - exchange.getResponseHeaders().add("Content-Type", "application/xml"); - exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); - exchange.getResponseBody().write(response); - - }else if (Regex.simpleMatch("PUT /bucket/*", request)) { - blobs.put(exchange.getRequestURI().toString(), Streams.readFully(exchange.getRequestBody())); - exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1); - - } else if (Regex.simpleMatch("GET /bucket/?prefix=*", request)) { - final Map params = new HashMap<>(); - RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params); - assertThat("Test must be adapted for GET Bucket (List Objects) Version 2", params.get("list-type"), nullValue()); - - final StringBuilder list = new StringBuilder(); - list.append(""); - list.append(""); - final 
String prefix = params.get("prefix"); - if (prefix != null) { - list.append("").append(prefix).append(""); - } - for (Map.Entry blob : blobs.entrySet()) { - if (prefix == null || blob.getKey().startsWith("/bucket/" + prefix)) { - list.append(""); - list.append("").append(blob.getKey().replace("/bucket/", "")).append(""); - list.append("").append(blob.getValue().length()).append(""); - list.append(""); - } - } - list.append(""); - - byte[] response = list.toString().getBytes(StandardCharsets.UTF_8); - exchange.getResponseHeaders().add("Content-Type", "application/xml"); - exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); - exchange.getResponseBody().write(response); - - } else if (Regex.simpleMatch("GET /bucket/*", request)) { - final BytesReference blob = blobs.get(exchange.getRequestURI().toString()); - if (blob != null) { - exchange.getResponseHeaders().add("Content-Type", "application/octet-stream"); - exchange.sendResponseHeaders(RestStatus.OK.getStatus(), blob.length()); - blob.writeTo(exchange.getResponseBody()); - } else { - exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1); - } - - } else if (Regex.simpleMatch("DELETE /bucket/*", request)) { - int deletions = 0; - for (Iterator> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) { - Map.Entry blob = iterator.next(); - if (blob.getKey().startsWith(exchange.getRequestURI().toString())) { - iterator.remove(); - deletions++; - } - } - exchange.sendResponseHeaders((deletions > 0 ? RestStatus.OK : RestStatus.NO_CONTENT).getStatus(), -1); - - } else if (Regex.simpleMatch("POST /bucket/?delete", request)) { - final String requestBody = Streams.copyToString(new InputStreamReader(exchange.getRequestBody(), UTF_8)); - - final StringBuilder deletes = new StringBuilder(); - deletes.append(""); - deletes.append(""); - for (Iterator> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) { - Map.Entry blob = iterator.next(); - String key = blob.getKey().replace("/bucket/", ""); - if (requestBody.contains("" + key + "")) { - deletes.append("").append(key).append(""); - iterator.remove(); - } - } - deletes.append(""); - - byte[] response = deletes.toString().getBytes(StandardCharsets.UTF_8); - exchange.getResponseHeaders().add("Content-Type", "application/xml"); - exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); - exchange.getResponseBody().write(response); - - } else { - exchange.sendResponseHeaders(RestStatus.INTERNAL_SERVER_ERROR.getStatus(), -1); - } - } finally { - exchange.close(); - } - } - - private static String multipartKey(final String uploadId, int partNumber) { - return uploadId + "\n" + partNumber; - } - } - /** * HTTP handler that injects random S3 service errors * diff --git a/settings.gradle b/settings.gradle index ccbf322cc91..b23ce78e0fa 100644 --- a/settings.gradle +++ b/settings.gradle @@ -58,6 +58,7 @@ List projects = [ 'test:fixtures:krb5kdc-fixture', 'test:fixtures:minio-fixture', 'test:fixtures:old-elasticsearch', + 'test:fixtures:s3-fixture', 'test:logger-usage' ] diff --git a/test/fixtures/s3-fixture/Dockerfile b/test/fixtures/s3-fixture/Dockerfile new file mode 100644 index 00000000000..c1f2515ae35 --- /dev/null +++ b/test/fixtures/s3-fixture/Dockerfile @@ -0,0 +1,23 @@ +FROM ubuntu:19.04 + +RUN apt-get update -qqy +RUN apt-get install -qqy openjdk-12-jre-headless + +ARG fixtureClass +ARG port +ARG bucket +ARG basePath +ARG accessKey +ARG sessionToken + +ENV S3_FIXTURE_CLASS=${fixtureClass} +ENV S3_FIXTURE_PORT=${port} +ENV 
S3_FIXTURE_BUCKET=${bucket}
+ENV S3_FIXTURE_BASE_PATH=${basePath}
+ENV S3_FIXTURE_ACCESS_KEY=${accessKey}
+ENV S3_FIXTURE_SESSION_TOKEN=${sessionToken}
+
+ENTRYPOINT exec java -classpath "/fixture/shared/*" \
+  $S3_FIXTURE_CLASS 0.0.0.0 "$S3_FIXTURE_PORT" "$S3_FIXTURE_BUCKET" "$S3_FIXTURE_BASE_PATH" "$S3_FIXTURE_ACCESS_KEY" "$S3_FIXTURE_SESSION_TOKEN"
+
+EXPOSE $port
diff --git a/test/fixtures/s3-fixture/build.gradle b/test/fixtures/s3-fixture/build.gradle
new file mode 100644
index 00000000000..cf4f6f42bb2
--- /dev/null
+++ b/test/fixtures/s3-fixture/build.gradle
@@ -0,0 +1,39 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+apply plugin: 'elasticsearch.build'
+apply plugin: 'elasticsearch.test.fixtures'
+
+description = 'Fixture for S3 Storage service'
+test.enabled = false
+
+dependencies {
+  compile project(':server')
+}
+
+preProcessFixture {
+  dependsOn jar
+  doLast {
+    file("${testFixturesDir}/shared").mkdirs()
+    project.copy {
+      from jar
+      from configurations.runtimeClasspath
+      into "${testFixturesDir}/shared"
+    }
+  }
+}
diff --git a/test/fixtures/s3-fixture/docker-compose.yml b/test/fixtures/s3-fixture/docker-compose.yml
new file mode 100644
index 00000000000..401a43c9255
--- /dev/null
+++ b/test/fixtures/s3-fixture/docker-compose.yml
@@ -0,0 +1,64 @@
+version: '3'
+services:
+  s3-fixture:
+    build:
+      context: .
+      args:
+        fixtureClass: fixture.s3.S3HttpFixture
+        port: 80
+        bucket: "bucket"
+        basePath: "base_path_integration_tests"
+        accessKey: "access_key"
+      dockerfile: Dockerfile
+    volumes:
+      - ./testfixtures_shared/shared:/fixture/shared
+    ports:
+      - "80"
+
+  s3-fixture-with-session-token:
+    build:
+      context: .
+      args:
+        fixtureClass: fixture.s3.S3HttpFixtureWithSessionToken
+        port: 80
+        bucket: "session_token_bucket"
+        basePath: "session_token_base_path_integration_tests"
+        accessKey: "session_token_access_key"
+        sessionToken: "session_token"
+      dockerfile: Dockerfile
+    volumes:
+      - ./testfixtures_shared/shared:/fixture/shared
+    ports:
+      - "80"
+
+  s3-fixture-with-ec2:
+    build:
+      context: .
+ args: + fixtureClass: fixture.s3.S3HttpFixtureWithECS + port: 80 + bucket: "ecs_bucket" + basePath: "ecs_base_path" + accessKey: "ecs_access_key" + sessionToken: "ecs_session_token" + dockerfile: Dockerfile + volumes: + - ./testfixtures_shared/shared:/fixture/shared + ports: + - "80" diff --git a/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpFixture.java b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpFixture.java new file mode 100644 index 00000000000..b80e4a34e26 --- /dev/null +++ b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpFixture.java @@ -0,0 +1,75 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package fixture.s3; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; +import org.elasticsearch.rest.RestStatus; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.Objects; + +public class S3HttpFixture { + + private final HttpServer server; + + S3HttpFixture(final String[] args) throws Exception { + this.server = HttpServer.create(new InetSocketAddress(InetAddress.getByName(args[0]), Integer.parseInt(args[1])), 0); + this.server.createContext("/", Objects.requireNonNull(createHandler(args))); + } + + final void start() throws Exception { + try { + server.start(); + // wait to be killed + Thread.sleep(Long.MAX_VALUE); + } finally { + server.stop(0); + } + } + + protected HttpHandler createHandler(final String[] args) { + final String bucket = Objects.requireNonNull(args[2]); + final String basePath = args[3]; + final String accessKey = Objects.requireNonNull(args[4]); + + return new S3HttpHandler(bucket, basePath) { + @Override + public void handle(final HttpExchange exchange) throws IOException { + final String authorization = exchange.getRequestHeaders().getFirst("Authorization"); + if (authorization == null || authorization.contains(accessKey) == false) { + sendError(exchange, RestStatus.FORBIDDEN, "AccessDenied", "Bad access key"); + return; + } + super.handle(exchange); + } + }; + } + + public static void main(final String[] args) throws Exception { + if (args == null || args.length < 5) { + throw new IllegalArgumentException("S3HttpFixture expects 5 arguments [address, port, bucket, base path, access key]"); + } + final S3HttpFixture fixture = new S3HttpFixture(args); + fixture.start(); + } +} diff --git a/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpFixtureWithEC2.java b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpFixtureWithEC2.java new file mode 100644 index 00000000000..796b53fa5ae --- /dev/null +++ b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpFixtureWithEC2.java @@ -0,0 +1,95 @@ +/* + * Licensed to Elasticsearch under one 
or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package fixture.s3; + +import com.sun.net.httpserver.HttpHandler; +import org.elasticsearch.rest.RestStatus; + +import java.nio.charset.StandardCharsets; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Objects; + +public class S3HttpFixtureWithEC2 extends S3HttpFixtureWithSessionToken { + + private static final String EC2_PATH = "/latest/meta-data/iam/security-credentials/"; + private static final String EC2_PROFILE = "ec2Profile"; + + S3HttpFixtureWithEC2(final String[] args) throws Exception { + super(args); + } + + @Override + protected HttpHandler createHandler(final String[] args) { + final String ec2AccessKey = Objects.requireNonNull(args[4]); + final String ec2SessionToken = Objects.requireNonNull(args[5], "session token is missing"); + final HttpHandler delegate = super.createHandler(args); + + return exchange -> { + final String path = exchange.getRequestURI().getPath(); + // http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html + if ("GET".equals(exchange.getRequestMethod()) && path.startsWith(EC2_PATH)) { + if (path.equals(EC2_PATH)) { + final byte[] response = EC2_PROFILE.getBytes(StandardCharsets.UTF_8); + exchange.getResponseHeaders().add("Content-Type", "text/plain"); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); + exchange.getResponseBody().write(response); + exchange.close(); + return; + + } else if (path.equals(EC2_PATH + EC2_PROFILE)) { + final byte[] response = buildCredentialResponse(ec2AccessKey, ec2SessionToken).getBytes(StandardCharsets.UTF_8); + exchange.getResponseHeaders().add("Content-Type", "application/json"); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); + exchange.getResponseBody().write(response); + exchange.close(); + return; + } + + final byte[] response = "unknown profile".getBytes(StandardCharsets.UTF_8); + exchange.getResponseHeaders().add("Content-Type", "text/plain"); + exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), response.length); + exchange.getResponseBody().write(response); + exchange.close(); + return; + + } + delegate.handle(exchange); + }; + } + + protected String buildCredentialResponse(final String ec2AccessKey, final String ec2SessionToken) { + return "{" + + "\"AccessKeyId\": \"" + ec2AccessKey + "\"," + + "\"Expiration\": \"" + ZonedDateTime.now().plusDays(1L).format(DateTimeFormatter.ISO_DATE_TIME) + "\"," + + "\"RoleArn\": \"arn\"," + + "\"SecretAccessKey\": \"secret\"," + + "\"Token\": \"" + ec2SessionToken + "\"" + + "}"; + } + + public static void main(final String[] args) throws Exception { + if (args == null || args.length < 6) { + throw new IllegalArgumentException("S3HttpFixtureWithEC2 expects 6 arguments " + + "[address, port, bucket, 
base path, ec2 access id, ec2 session token]"); + } + final S3HttpFixtureWithEC2 fixture = new S3HttpFixtureWithEC2(args); + fixture.start(); + } +} diff --git a/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpFixtureWithECS.java b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpFixtureWithECS.java new file mode 100644 index 00000000000..d5afb0adab3 --- /dev/null +++ b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpFixtureWithECS.java @@ -0,0 +1,61 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package fixture.s3; + +import com.sun.net.httpserver.HttpHandler; +import org.elasticsearch.rest.RestStatus; + +import java.nio.charset.StandardCharsets; +import java.util.Objects; + +public class S3HttpFixtureWithECS extends S3HttpFixtureWithEC2 { + + private S3HttpFixtureWithECS(final String[] args) throws Exception { + super(args); + } + + @Override + protected HttpHandler createHandler(final String[] args) { + final String ecsAccessKey = Objects.requireNonNull(args[4]); + final String ecsSessionToken = Objects.requireNonNull(args[5], "session token is missing"); + final HttpHandler delegate = super.createHandler(args); + + return exchange -> { + // https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html + if ("GET".equals(exchange.getRequestMethod()) && exchange.getRequestURI().getPath().equals("/ecs_credentials_endpoint")) { + final byte[] response = buildCredentialResponse(ecsAccessKey, ecsSessionToken).getBytes(StandardCharsets.UTF_8); + exchange.getResponseHeaders().add("Content-Type", "application/json"); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); + exchange.getResponseBody().write(response); + exchange.close(); + return; + } + delegate.handle(exchange); + }; + } + + public static void main(final String[] args) throws Exception { + if (args == null || args.length < 6) { + throw new IllegalArgumentException("S3HttpFixtureWithECS expects 6 arguments " + + "[address, port, bucket, base path, ecs access id, ecs session token]"); + } + final S3HttpFixtureWithECS fixture = new S3HttpFixtureWithECS(args); + fixture.start(); + } +} diff --git a/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpFixtureWithSessionToken.java b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpFixtureWithSessionToken.java new file mode 100644 index 00000000000..8ab64b7a4c0 --- /dev/null +++ b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpFixtureWithSessionToken.java @@ -0,0 +1,60 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package fixture.s3; + +import com.sun.net.httpserver.HttpHandler; +import org.elasticsearch.rest.RestStatus; + +import java.util.Objects; + +import static fixture.s3.S3HttpHandler.sendError; + +public class S3HttpFixtureWithSessionToken extends S3HttpFixture { + + S3HttpFixtureWithSessionToken(final String[] args) throws Exception { + super(args); + } + + @Override + protected HttpHandler createHandler(final String[] args) { + final String sessionToken = Objects.requireNonNull(args[5], "session token is missing"); + final HttpHandler delegate = super.createHandler(args); + return exchange -> { + final String securityToken = exchange.getRequestHeaders().getFirst("x-amz-security-token"); + if (securityToken == null) { + sendError(exchange, RestStatus.FORBIDDEN, "AccessDenied", "No session token"); + return; + } + if (securityToken.equals(sessionToken) == false) { + sendError(exchange, RestStatus.FORBIDDEN, "AccessDenied", "Bad session token"); + return; + } + delegate.handle(exchange); + }; + } + + public static void main(final String[] args) throws Exception { + if (args == null || args.length < 6) { + throw new IllegalArgumentException("S3HttpFixtureWithSessionToken expects 6 arguments " + + "[address, port, bucket, base path, access key, session token]"); + } + final S3HttpFixtureWithSessionToken fixture = new S3HttpFixtureWithSessionToken(args); + fixture.start(); + } +} diff --git a/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java new file mode 100644 index 00000000000..468035b828b --- /dev/null +++ b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java @@ -0,0 +1,368 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package fixture.s3;
+
+import com.sun.net.httpserver.Headers;
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.SuppressForbidden;
+import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.hash.MessageDigests;
+import org.elasticsearch.common.io.Streams;
+import org.elasticsearch.common.regex.Regex;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.rest.RestUtils;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.Checksum;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Minimal HTTP handler that acts as a S3 compliant server
+ */
+@SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
+public class S3HttpHandler implements HttpHandler {
+
+    private final String bucket;
+    private final String path;
+
+    private final ConcurrentMap<String, BytesReference> blobs = new ConcurrentHashMap<>();
+
+    public S3HttpHandler(final String bucket) {
+        this(bucket, null);
+    }
+
+    public S3HttpHandler(final String bucket, @Nullable final String basePath) {
+        this.bucket = Objects.requireNonNull(bucket);
+        this.path = bucket + (basePath != null && basePath.isEmpty() == false ? "/" + basePath : "");
"/" + basePath : ""); + } + + @Override + public void handle(final HttpExchange exchange) throws IOException { + final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString(); + try { + if (Regex.simpleMatch("POST /" + path + "/*?uploads", request)) { + final String uploadId = UUIDs.randomBase64UUID(); + byte[] response = ("\n" + + "\n" + + " " + bucket + "\n" + + " " + exchange.getRequestURI().getPath() + "\n" + + " " + uploadId + "\n" + + "").getBytes(StandardCharsets.UTF_8); + blobs.put(multipartKey(uploadId, 0), BytesArray.EMPTY); + exchange.getResponseHeaders().add("Content-Type", "application/xml"); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); + exchange.getResponseBody().write(response); + + } else if (Regex.simpleMatch("PUT /" + path + "/*?uploadId=*&partNumber=*", request)) { + final Map params = new HashMap<>(); + RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params); + + final String uploadId = params.get("uploadId"); + if (blobs.containsKey(multipartKey(uploadId, 0))) { + final Tuple blob = parseRequestBody(exchange); + final int partNumber = Integer.parseInt(params.get("partNumber")); + blobs.put(multipartKey(uploadId, partNumber), blob.v2()); + exchange.getResponseHeaders().add("ETag", blob.v1()); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1); + } else { + exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1); + } + + } else if (Regex.simpleMatch("POST /" + path + "/*?uploadId=*", request)) { + Streams.readFully(exchange.getRequestBody()); + final Map params = new HashMap<>(); + RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params); + final String uploadId = params.get("uploadId"); + + final int nbParts = blobs.keySet().stream() + .filter(blobName -> blobName.startsWith(uploadId)) + .map(blobName -> blobName.replaceFirst(uploadId + '\n', "")) + .mapToInt(Integer::parseInt) + .max() + .orElse(0); + + final ByteArrayOutputStream blob = new ByteArrayOutputStream(); + for (int partNumber = 0; partNumber <= nbParts; partNumber++) { + BytesReference part = blobs.remove(multipartKey(uploadId, partNumber)); + if (part == null) { + throw new AssertionError("Upload part is null"); + } + part.writeTo(blob); + } + blobs.put(exchange.getRequestURI().getPath(), new BytesArray(blob.toByteArray())); + + byte[] response = ("\n" + + "\n" + + " " + bucket + "\n" + + " " + exchange.getRequestURI().getPath() + "\n" + + "").getBytes(StandardCharsets.UTF_8); + exchange.getResponseHeaders().add("Content-Type", "application/xml"); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); + exchange.getResponseBody().write(response); + + } else if (Regex.simpleMatch("PUT /" + path + "/*", request)) { + final Tuple blob = parseRequestBody(exchange); + blobs.put(exchange.getRequestURI().toString(), blob.v2()); + exchange.getResponseHeaders().add("ETag", blob.v1()); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1); + + } else if (Regex.simpleMatch("GET /" + bucket + "/?prefix=*", request)) { + final Map params = new HashMap<>(); + RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params); + if (params.get("list-type") != null) { + throw new AssertionError("Test must be adapted for GET Bucket (List Objects) Version 2"); + } + + final StringBuilder list = new StringBuilder(); + list.append(""); + list.append(""); + final String prefix = params.get("prefix"); + if (prefix != null) { + 
list.append("").append(prefix).append(""); + } + for (Map.Entry blob : blobs.entrySet()) { + if (prefix == null || blob.getKey().startsWith("/" + bucket + "/" + prefix)) { + list.append(""); + list.append("").append(blob.getKey().replace("/" + bucket + "/", "")).append(""); + list.append("").append(blob.getValue().length()).append(""); + list.append(""); + } + } + list.append(""); + + byte[] response = list.toString().getBytes(StandardCharsets.UTF_8); + exchange.getResponseHeaders().add("Content-Type", "application/xml"); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); + exchange.getResponseBody().write(response); + + } else if (Regex.simpleMatch("GET /" + path + "/*", request)) { + final BytesReference blob = blobs.get(exchange.getRequestURI().toString()); + if (blob != null) { + final String range = exchange.getRequestHeaders().getFirst("Range"); + if (range == null) { + exchange.getResponseHeaders().add("Content-Type", "application/octet-stream"); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), blob.length()); + blob.writeTo(exchange.getResponseBody()); + } else { + final Matcher matcher = Pattern.compile("^bytes=([0-9]+)-([0-9]+)$").matcher(range); + if (matcher.matches() == false) { + throw new AssertionError("Bytes range does not match expected pattern: " + range); + } + + final int start = Integer.parseInt(matcher.group(1)); + final int end = Integer.parseInt(matcher.group(2)); + final int length = end - start; + + exchange.getResponseHeaders().add("Content-Type", "application/octet-stream"); + exchange.getResponseHeaders().add("Content-Range", + String.format(Locale.ROOT, "bytes=%d-%d/%d", start, end, blob.length())); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), length); + exchange.getResponseBody().write(BytesReference.toBytes(blob), start, length); + } + } else { + exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1); + } + + } else if (Regex.simpleMatch("DELETE /" + path + "/*", request)) { + int deletions = 0; + for (Iterator> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) { + Map.Entry blob = iterator.next(); + if (blob.getKey().startsWith(exchange.getRequestURI().toString())) { + iterator.remove(); + deletions++; + } + } + exchange.sendResponseHeaders((deletions > 0 ? 
+
+            } else if (Regex.simpleMatch("POST /" + bucket + "/?delete", request)) {
+                final String requestBody = Streams.copyToString(new InputStreamReader(exchange.getRequestBody(), UTF_8));
+
+                final StringBuilder deletes = new StringBuilder();
+                deletes.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
+                deletes.append("<DeleteResult>");
+                for (Iterator<Map.Entry<String, BytesReference>> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) {
+                    Map.Entry<String, BytesReference> blob = iterator.next();
+                    String key = blob.getKey().replace("/" + path + "/", "");
+                    if (requestBody.contains("<Key>" + key + "</Key>")) {
+                        deletes.append("<Deleted><Key>").append(key).append("</Key></Deleted>");
+                        iterator.remove();
+                    }
+                }
+                deletes.append("</DeleteResult>");
+
+                byte[] response = deletes.toString().getBytes(StandardCharsets.UTF_8);
+                exchange.getResponseHeaders().add("Content-Type", "application/xml");
+                exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
+                exchange.getResponseBody().write(response);
+
+            } else {
+                exchange.sendResponseHeaders(RestStatus.INTERNAL_SERVER_ERROR.getStatus(), -1);
+            }
+        } finally {
+            exchange.close();
+        }
+    }
+
+    private static String multipartKey(final String uploadId, int partNumber) {
+        return uploadId + "\n" + partNumber;
+    }
+
+    private static CheckedInputStream createCheckedInputStream(final InputStream inputStream, final MessageDigest digest) {
+        return new CheckedInputStream(inputStream, new Checksum() {
+            @Override
+            public void update(int b) {
+                digest.update((byte) b);
+            }
+
+            @Override
+            public void update(byte[] b, int off, int len) {
+                digest.update(b, off, len);
+            }
+
+            @Override
+            public long getValue() {
+                throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public void reset() {
+                digest.reset();
+            }
+        });
+    }
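+
+    // Request bodies signed with SigV4 streaming ("aws-chunked") arrive as a sequence of chunks, each
+    // shaped like "<hex size>;chunk-signature=<signature>\r\n<data>\r\n" and terminated by a zero-length
+    // chunk. parseRequestBody strips that framing whenever the "x-amz-decoded-content-length" header is
+    // present, and otherwise reads the body verbatim.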
IllegalStateException("Something went wrong when parsing the chunked request " + + "[bytes read=" + blob.size() + ", expected=" + headerDecodedContentLength + "]"); + } + bytesReference = new BytesArray(blob.toByteArray()); + } + + final MessageDigest digest = MessageDigests.md5(); + Streams.readFully(createCheckedInputStream(bytesReference.streamInput(), digest)); + return Tuple.tuple(MessageDigests.toHexString(digest.digest()), bytesReference); + } + + public static void sendError(final HttpExchange exchange, + final RestStatus status, + final String errorCode, + final String message) throws IOException { + final Headers headers = exchange.getResponseHeaders(); + headers.add("Content-Type", "application/xml"); + + final String requestId = exchange.getRequestHeaders().getFirst("x-amz-request-id"); + if (requestId != null) { + headers.add("x-amz-request-id", requestId); + } + + if (errorCode == null || "HEAD".equals(exchange.getRequestMethod())) { + exchange.sendResponseHeaders(status.getStatus(), -1L); + exchange.close(); + } else { + final byte[] response = ("" + + "" + errorCode + "" + + "" + message + "" + + "" + requestId + "" + + "").getBytes(StandardCharsets.UTF_8); + exchange.sendResponseHeaders(status.getStatus(), response.length); + exchange.getResponseBody().write(response); + exchange.close(); + } + } +}