From 8a14ea5567599273f0a55b56a604707b13b4d7d4 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Thu, 7 Nov 2019 13:27:22 -0500 Subject: [PATCH] Add docker-composed based test fixture for GCS (#48902) Similarly to what has be done for Azure in #48636, this commit adds a new :test:fixtures:gcs-fixture project which provides two docker-compose based fixtures that emulate a Google Cloud Storage service. Some code has been extracted from existing tests and placed into this new project so that it can be easily reused in other projects. --- plugins/repository-gcs/build.gradle | 2 + .../qa/google-cloud-storage/build.gradle | 61 +- .../gcs/GoogleCloudStorageFixture.java | 656 ------------------ ...CloudStorageBlobContainerRetriesTests.java | 21 +- ...eCloudStorageBlobStoreRepositoryTests.java | 210 +----- .../repositories/gcs/TestUtils.java | 97 --- settings.gradle | 3 +- test/fixtures/gcs-fixture/Dockerfile | 17 + test/fixtures/gcs-fixture/build.gradle | 39 ++ test/fixtures/gcs-fixture/docker-compose.yml | 26 + .../fixture/gcs/FakeOAuth2HttpHandler.java | 41 ++ .../gcs/GoogleCloudStorageHttpFixture.java | 55 ++ .../gcs/GoogleCloudStorageHttpHandler.java | 340 +++++++++ 13 files changed, 559 insertions(+), 1009 deletions(-) delete mode 100644 plugins/repository-gcs/qa/google-cloud-storage/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageFixture.java create mode 100644 test/fixtures/gcs-fixture/Dockerfile create mode 100644 test/fixtures/gcs-fixture/build.gradle create mode 100644 test/fixtures/gcs-fixture/docker-compose.yml create mode 100644 test/fixtures/gcs-fixture/src/main/java/fixture/gcs/FakeOAuth2HttpHandler.java create mode 100644 test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpFixture.java create mode 100644 test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java diff --git a/plugins/repository-gcs/build.gradle b/plugins/repository-gcs/build.gradle index 709b7740fb4..f75a1b66443 100644 --- a/plugins/repository-gcs/build.gradle +++ b/plugins/repository-gcs/build.gradle @@ -54,6 +54,8 @@ dependencies { compile 'io.opencensus:opencensus-api:0.18.0' compile 'io.opencensus:opencensus-contrib-http-util:0.18.0' compile 'com.google.apis:google-api-services-storage:v1-rev20190426-1.28.0' + + testCompile project(':test:fixtures:gcs-fixture') } dependencyLicenses { diff --git a/plugins/repository-gcs/qa/google-cloud-storage/build.gradle b/plugins/repository-gcs/qa/google-cloud-storage/build.gradle index c2055959ad1..25e43defd86 100644 --- a/plugins/repository-gcs/qa/google-cloud-storage/build.gradle +++ b/plugins/repository-gcs/qa/google-cloud-storage/build.gradle @@ -20,7 +20,6 @@ import org.elasticsearch.gradle.MavenFilteringHack import org.elasticsearch.gradle.info.BuildParams -import org.elasticsearch.gradle.test.AntFixture import java.nio.file.Files import java.security.KeyPair @@ -30,12 +29,14 @@ import static org.elasticsearch.gradle.PropertyNormalization.IGNORE_VALUE apply plugin: 'elasticsearch.standalone-rest-test' apply plugin: 'elasticsearch.rest-test' +apply plugin: 'elasticsearch.test.fixtures' // TODO think about flattening qa:google-cloud-storage project into parent dependencies { testCompile project(path: ':plugins:repository-gcs') } +testFixtures.useFixture(':test:fixtures:gcs-fixture') boolean useFixture = false String gcsServiceAccount = System.getenv("google_storage_service_account") @@ -45,7 +46,7 @@ String gcsBasePath = System.getenv("google_storage_base_path") File serviceAccountFile = null if (!gcsServiceAccount && !gcsBucket && !gcsBasePath) { serviceAccountFile = new File(project.buildDir, 'generated-resources/service_account_test.json') - gcsBucket = 'bucket_test' + gcsBucket = 'bucket' gcsBasePath = 'integration_test' useFixture = true } else if (!gcsServiceAccount || !gcsBucket || !gcsBasePath) { @@ -58,12 +59,11 @@ def encodedCredentials = { Base64.encoder.encodeToString(Files.readAllBytes(serviceAccountFile.toPath())) } -/** A task to start the GoogleCloudStorageFixture which emulates a Google Cloud Storage service **/ -task googleCloudStorageFixture(type: AntFixture) { - dependsOn testClasses - env 'CLASSPATH', "${ -> project.sourceSets.test.runtimeClasspath.asPath }" - executable = "${BuildParams.runtimeJavaHome}/bin/java" - args 'org.elasticsearch.repositories.gcs.GoogleCloudStorageFixture', baseDir, 'bucket_test' +def fixtureAddress = { fixture -> + assert useFixture : 'closure should not be used without a fixture' + int ephemeralPort = project(':test:fixtures:gcs-fixture').postProcessFixture.ext."test.fixtures.${fixture}.tcp.80" + assert ephemeralPort > 0 + 'http://127.0.0.1:' + ephemeralPort } /** A service account file that points to the Google Cloud Storage service emulated by the fixture **/ @@ -87,6 +87,19 @@ task createServiceAccountFile() { } task thirdPartyTest (type: Test) { + if (useFixture) { + thirdPartyTest.dependsOn createServiceAccountFile + nonInputProperties.systemProperty 'test.google.endpoint', "${ -> fixtureAddress('gcs-fixture-third-party') }" + nonInputProperties.systemProperty 'test.google.tokenURI', "${ -> fixtureAddress('gcs-fixture-third-party') }/o/oauth2/token" + + gradle.taskGraph.whenReady { + if (it.hasTask(gcsThirdPartyTests)) { + throw new IllegalStateException("Tried to run third party tests but not all of the necessary environment variables " + + "'google_storage_service_account', 'google_storage_bucket', 'google_storage_base_path' are set.") + } + } + } + include '**/GoogleCloudStorageThirdPartyTests.class' systemProperty 'tests.security.manager', false systemProperty 'test.google.bucket', gcsBucket @@ -98,32 +111,6 @@ task gcsThirdPartyTests { dependsOn check } -if (useFixture) { - // TODO think about running the fixture in the same JVM as tests - thirdPartyTest.dependsOn createServiceAccountFile, googleCloudStorageFixture - thirdPartyTest.finalizedBy googleCloudStorageFixture.getStopTask() - - def fixtureEndpoint = { - "http://${googleCloudStorageFixture.addressAndPort}" - } - - def tokenURI = { - "http://${googleCloudStorageFixture.addressAndPort}/o/oauth2/token" - } - - thirdPartyTest { - nonInputProperties.systemProperty 'test.google.endpoint', "${ -> fixtureEndpoint.call() }" - nonInputProperties.systemProperty 'test.google.tokenURI', "${ -> tokenURI.call() }" - } - - gradle.taskGraph.whenReady { - if (it.hasTask(gcsThirdPartyTests)) { - throw new IllegalStateException("Tried to run third party tests but not all of the necessary environment variables 'google_storage_service_account', " + - "'google_storage_bucket', 'google_storage_base_path' are set.") - } - } -} - integTest.mustRunAfter(thirdPartyTest) check.dependsOn thirdPartyTest @@ -147,10 +134,10 @@ testClusters.integTest { keystore 'gcs.client.integration_test.credentials_file', serviceAccountFile, IGNORE_VALUE if (useFixture) { - tasks.integTest.dependsOn createServiceAccountFile, googleCloudStorageFixture + tasks.integTest.dependsOn createServiceAccountFile /* Use a closure on the string to delay evaluation until tests are executed */ - setting 'gcs.client.integration_test.endpoint', { "http://${googleCloudStorageFixture.addressAndPort}" }, IGNORE_VALUE - setting 'gcs.client.integration_test.token_uri', { "http://${googleCloudStorageFixture.addressAndPort}/o/oauth2/token" }, IGNORE_VALUE + setting 'gcs.client.integration_test.endpoint', { "${ -> fixtureAddress('gcs-fixture') }" }, IGNORE_VALUE + setting 'gcs.client.integration_test.token_uri', { "${ -> fixtureAddress('gcs-fixture') }/o/oauth2/token" }, IGNORE_VALUE } else { println "Using an external service to test the repository-gcs plugin" } diff --git a/plugins/repository-gcs/qa/google-cloud-storage/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageFixture.java b/plugins/repository-gcs/qa/google-cloud-storage/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageFixture.java deleted file mode 100644 index b2276d78417..00000000000 --- a/plugins/repository-gcs/qa/google-cloud-storage/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageFixture.java +++ /dev/null @@ -1,656 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.repositories.gcs; - -import org.elasticsearch.test.fixture.AbstractHttpFixture; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.path.PathTrie; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.rest.RestUtils; - -import java.io.BufferedReader; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.net.URI; -import java.nio.charset.StandardCharsets; -import java.time.LocalDateTime; -import java.time.ZoneOffset; -import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.zip.GZIPInputStream; - -import static java.util.Collections.emptyMap; -import static java.util.Collections.singletonList; -import static java.util.Collections.singletonMap; -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; - -/** - * {@link GoogleCloudStorageFixture} emulates a Google Cloud Storage service. - * - * The implementation is based on official documentation available at https://cloud.google.com/storage/docs/json_api/v1/. - */ -public class GoogleCloudStorageFixture extends AbstractHttpFixture { - - /** List of the buckets stored on this test server **/ - private final Map buckets = ConcurrentCollections.newConcurrentMap(); - - /** Request handlers for the requests made by the Google Cloud Storage client **/ - private final PathTrie handlers; - - /** - * Creates a {@link GoogleCloudStorageFixture} - */ - private GoogleCloudStorageFixture(final String workingDir, final String bucket) { - super(workingDir); - this.buckets.put(bucket, new Bucket(bucket)); - this.handlers = defaultHandlers(buckets); - } - - @Override - protected Response handle(final Request request) throws IOException { - final RequestHandler handler = handlers.retrieve(request.getMethod() + " " + request.getPath(), request.getParameters()); - if (handler != null) { - return handler.handle(request); - } - return null; - } - - public static void main(final String[] args) throws Exception { - if (args == null || args.length != 2) { - throw new IllegalArgumentException("GoogleCloudStorageFixture "); - } - - final GoogleCloudStorageFixture fixture = new GoogleCloudStorageFixture(args[0], args[1]); - fixture.listen(); - } - - /** Builds the default request handlers **/ - private static PathTrie defaultHandlers(final Map buckets) { - final PathTrie handlers = new PathTrie<>(RestUtils.REST_DECODER); - - // GET Bucket - // - // https://cloud.google.com/storage/docs/json_api/v1/buckets/get - handlers.insert("GET /storage/v1/b/{bucket}", (request) -> { - final String name = request.getParam("bucket"); - if (Strings.hasText(name) == false) { - return newError(RestStatus.INTERNAL_SERVER_ERROR, "bucket name is missing"); - } - - if (buckets.containsKey(name)) { - return newResponse(RestStatus.OK, emptyMap(), buildBucketResource(name)); - } else { - return newError(RestStatus.NOT_FOUND, "bucket not found"); - } - }); - - // GET Object - // - // https://cloud.google.com/storage/docs/json_api/v1/objects/get - handlers.insert("GET /storage/v1/b/{bucket}/o/{object}", (request) -> { - final String objectName = request.getParam("object"); - if (Strings.hasText(objectName) == false) { - return newError(RestStatus.INTERNAL_SERVER_ERROR, "object name is missing"); - } - - final Bucket bucket = buckets.get(request.getParam("bucket")); - if (bucket == null) { - return newError(RestStatus.NOT_FOUND, "bucket not found"); - } - - for (final Map.Entry object : bucket.objects.entrySet()) { - if (object.getKey().equals(objectName)) { - return newResponse(RestStatus.OK, emptyMap(), buildObjectResource(bucket.name, object.getValue())); - } - } - return newError(RestStatus.NOT_FOUND, "object not found"); - }); - - // Delete Object - // - // https://cloud.google.com/storage/docs/json_api/v1/objects/delete - handlers.insert("DELETE /storage/v1/b/{bucket}/o/{object}", (request) -> { - final String objectName = request.getParam("object"); - if (Strings.hasText(objectName) == false) { - return newError(RestStatus.INTERNAL_SERVER_ERROR, "object name is missing"); - } - - final Bucket bucket = buckets.get(request.getParam("bucket")); - if (bucket == null) { - return newError(RestStatus.NOT_FOUND, "bucket not found"); - } - - final Item item = bucket.objects.remove(objectName); - if (item != null) { - return new Response(RestStatus.NO_CONTENT.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE); - } - return newError(RestStatus.NOT_FOUND, "object not found"); - }); - - // Insert Object (initialization) - // - // https://cloud.google.com/storage/docs/json_api/v1/objects/insert - handlers.insert("POST /upload/storage/v1/b/{bucket}/o", (request) -> { - final String ifGenerationMatch = request.getParam("ifGenerationMatch"); - final String uploadType = request.getParam("uploadType"); - if ("resumable".equals(uploadType)) { - final String objectName = request.getParam("name"); - if (Strings.hasText(objectName) == false) { - return newError(RestStatus.INTERNAL_SERVER_ERROR, "object name is missing"); - } - final Bucket bucket = buckets.get(request.getParam("bucket")); - if (bucket == null) { - return newError(RestStatus.NOT_FOUND, "bucket not found"); - } - if ("0".equals(ifGenerationMatch)) { - if (bucket.objects.putIfAbsent(objectName, Item.empty(objectName)) == null) { - final String location = /*endpoint +*/ "/upload/storage/v1/b/" + bucket.name + "/o?uploadType=resumable&upload_id=" - + objectName; - return newResponse(RestStatus.CREATED, singletonMap("Location", location), jsonBuilder()); - } else { - return newError(RestStatus.PRECONDITION_FAILED, "object already exist"); - } - } else { - bucket.objects.put(objectName, Item.empty(objectName)); - final String location = /*endpoint +*/ "/upload/storage/v1/b/" + bucket.name + "/o?uploadType=resumable&upload_id=" - + objectName; - return newResponse(RestStatus.CREATED, singletonMap("Location", location), jsonBuilder()); - } - } else if ("multipart".equals(uploadType)) { - /* - * A multipart/related request body looks like this (note the binary dump inside a text blob! nice!): - * --__END_OF_PART__ - * Content-Length: 135 - * Content-Type: application/json; charset=UTF-8 - * content-transfer-encoding: binary - * - * {"bucket":"bucket_test","crc32c":"7XacHQ==","md5Hash":"fVztGkklMlUamsSmJK7W+w==", - * "name":"tests-KEwE3bU4TuyetBgQIghmUw/master.dat-temp"} - * --__END_OF_PART__ - * content-transfer-encoding: binary - * - * KEwE3bU4TuyetBgQIghmUw - * --__END_OF_PART__-- - */ - String boundary = "__END_OF_PART__"; - // Determine the multipart boundary - final String contentType = request.getContentType(); - if ((contentType != null) && contentType.contains("multipart/related; boundary=")) { - boundary = contentType.replace("multipart/related; boundary=", ""); - } - - InputStream inputStreamBody = new ByteArrayInputStream(request.getBody()); - final String contentEncoding = request.getHeader("Content-Encoding"); - if (contentEncoding != null) { - if ("gzip".equalsIgnoreCase(contentEncoding)) { - inputStreamBody = new GZIPInputStream(inputStreamBody); - } - } - // Read line by line ?both? parts of the multipart. Decoding headers as - // IS_8859_1 is safe. - try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStreamBody, StandardCharsets.ISO_8859_1))) { - String line; - // read first part delimiter - line = reader.readLine(); - if ((line == null) || (line.equals("--" + boundary) == false)) { - return newError(RestStatus.INTERNAL_SERVER_ERROR, - "Error parsing multipart request. Does not start with the part delimiter."); - } - final Map> firstPartHeaders = new HashMap<>(); - // Reads the first part's headers, if any - while ((line = reader.readLine()) != null) { - if (line.equals("\r\n") || (line.length() == 0)) { - // end of headers - break; - } else { - final String[] header = line.split(":", 2); - firstPartHeaders.put(header[0], singletonList(header[1])); - } - } - final List firstPartContentTypes = firstPartHeaders.getOrDefault("Content-Type", - firstPartHeaders.get("Content-type")); - if ((firstPartContentTypes == null) - || (firstPartContentTypes.stream().noneMatch(x -> x.contains("application/json")))) { - return newError(RestStatus.INTERNAL_SERVER_ERROR, - "Error parsing multipart request. Metadata part expected to have the \"application/json\" content type."); - } - // read metadata part, a single line - line = reader.readLine(); - final byte[] metadata = line.getBytes(StandardCharsets.ISO_8859_1); - if ((firstPartContentTypes != null) && (firstPartContentTypes.stream().anyMatch((x -> x.contains("charset=utf-8"))))) { - // decode as utf-8 - line = new String(metadata, StandardCharsets.UTF_8); - } - final Matcher objectNameMatcher = Pattern.compile("\"name\":\"([^\"]*)\"").matcher(line); - objectNameMatcher.find(); - final String objectName = objectNameMatcher.group(1); - final Matcher bucketNameMatcher = Pattern.compile("\"bucket\":\"([^\"]*)\"").matcher(line); - bucketNameMatcher.find(); - final String bucketName = bucketNameMatcher.group(1); - // read second part delimiter - line = reader.readLine(); - if ((line == null) || (line.equals("--" + boundary) == false)) { - return newError(RestStatus.INTERNAL_SERVER_ERROR, - "Error parsing multipart request. Second part does not start with delimiter. " - + "Is the metadata multi-line?"); - } - final Map> secondPartHeaders = new HashMap<>(); - // Reads the second part's headers, if any - while ((line = reader.readLine()) != null) { - if (line.equals("\r\n") || (line.length() == 0)) { - // end of headers - break; - } else { - final String[] header = line.split(":", 2); - secondPartHeaders.put(header[0], singletonList(header[1])); - } - } - final List secondPartTransferEncoding = secondPartHeaders.getOrDefault("Content-Transfer-Encoding", - secondPartHeaders.get("content-transfer-encoding")); - if ((secondPartTransferEncoding == null) - || (secondPartTransferEncoding.stream().noneMatch(x -> x.contains("binary")))) { - return newError(RestStatus.INTERNAL_SERVER_ERROR, - "Error parsing multipart request. Data part expected to have the \"binary\" content transfer encoding."); - } - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - int c; - while ((c = reader.read()) != -1) { - // one char to one byte, because of the ISO_8859_1 encoding - baos.write(c); - } - final byte[] temp = baos.toByteArray(); - final byte[] trailingEnding = ("\r\n--" + boundary + "--\r\n").getBytes(StandardCharsets.ISO_8859_1); - // check trailing - for (int i = trailingEnding.length - 1; i >= 0; i--) { - if (trailingEnding[i] != temp[(temp.length - trailingEnding.length) + i]) { - return newError(RestStatus.INTERNAL_SERVER_ERROR, "Error parsing multipart request."); - } - } - final Bucket bucket = buckets.get(bucketName); - if (bucket == null) { - return newError(RestStatus.NOT_FOUND, "bucket not found"); - } - final byte[] objectData = Arrays.copyOf(temp, temp.length - trailingEnding.length); - if ((objectName != null) && (bucketName != null) && (objectData != null)) { - bucket.objects.put(objectName, new Item(objectName, objectData)); - return new Response(RestStatus.OK.getStatus(), JSON_CONTENT_TYPE, metadata); - } else { - return newError(RestStatus.INTERNAL_SERVER_ERROR, "error parsing multipart request"); - } - } - } else { - return newError(RestStatus.INTERNAL_SERVER_ERROR, "upload type must be resumable or multipart"); - } - }); - - // Insert Object (upload) - // - // https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload - handlers.insert("PUT /upload/storage/v1/b/{bucket}/o", (request) -> { - final String objectId = request.getParam("upload_id"); - if (Strings.hasText(objectId) == false) { - return newError(RestStatus.INTERNAL_SERVER_ERROR, "upload id is missing"); - } - - final Bucket bucket = buckets.get(request.getParam("bucket")); - if (bucket == null) { - return newError(RestStatus.NOT_FOUND, "bucket not found"); - } - - if (bucket.objects.containsKey(objectId) == false) { - return newError(RestStatus.NOT_FOUND, "object name not found"); - } - - final Item item = new Item(objectId, request.getBody()); - - bucket.objects.put(objectId, item); - return newResponse(RestStatus.OK, emptyMap(), buildObjectResource(bucket.name, item)); - }); - - // List Objects - // - // https://cloud.google.com/storage/docs/json_api/v1/objects/list - handlers.insert("GET /storage/v1/b/{bucket}/o", (request) -> { - final Bucket bucket = buckets.get(request.getParam("bucket")); - if (bucket == null) { - return newError(RestStatus.NOT_FOUND, "bucket not found"); - } - - final XContentBuilder builder = jsonBuilder(); - builder.startObject(); - builder.field("kind", "storage#objects"); - final Set prefixes = new HashSet<>(); - { - builder.startArray("items"); - - final String prefixParam = request.getParam("prefix"); - final String delimiter = request.getParam("delimiter"); - - for (final Map.Entry object : bucket.objects.entrySet()) { - String objectKey = object.getKey(); - if ((prefixParam != null) && (objectKey.startsWith(prefixParam) == false)) { - continue; - } - - if (Strings.isNullOrEmpty(delimiter)) { - buildObjectResource(builder, bucket.name, object.getValue()); - } else { - int prefixLength = prefixParam.length(); - String rest = objectKey.substring(prefixLength); - int delimiterPos; - if ((delimiterPos = rest.indexOf(delimiter)) != -1) { - String key = objectKey.substring(0, prefixLength + delimiterPos + 1); - prefixes.add(key); - } else { - buildObjectResource(builder, bucket.name, object.getValue()); - } - } - } - builder.endArray(); - } - { - if (prefixes.isEmpty() == false) { - builder.array("prefixes", prefixes.toArray()); - } - } - builder.endObject(); - return newResponse(RestStatus.OK, emptyMap(), builder); - }); - - // Download Object - // - // https://cloud.google.com/storage/docs/request-body - handlers.insert("GET /download/storage/v1/b/{bucket}/o/{object}", (request) -> { - final String object = request.getParam("object"); - if (Strings.hasText(object) == false) { - return newError(RestStatus.INTERNAL_SERVER_ERROR, "object id is missing"); - } - - final Bucket bucket = buckets.get(request.getParam("bucket")); - if (bucket == null) { - return newError(RestStatus.NOT_FOUND, "bucket not found"); - } - - if (bucket.objects.containsKey(object) == false) { - return newError(RestStatus.NOT_FOUND, "object name not found"); - } - - return new Response(RestStatus.OK.getStatus(), contentType("application/octet-stream"), bucket.objects.get(object).bytes); - }); - - // Batch - // - // https://cloud.google.com/storage/docs/json_api/v1/how-tos/batch - handlers.insert("POST /batch/storage/v1", (request) -> { - final List batchedResponses = new ArrayList<>(); - - // A batch request body looks like this: - // - // --__END_OF_PART__ - // Content-Length: 71 - // Content-Type: application/http - // content-id: 1 - // content-transfer-encoding: binary - // - // DELETE https://www.googleapis.com/storage/v1/b/ohifkgu/o/foo%2Ftest HTTP/1.1 - // - // - // --__END_OF_PART__ - // Content-Length: 71 - // Content-Type: application/http - // content-id: 2 - // content-transfer-encoding: binary - // - // DELETE https://www.googleapis.com/storage/v1/b/ohifkgu/o/bar%2Ftest HTTP/1.1 - // - // - // --__END_OF_PART__-- - - // Default multipart boundary - String boundary = "__END_OF_PART__"; - - // Determine the multipart boundary - final String contentType = request.getContentType(); - if ((contentType != null) && contentType.contains("multipart/mixed; boundary=")) { - boundary = contentType.replace("multipart/mixed; boundary=", ""); - } - - long batchedRequests = 0L; - - // Read line by line the batched requests - try (BufferedReader reader = new BufferedReader( - new InputStreamReader( - new ByteArrayInputStream(request.getBody()), StandardCharsets.UTF_8))) { - String line; - while ((line = reader.readLine()) != null) { - // Start of a batched request - if (line.equals("--" + boundary)) { - final Map batchedHeaders = new HashMap<>(); - - // Reads the headers, if any - while ((line = reader.readLine()) != null) { - if (line.equals("\r\n") || (line.length() == 0)) { - // end of headers - break; - } else { - final String[] header = line.split(":", 2); - batchedHeaders.put(header[0], header[1]); - } - } - - // Reads the method and URL - line = reader.readLine(); - final String batchedMethod = line.substring(0, line.indexOf(' ')); - final URI batchedUri = URI.create(line.substring(batchedMethod.length() + 1, line.lastIndexOf(' '))); - - // Reads the body - line = reader.readLine(); - byte[] batchedBody = new byte[0]; - if ((line != null) || (line.startsWith("--" + boundary) == false)) { - batchedBody = line.getBytes(StandardCharsets.UTF_8); - } - - final Request batchedRequest = new Request(batchedRequests, batchedMethod, batchedUri, batchedHeaders, batchedBody); - batchedRequests = batchedRequests + 1; - - // Executes the batched request - final RequestHandler handler = - handlers.retrieve(batchedRequest.getMethod() + " " + batchedRequest.getPath(), batchedRequest.getParameters()); - if (handler != null) { - try { - batchedResponses.add(handler.handle(batchedRequest)); - } catch (final IOException e) { - batchedResponses.add(newError(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage())); - } - } - } - } - } - - // Now we can build the response - final String sep = "--"; - final String line = "\r\n"; - - final StringBuilder builder = new StringBuilder(); - for (final Response response : batchedResponses) { - builder.append(sep).append(boundary).append(line); - builder.append("Content-Type: application/http").append(line); - builder.append(line); - builder.append("HTTP/1.1 ") - .append(response.getStatus()) - .append(' ') - .append(RestStatus.fromCode(response.getStatus()).toString()) - .append(line); - builder.append("Content-Length: ").append(response.getBody().length).append(line); - builder.append("Content-Type: ").append(response.getContentType()).append(line); - response.getHeaders().forEach((k, v) -> builder.append(k).append(": ").append(v).append(line)); - builder.append(line); - builder.append(new String(response.getBody(), StandardCharsets.UTF_8)).append(line); - builder.append(line); - } - builder.append(line); - builder.append(sep).append(boundary).append(sep); - - final byte[] content = builder.toString().getBytes(StandardCharsets.UTF_8); - return new Response(RestStatus.OK.getStatus(), contentType("multipart/mixed; boundary=" + boundary), content); - }); - - // Fake refresh of an OAuth2 token - // - handlers.insert("POST /o/oauth2/token", (request) -> - newResponse(RestStatus.OK, emptyMap(), jsonBuilder() - .startObject() - .field("access_token", "unknown") - .field("token_type", "Bearer") - .field("expires_in", 3600) - .endObject()) - ); - - return handlers; - } - - /** - * Represents a Storage bucket as if it was created on Google Cloud Storage. - */ - static class Bucket { - - /** Bucket name **/ - final String name; - - /** Blobs contained in the bucket **/ - final Map objects; - - Bucket(final String name) { - this.name = Objects.requireNonNull(name); - this.objects = ConcurrentCollections.newConcurrentMap(); - } - } - - static class Item { - final String name; - final LocalDateTime created; - final byte[] bytes; - - Item(String name, byte[] bytes) { - this.name = name; - this.bytes = bytes; - this.created = LocalDateTime.now(ZoneOffset.UTC); - } - - public static Item empty(String name) { - return new Item(name, EMPTY_BYTE); - } - } - - /** - * Builds a JSON response - */ - private static Response newResponse(final RestStatus status, final Map headers, final XContentBuilder xContentBuilder) { - final Map responseHeaders = new HashMap<>(JSON_CONTENT_TYPE); - responseHeaders.putAll(headers); - - try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { - BytesReference.bytes(xContentBuilder).writeTo(out); - - return new Response(status.getStatus(), responseHeaders, out.toByteArray()); - } catch (final IOException e) { - return newError(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()); - } - } - - /** - * Storage Error JSON representation - */ - private static Response newError(final RestStatus status, final String message) { - try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { - try (XContentBuilder builder = jsonBuilder()) { - builder.startObject() - .startObject("error") - .field("code", status.getStatus()) - .field("message", message) - .startArray("errors") - .startObject() - .field("domain", "global") - .field("reason", status.toString()) - .field("message", message) - .endObject() - .endArray() - .endObject() - .endObject(); - BytesReference.bytes(builder).writeTo(out); - } - return new Response(status.getStatus(), JSON_CONTENT_TYPE, out.toByteArray()); - } catch (final IOException e) { - final byte[] bytes = (message != null ? message : "something went wrong").getBytes(StandardCharsets.UTF_8); - return new Response(RestStatus.INTERNAL_SERVER_ERROR.getStatus(), TEXT_PLAIN_CONTENT_TYPE, bytes); - } - } - - /** - * Storage Bucket JSON representation as defined in - * https://cloud.google.com/storage/docs/json_api/v1/bucket#resource - */ - private static XContentBuilder buildBucketResource(final String name) throws IOException { - return jsonBuilder().startObject() - .field("kind", "storage#bucket") - .field("name", name) - .field("id", name) - .endObject(); - } - - /** - * Storage Object JSON representation as defined in - * https://cloud.google.com/storage/docs/json_api/v1/objects#resource - */ - private static XContentBuilder buildObjectResource(final String bucket, final Item item) throws IOException { - return buildObjectResource(jsonBuilder(), bucket, item); - } - - /** - * Storage Object JSON representation as defined in - * https://cloud.google.com/storage/docs/json_api/v1/objects#resource - */ - private static XContentBuilder buildObjectResource(final XContentBuilder builder, - final String bucket, - final Item item) throws IOException { - return builder.startObject() - .field("kind", "storage#object") - .field("id", String.join("/", bucket, item.name)) - .field("name", item.name) - .field("timeCreated", DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(item.created)) - .field("bucket", bucket) - .field("size", String.valueOf(item.bytes.length)) - .endObject(); - } -} \ No newline at end of file diff --git a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java index 714ea968ff0..f70625dd179 100644 --- a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java +++ b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java @@ -24,6 +24,7 @@ import com.google.cloud.storage.StorageException; import com.google.cloud.storage.StorageOptions; import com.sun.net.httpserver.HttpContext; import com.sun.net.httpserver.HttpServer; +import fixture.gcs.FakeOAuth2HttpHandler; import org.apache.http.HttpStatus; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; @@ -68,6 +69,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import static fixture.gcs.GoogleCloudStorageHttpHandler.getContentRangeEnd; +import static fixture.gcs.GoogleCloudStorageHttpHandler.getContentRangeLimit; +import static fixture.gcs.GoogleCloudStorageHttpHandler.getContentRangeStart; +import static fixture.gcs.GoogleCloudStorageHttpHandler.parseMultipartRequestBody; import static java.nio.charset.StandardCharsets.UTF_8; import static org.elasticsearch.repositories.ESBlobStoreTestCase.randomBytes; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING; @@ -144,13 +149,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends ESTestCase { final List httpContexts = Arrays.asList( // Auth - httpServer.createContext("/token", exchange -> { - byte[] response = ("{\"access_token\":\"foo\",\"token_type\":\"Bearer\",\"expires_in\":3600}").getBytes(UTF_8); - exchange.getResponseHeaders().add("Content-Type", "application/json"); - exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length); - exchange.getResponseBody().write(response); - exchange.close(); - }), + httpServer.createContext("/token", new FakeOAuth2HttpHandler()), // Does bucket exists? httpServer.createContext("/storage/v1/b/bucket", exchange -> { byte[] response = ("{\"kind\":\"storage#bucket\",\"name\":\"bucket\",\"id\":\"0\"}").getBytes(UTF_8); @@ -244,7 +243,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends ESTestCase { httpServer.createContext("/upload/storage/v1/b/bucket/o", exchange -> { assertThat(exchange.getRequestURI().getQuery(), containsString("uploadType=multipart")); if (countDown.countDown()) { - Optional> content = TestUtils.parseMultipartRequestBody(exchange.getRequestBody()); + Optional> content = parseMultipartRequestBody(exchange.getRequestBody()); assertThat(content.isPresent(), is(true)); assertThat(content.get().v1(), equalTo("write_blob_max_retries")); if (Objects.deepEquals(bytes, content.get().v2().array())) { @@ -387,12 +386,12 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends ESTestCase { final long bytesRead = Streams.copy(exchange.getRequestBody(), requestBody); assertThat(Math.toIntExact(bytesRead), anyOf(equalTo(defaultChunkSize), equalTo(lastChunkSize))); - final int rangeStart = TestUtils.getContentRangeStart(range); - final int rangeEnd = TestUtils.getContentRangeEnd(range); + final int rangeStart = getContentRangeStart(range); + final int rangeEnd = getContentRangeEnd(range); assertThat(rangeEnd + 1 - rangeStart, equalTo(Math.toIntExact(bytesRead))); assertArrayEquals(Arrays.copyOfRange(data, rangeStart, rangeEnd + 1), requestBody.toByteArray()); - final Integer limit = TestUtils.getContentRangeLimit(range); + final Integer limit = getContentRangeLimit(range); if (limit != null) { exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1); exchange.close(); diff --git a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java index 9d0f6fb3e73..9ff7192bda6 100644 --- a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java +++ b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java @@ -24,16 +24,10 @@ import com.google.cloud.http.HttpTransportOptions; import com.google.cloud.storage.StorageOptions; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; -import org.apache.http.HttpStatus; -import org.apache.lucene.util.ArrayUtil; +import fixture.gcs.GoogleCloudStorageHttpHandler; import org.elasticsearch.cluster.metadata.RepositoryMetaData; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.SuppressForbidden; -import org.elasticsearch.common.UUIDs; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.Streams; -import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -43,37 +37,20 @@ import org.elasticsearch.env.Environment; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase; -import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.rest.RestUtils; import org.elasticsearch.threadpool.ThreadPool; import org.threeten.bp.Duration; -import java.io.BufferedInputStream; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.net.URLDecoder; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Locale; import java.util.Map; -import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.Collectors; -import static java.nio.charset.StandardCharsets.UTF_8; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.ENDPOINT_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.TOKEN_URI_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.BUCKET; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.CLIENT_NAME; -import static org.hamcrest.Matchers.lessThanOrEqualTo; @SuppressForbidden(reason = "this test uses a HttpServer to emulate a Google Cloud Storage endpoint") public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase { @@ -100,8 +77,8 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe @Override protected Map createHttpHandlers() { final Map handlers = new HashMap<>(2); - handlers.put("/", new InternalHttpHandler()); - handlers.put("/token", new FakeOAuth2HttpHandler()); + handlers.put("/", new GoogleCloudStorageHttpHandler("bucket")); + handlers.put("/token", new fixture.gcs.FakeOAuth2HttpHandler()); return Collections.unmodifiableMap(handlers); } @@ -209,187 +186,6 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe } } - /** - * Minimal HTTP handler that acts as a Google Cloud Storage compliant server - */ - @SuppressForbidden(reason = "this test uses a HttpServer to emulate a Google Cloud Storage endpoint") - private static class InternalHttpHandler implements HttpHandler { - - private final ConcurrentMap blobs = new ConcurrentHashMap<>(); - - @Override - public void handle(final HttpExchange exchange) throws IOException { - final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString(); - try { - if (Regex.simpleMatch("GET /storage/v1/b/bucket/o*", request)) { - final Map params = new HashMap<>(); - RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params); - final String prefix = params.get("prefix"); - - final List> listOfBlobs = blobs.entrySet().stream() - .filter(blob -> prefix == null || blob.getKey().startsWith(prefix)).collect(Collectors.toList()); - - final StringBuilder list = new StringBuilder(); - list.append("{\"kind\":\"storage#objects\",\"items\":["); - for (Iterator> it = listOfBlobs.iterator(); it.hasNext(); ) { - Map.Entry blob = it.next(); - list.append("{\"kind\":\"storage#object\","); - list.append("\"bucket\":\"bucket\","); - list.append("\"name\":\"").append(blob.getKey()).append("\","); - list.append("\"id\":\"").append(blob.getKey()).append("\","); - list.append("\"size\":\"").append(blob.getValue().length()).append("\""); - list.append('}'); - - if (it.hasNext()) { - list.append(','); - } - } - list.append("]}"); - - byte[] response = list.toString().getBytes(UTF_8); - exchange.getResponseHeaders().add("Content-Type", "application/json; charset=utf-8"); - exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); - exchange.getResponseBody().write(response); - - } else if (Regex.simpleMatch("GET /storage/v1/b/bucket*", request)) { - byte[] response = ("{\"kind\":\"storage#bucket\",\"name\":\"bucket\",\"id\":\"0\"}").getBytes(UTF_8); - exchange.getResponseHeaders().add("Content-Type", "application/json; charset=utf-8"); - exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length); - exchange.getResponseBody().write(response); - - } else if (Regex.simpleMatch("GET /download/storage/v1/b/bucket/o/*", request)) { - BytesArray blob = blobs.get(exchange.getRequestURI().getPath().replace("/download/storage/v1/b/bucket/o/", "")); - if (blob != null) { - final String range = exchange.getRequestHeaders().getFirst("Range"); - Matcher matcher = Pattern.compile("bytes=([0-9]*)-([0-9]*)").matcher(range); - assert matcher.find(); - - byte[] response = Integer.parseInt(matcher.group(1)) == 0 ? blob.array() : new byte[0]; - exchange.getResponseHeaders().add("Content-Type", "application/octet-stream"); - exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); - exchange.getResponseBody().write(response); - } else { - exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1); - } - - } else if (Regex.simpleMatch("DELETE /storage/v1/b/bucket/o/*", request)) { - int deletions = 0; - for (Iterator> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) { - Map.Entry blob = iterator.next(); - if (blob.getKey().equals(exchange.getRequestURI().toString())) { - iterator.remove(); - deletions++; - } - } - exchange.sendResponseHeaders((deletions > 0 ? RestStatus.OK : RestStatus.NO_CONTENT).getStatus(), -1); - - } else if (Regex.simpleMatch("POST /batch/storage/v1", request)) { - final String uri = "/storage/v1/b/bucket/o/"; - final StringBuilder batch = new StringBuilder(); - for (String line : Streams.readAllLines(new BufferedInputStream(exchange.getRequestBody()))) { - if (line.length() == 0 || line.startsWith("--") || line.toLowerCase(Locale.ROOT).startsWith("content")) { - batch.append(line).append('\n'); - } else if (line.startsWith("DELETE")) { - final String name = line.substring(line.indexOf(uri) + uri.length(), line.lastIndexOf(" HTTP")); - if (Strings.hasText(name)) { - try { - final String blobName = URLDecoder.decode(name, UTF_8.name()); - if (blobs.entrySet().removeIf(blob -> blob.getKey().equals(blobName))) { - batch.append("HTTP/1.1 204 NO_CONTENT").append('\n'); - batch.append('\n'); - } - } catch (UnsupportedEncodingException e) { - batch.append("HTTP/1.1 404 NOT_FOUND").append('\n'); - batch.append('\n'); - } - } - } - } - byte[] response = batch.toString().getBytes(UTF_8); - exchange.getResponseHeaders().add("Content-Type", exchange.getRequestHeaders().getFirst("Content-Type")); - exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); - exchange.getResponseBody().write(response); - - } else if (Regex.simpleMatch("POST /upload/storage/v1/b/bucket/*uploadType=multipart*", request)) { - Optional> content = TestUtils.parseMultipartRequestBody(exchange.getRequestBody()); - if (content.isPresent()) { - blobs.put(content.get().v1(), content.get().v2()); - - byte[] response = ("{\"bucket\":\"bucket\",\"name\":\"" + content.get().v1() + "\"}").getBytes(UTF_8); - exchange.getResponseHeaders().add("Content-Type", "application/json"); - exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); - exchange.getResponseBody().write(response); - } else { - exchange.sendResponseHeaders(RestStatus.BAD_REQUEST.getStatus(), -1); - } - - } else if (Regex.simpleMatch("POST /upload/storage/v1/b/bucket/*uploadType=resumable*", request)) { - final Map params = new HashMap<>(); - RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params); - final String blobName = params.get("name"); - blobs.put(blobName, BytesArray.EMPTY); - - byte[] response = Streams.readFully(exchange.getRequestBody()).utf8ToString().getBytes(UTF_8); - exchange.getResponseHeaders().add("Content-Type", "application/json"); - exchange.getResponseHeaders().add("Location", httpServerUrl() + "/upload/storage/v1/b/bucket/o?" - + "uploadType=resumable" - + "&upload_id=" + UUIDs.randomBase64UUID() - + "&test_blob_name=" + blobName); // not a Google Storage parameter, but it allows to pass the blob name - exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); - exchange.getResponseBody().write(response); - - } else if (Regex.simpleMatch("PUT /upload/storage/v1/b/bucket/o?*uploadType=resumable*", request)) { - final Map params = new HashMap<>(); - RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params); - - final String blobName = params.get("test_blob_name"); - byte[] blob = blobs.get(blobName).array(); - assertNotNull(blob); - - final String range = exchange.getRequestHeaders().getFirst("Content-Range"); - final Integer limit = TestUtils.getContentRangeLimit(range); - final int start = TestUtils.getContentRangeStart(range); - final int end = TestUtils.getContentRangeEnd(range); - - final ByteArrayOutputStream out = new ByteArrayOutputStream(); - long bytesRead = Streams.copy(exchange.getRequestBody(), out); - int length = Math.max(end + 1, limit != null ? limit : 0); - assertThat((int) bytesRead, lessThanOrEqualTo(length)); - if (length > blob.length) { - blob = ArrayUtil.growExact(blob, length); - } - System.arraycopy(out.toByteArray(), 0, blob, start, Math.toIntExact(bytesRead)); - blobs.put(blobName, new BytesArray(blob)); - - if (limit == null) { - exchange.getResponseHeaders().add("Range", String.format(Locale.ROOT, "bytes=%d/%d", start, end)); - exchange.getResponseHeaders().add("Content-Length", "0"); - exchange.sendResponseHeaders(308 /* Resume Incomplete */, -1); - } else { - assertThat(limit, lessThanOrEqualTo(blob.length)); - exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1); - } - } else { - exchange.sendResponseHeaders(RestStatus.INTERNAL_SERVER_ERROR.getStatus(), -1); - } - } finally { - exchange.close(); - } - } - } - - @SuppressForbidden(reason = "this test uses a HttpServer to emulate a fake OAuth2 authentication service") - private static class FakeOAuth2HttpHandler implements HttpHandler { - @Override - public void handle(final HttpExchange exchange) throws IOException { - byte[] response = ("{\"access_token\":\"foo\",\"token_type\":\"Bearer\",\"expires_in\":3600}").getBytes(UTF_8); - exchange.getResponseHeaders().add("Content-Type", "application/json"); - exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length); - exchange.getResponseBody().write(response); - exchange.close(); - } - } - /** * HTTP handler that injects random Google Cloud Storage service errors * diff --git a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/TestUtils.java b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/TestUtils.java index a6ae0578fbd..993e55e93d1 100644 --- a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/TestUtils.java +++ b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/TestUtils.java @@ -18,28 +18,14 @@ */ package org.elasticsearch.repositories.gcs; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; -import java.io.BufferedInputStream; import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; import java.security.KeyPairGenerator; -import java.util.Arrays; import java.util.Base64; -import java.util.Locale; -import java.util.Optional; import java.util.Random; import java.util.UUID; -import java.util.function.BiFunction; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.zip.GZIPInputStream; - -import static java.nio.charset.StandardCharsets.UTF_8; final class TestUtils { @@ -72,87 +58,4 @@ final class TestUtils { throw new AssertionError("Unable to create service account file", e); } } - - static Optional> parseMultipartRequestBody(final InputStream requestBody) throws IOException { - Tuple content = null; - try (BufferedInputStream in = new BufferedInputStream(new GZIPInputStream(requestBody))) { - String name = null; - int read; - while ((read = in.read()) != -1) { - boolean markAndContinue = false; - try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { - do { // search next consecutive {carriage return, new line} chars and stop - if ((char) read == '\r') { - int next = in.read(); - if (next != -1) { - if (next == '\n') { - break; - } - out.write(read); - out.write(next); - continue; - } - } - out.write(read); - } while ((read = in.read()) != -1); - - final String line = new String(out.toByteArray(), UTF_8); - if (line.length() == 0 || line.equals("\r\n") || line.startsWith("--") - || line.toLowerCase(Locale.ROOT).startsWith("content")) { - markAndContinue = true; - } else if (line.startsWith("{\"bucket\":\"bucket\"")) { - markAndContinue = true; - Matcher matcher = Pattern.compile("\"name\":\"([^\"]*)\"").matcher(line); - if (matcher.find()) { - name = matcher.group(1); - } - } - if (markAndContinue) { - in.mark(Integer.MAX_VALUE); - continue; - } - } - if (name != null) { - in.reset(); - try (ByteArrayOutputStream binary = new ByteArrayOutputStream()) { - while ((read = in.read()) != -1) { - binary.write(read); - } - binary.flush(); - byte[] tmp = binary.toByteArray(); - // removes the trailing end "\r\n--__END_OF_PART__--\r\n" which is 23 bytes long - content = Tuple.tuple(name, new BytesArray(Arrays.copyOf(tmp, tmp.length - 23))); - } - } - } - } - return Optional.ofNullable(content); - } - - private static final Pattern PATTERN_CONTENT_RANGE = Pattern.compile("bytes ([^/]*)/([0-9\\*]*)"); - private static final Pattern PATTERN_CONTENT_RANGE_BYTES = Pattern.compile("([0-9]*)-([0-9]*)"); - - private static Integer parse(final Pattern pattern, final String contentRange, final BiFunction fn) { - final Matcher matcher = pattern.matcher(contentRange); - if (matcher.matches() == false || matcher.groupCount() != 2) { - throw new IllegalArgumentException("Unable to parse content range header"); - } - return fn.apply(matcher.group(1), matcher.group(2)); - } - - static Integer getContentRangeLimit(final String contentRange) { - return parse(PATTERN_CONTENT_RANGE, contentRange, (bytes, limit) -> "*".equals(limit) ? null : Integer.parseInt(limit)); - } - - static int getContentRangeStart(final String contentRange) { - return parse(PATTERN_CONTENT_RANGE, contentRange, - (bytes, limit) -> parse(PATTERN_CONTENT_RANGE_BYTES, bytes, - (start, end) -> Integer.parseInt(start))); - } - - static int getContentRangeEnd(final String contentRange) { - return parse(PATTERN_CONTENT_RANGE, contentRange, - (bytes, limit) -> parse(PATTERN_CONTENT_RANGE_BYTES, bytes, - (start, end) -> Integer.parseInt(end))); - } } diff --git a/settings.gradle b/settings.gradle index e6284f506ca..13ffde84e7f 100644 --- a/settings.gradle +++ b/settings.gradle @@ -52,10 +52,11 @@ List projects = [ 'server', 'server:cli', 'test:framework', + 'test:fixtures:azure-fixture', + 'test:fixtures:gcs-fixture', 'test:fixtures:hdfs-fixture', 'test:fixtures:krb5kdc-fixture', 'test:fixtures:old-elasticsearch', - 'test:fixtures:azure-fixture', 'test:logger-usage' ] diff --git a/test/fixtures/gcs-fixture/Dockerfile b/test/fixtures/gcs-fixture/Dockerfile new file mode 100644 index 00000000000..52e0d31888f --- /dev/null +++ b/test/fixtures/gcs-fixture/Dockerfile @@ -0,0 +1,17 @@ +FROM ubuntu:19.04 + +RUN apt-get update -qqy +RUN apt-get install -qqy openjdk-12-jre-headless + +ARG port +ARG bucket +ARG token + +ENV GCS_FIXTURE_PORT=${port} +ENV GCS_FIXTURE_BUCKET=${bucket} +ENV GCS_FIXTURE_TOKEN=${token} + +ENTRYPOINT exec java -classpath "/fixture/shared/*" \ + fixture.gcs.GoogleCloudStorageHttpFixture 0.0.0.0 "$GCS_FIXTURE_PORT" "$GCS_FIXTURE_BUCKET" "$GCS_FIXTURE_TOKEN" + +EXPOSE $port \ No newline at end of file diff --git a/test/fixtures/gcs-fixture/build.gradle b/test/fixtures/gcs-fixture/build.gradle new file mode 100644 index 00000000000..9673d54f900 --- /dev/null +++ b/test/fixtures/gcs-fixture/build.gradle @@ -0,0 +1,39 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +apply plugin: 'elasticsearch.build' +apply plugin: 'elasticsearch.test.fixtures' + +description = 'Fixture for Google Cloud Storage service' +test.enabled = false + +dependencies { + compile project(':server') +} + +preProcessFixture { + dependsOn jar + doLast { + file("${testFixturesDir}/shared").mkdirs() + project.copy { + from jar + from configurations.runtimeClasspath + into "${testFixturesDir}/shared" + } + } +} \ No newline at end of file diff --git a/test/fixtures/gcs-fixture/docker-compose.yml b/test/fixtures/gcs-fixture/docker-compose.yml new file mode 100644 index 00000000000..ce4be2c6bfe --- /dev/null +++ b/test/fixtures/gcs-fixture/docker-compose.yml @@ -0,0 +1,26 @@ +version: '3' +services: + gcs-fixture: + build: + context: . + args: + port: 80 + bucket: "bucket" + token: "o/oauth2/token" + dockerfile: Dockerfile + volumes: + - ./testfixtures_shared/shared:/fixture/shared + ports: + - "80" + gcs-fixture-third-party: + build: + context: . + args: + port: 80 + bucket: "bucket" + token: "o/oauth2/token" + dockerfile: Dockerfile + volumes: + - ./testfixtures_shared/shared:/fixture/shared + ports: + - "80" \ No newline at end of file diff --git a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/FakeOAuth2HttpHandler.java b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/FakeOAuth2HttpHandler.java new file mode 100644 index 00000000000..bb62f7692d1 --- /dev/null +++ b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/FakeOAuth2HttpHandler.java @@ -0,0 +1,41 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package fixture.gcs; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.rest.RestStatus; + +import java.io.IOException; + +import static java.nio.charset.StandardCharsets.UTF_8; + +@SuppressForbidden(reason = "Uses a HttpServer to emulate a fake OAuth2 authentication service") +public class FakeOAuth2HttpHandler implements HttpHandler { + + @Override + public void handle(final HttpExchange exchange) throws IOException { + byte[] response = ("{\"access_token\":\"foo\",\"token_type\":\"Bearer\",\"expires_in\":3600}").getBytes(UTF_8); + exchange.getResponseHeaders().add("Content-Type", "application/json"); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); + exchange.getResponseBody().write(response); + exchange.close(); + } +} diff --git a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpFixture.java b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpFixture.java new file mode 100644 index 00000000000..e9f331c94f3 --- /dev/null +++ b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpFixture.java @@ -0,0 +1,55 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package fixture.gcs; + +import com.sun.net.httpserver.HttpServer; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; + +public class GoogleCloudStorageHttpFixture { + + private final HttpServer server; + + private GoogleCloudStorageHttpFixture(final String address, final int port, + final String bucket, final String token) throws IOException { + this.server = HttpServer.create(new InetSocketAddress(InetAddress.getByName(address), port), 0); + server.createContext("/" + token, new FakeOAuth2HttpHandler()); + server.createContext("/", new GoogleCloudStorageHttpHandler(bucket)); + } + + private void start() throws Exception { + try { + server.start(); + // wait to be killed + Thread.sleep(Long.MAX_VALUE); + } finally { + server.stop(0); + } + } + + public static void main(final String[] args) throws Exception { + if (args == null || args.length != 4) { + throw new IllegalArgumentException("GoogleCloudStorageHttpFixture expects 4 arguments [address, port, bucket, token]"); + } + GoogleCloudStorageHttpFixture fixture = new GoogleCloudStorageHttpFixture(args[0], Integer.parseInt(args[1]), args[2], args[3]); + fixture.start(); + } +} diff --git a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java new file mode 100644 index 00000000000..dae02548e37 --- /dev/null +++ b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java @@ -0,0 +1,340 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package fixture.gcs; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import org.apache.lucene.util.ArrayUtil; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.network.InetAddresses; +import org.elasticsearch.common.regex.Regex; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.RestUtils; + +import java.io.BufferedInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.net.URLDecoder; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.BiFunction; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.zip.GZIPInputStream; + +import static java.nio.charset.StandardCharsets.UTF_8; + +/** + * Minimal HTTP handler that acts as a Google Cloud Storage compliant server + */ +@SuppressForbidden(reason = "Uses a HttpServer to emulate a Google Cloud Storage endpoint") +public class GoogleCloudStorageHttpHandler implements HttpHandler { + + private final ConcurrentMap blobs; + private final String bucket; + + public GoogleCloudStorageHttpHandler(final String bucket) { + this.bucket = Objects.requireNonNull(bucket); + this.blobs = new ConcurrentHashMap<>(); + } + + @Override + public void handle(final HttpExchange exchange) throws IOException { + final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString(); + try { + if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "/o*", request)) { + // List Objects https://cloud.google.com/storage/docs/json_api/v1/objects/list + final Map params = new HashMap<>(); + RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params); + final String prefix = params.getOrDefault("prefix", ""); + final String delimiter = params.get("delimiter"); + + final Set prefixes = new HashSet<>(); + final List listOfBlobs = new ArrayList<>(); + + for (final Map.Entry blob : blobs.entrySet()) { + final String blobName = blob.getKey(); + if (prefix.isEmpty() || blobName.startsWith(prefix)) { + int delimiterPos = (delimiter != null) ? blobName.substring(prefix.length()).indexOf(delimiter) : -1; + if (delimiterPos > -1) { + prefixes.add("\"" + blobName.substring(0, prefix.length() + delimiterPos + 1) + "\""); + } else { + listOfBlobs.add("{\"kind\":\"storage#object\"," + + "\"bucket\":\"" + bucket + "\"," + + "\"name\":\"" + blobName + "\"," + + "\"id\":\"" + blobName + "\"," + + "\"size\":\"" + blob.getValue().length() + "\"" + + "}"); + } + } + } + + byte[] response = ("{\"kind\":\"storage#objects\",\"items\":[" + + String.join(",", listOfBlobs) + + "],\"prefixes\":[" + + String.join(",", prefixes) + + "]}").getBytes(UTF_8); + + exchange.getResponseHeaders().add("Content-Type", "application/json; charset=utf-8"); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); + exchange.getResponseBody().write(response); + + } else if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "*", request)) { + // GET Bucket https://cloud.google.com/storage/docs/json_api/v1/buckets/get + byte[] response = ("{\"kind\":\"storage#bucket\",\"name\":\""+ bucket + "\",\"id\":\"0\"}").getBytes(UTF_8); + exchange.getResponseHeaders().add("Content-Type", "application/json; charset=utf-8"); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); + exchange.getResponseBody().write(response); + + } else if (Regex.simpleMatch("GET /download/storage/v1/b/" + bucket + "/o/*", request)) { + // Download Object https://cloud.google.com/storage/docs/request-body + BytesArray blob = blobs.get(exchange.getRequestURI().getPath().replace("/download/storage/v1/b/" + bucket + "/o/", "")); + if (blob != null) { + final String range = exchange.getRequestHeaders().getFirst("Range"); + Matcher matcher = Pattern.compile("bytes=([0-9]*)-([0-9]*)").matcher(range); + if (matcher.find() == false) { + throw new AssertionError("Range bytes header does not match expected format: " + range); + } + + byte[] response = Integer.parseInt(matcher.group(1)) == 0 ? blob.array() : new byte[0]; + exchange.getResponseHeaders().add("Content-Type", "application/octet-stream"); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); + exchange.getResponseBody().write(response); + } else { + exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1); + } + + } else if (Regex.simpleMatch("DELETE /storage/v1/b/" + bucket + "/o/*", request)) { + // Delete Object https://cloud.google.com/storage/docs/json_api/v1/objects/delete + int deletions = 0; + for (Iterator> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) { + Map.Entry blob = iterator.next(); + if (blob.getKey().equals(exchange.getRequestURI().toString())) { + iterator.remove(); + deletions++; + } + } + exchange.sendResponseHeaders((deletions > 0 ? RestStatus.OK : RestStatus.NO_CONTENT).getStatus(), -1); + + } else if (Regex.simpleMatch("POST /batch/storage/v1", request)) { + // Batch https://cloud.google.com/storage/docs/json_api/v1/how-tos/batch + final String uri = "/storage/v1/b/" + bucket + "/o/"; + final StringBuilder batch = new StringBuilder(); + for (String line : Streams.readAllLines(new BufferedInputStream(exchange.getRequestBody()))) { + if (line.length() == 0 || line.startsWith("--") || line.toLowerCase(Locale.ROOT).startsWith("content")) { + batch.append(line).append('\n'); + } else if (line.startsWith("DELETE")) { + final String name = line.substring(line.indexOf(uri) + uri.length(), line.lastIndexOf(" HTTP")); + if (Strings.hasText(name)) { + String blobName = URLDecoder.decode(name, UTF_8.name()); + if (blobs.entrySet().removeIf(blob -> blob.getKey().equals(blobName))) { + batch.append("HTTP/1.1 204 NO_CONTENT").append('\n'); + batch.append('\n'); + } + } + } + } + byte[] response = batch.toString().getBytes(UTF_8); + exchange.getResponseHeaders().add("Content-Type", exchange.getRequestHeaders().getFirst("Content-Type")); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); + exchange.getResponseBody().write(response); + + } else if (Regex.simpleMatch("POST /upload/storage/v1/b/" + bucket + "/*uploadType=multipart*", request)) { + // Multipart upload + Optional> content = parseMultipartRequestBody(exchange.getRequestBody()); + if (content.isPresent()) { + blobs.put(content.get().v1(), content.get().v2()); + + byte[] response = ("{\"bucket\":\"" + bucket + "\",\"name\":\"" + content.get().v1() + "\"}").getBytes(UTF_8); + exchange.getResponseHeaders().add("Content-Type", "application/json"); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); + exchange.getResponseBody().write(response); + } else { + exchange.sendResponseHeaders(RestStatus.BAD_REQUEST.getStatus(), -1); + } + + } else if (Regex.simpleMatch("POST /upload/storage/v1/b/" + bucket + "/*uploadType=resumable*", request)) { + // Resumable upload initialization https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload + final Map params = new HashMap<>(); + RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params); + final String blobName = params.get("name"); + blobs.put(blobName, BytesArray.EMPTY); + + byte[] response = Streams.readFully(exchange.getRequestBody()).utf8ToString().getBytes(UTF_8); + exchange.getResponseHeaders().add("Content-Type", "application/json"); + exchange.getResponseHeaders().add("Location", httpServerUrl(exchange) + "/upload/storage/v1/b/" + bucket + "/o?" + + "uploadType=resumable" + + "&upload_id=" + UUIDs.randomBase64UUID() + + "&test_blob_name=" + blobName); // not a Google Storage parameter, but it allows to pass the blob name + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); + exchange.getResponseBody().write(response); + + } else if (Regex.simpleMatch("PUT /upload/storage/v1/b/" + bucket + "/o?*uploadType=resumable*", request)) { + // Resumable upload https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload + final Map params = new HashMap<>(); + RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params); + + final String blobName = params.get("test_blob_name"); + byte[] blob = blobs.get(blobName).array(); + if (blob == null) { + exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1); + return; + } + + final String range = exchange.getRequestHeaders().getFirst("Content-Range"); + final Integer limit = getContentRangeLimit(range); + final int start = getContentRangeStart(range); + final int end = getContentRangeEnd(range); + + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + long bytesRead = Streams.copy(exchange.getRequestBody(), out); + int length = Math.max(end + 1, limit != null ? limit : 0); + if ((int) bytesRead > length) { + throw new AssertionError("Requesting more bytes than available for blob"); + } + if (length > blob.length) { + blob = ArrayUtil.growExact(blob, length); + } + System.arraycopy(out.toByteArray(), 0, blob, start, Math.toIntExact(bytesRead)); + blobs.put(blobName, new BytesArray(blob)); + + if (limit == null) { + exchange.getResponseHeaders().add("Range", String.format(Locale.ROOT, "bytes=%d/%d", start, end)); + exchange.getResponseHeaders().add("Content-Length", "0"); + exchange.sendResponseHeaders(308 /* Resume Incomplete */, -1); + } else { + if (limit > blob.length) { + throw new AssertionError("Requesting more bytes than available for blob"); + } + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1); + } + } else { + exchange.sendResponseHeaders(RestStatus.INTERNAL_SERVER_ERROR.getStatus(), -1); + } + } finally { + exchange.close(); + } + } + + private String httpServerUrl(final HttpExchange exchange) { + final InetSocketAddress address = exchange.getLocalAddress(); + return "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort(); + } + + public static Optional> parseMultipartRequestBody(final InputStream requestBody) throws IOException { + Tuple content = null; + try (BufferedInputStream in = new BufferedInputStream(new GZIPInputStream(requestBody))) { + String name = null; + int read; + while ((read = in.read()) != -1) { + boolean markAndContinue = false; + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + do { // search next consecutive {carriage return, new line} chars and stop + if ((char) read == '\r') { + int next = in.read(); + if (next != -1) { + if (next == '\n') { + break; + } + out.write(read); + out.write(next); + continue; + } + } + out.write(read); + } while ((read = in.read()) != -1); + + final String line = new String(out.toByteArray(), UTF_8); + if (line.length() == 0 || line.equals("\r\n") || line.startsWith("--") + || line.toLowerCase(Locale.ROOT).startsWith("content")) { + markAndContinue = true; + } else if (line.startsWith("{\"bucket\":")) { + markAndContinue = true; + Matcher matcher = Pattern.compile("\"name\":\"([^\"]*)\"").matcher(line); + if (matcher.find()) { + name = matcher.group(1); + } + } + if (markAndContinue) { + in.mark(Integer.MAX_VALUE); + continue; + } + } + if (name != null) { + in.reset(); + try (ByteArrayOutputStream binary = new ByteArrayOutputStream()) { + while ((read = in.read()) != -1) { + binary.write(read); + } + binary.flush(); + byte[] tmp = binary.toByteArray(); + // removes the trailing end "\r\n--__END_OF_PART__--\r\n" which is 23 bytes long + content = Tuple.tuple(name, new BytesArray(Arrays.copyOf(tmp, tmp.length - 23))); + } + } + } + } + return Optional.ofNullable(content); + } + + private static final Pattern PATTERN_CONTENT_RANGE = Pattern.compile("bytes ([^/]*)/([0-9\\*]*)"); + private static final Pattern PATTERN_CONTENT_RANGE_BYTES = Pattern.compile("([0-9]*)-([0-9]*)"); + + private static Integer parse(final Pattern pattern, final String contentRange, final BiFunction fn) { + final Matcher matcher = pattern.matcher(contentRange); + if (matcher.matches() == false || matcher.groupCount() != 2) { + throw new IllegalArgumentException("Unable to parse content range header"); + } + return fn.apply(matcher.group(1), matcher.group(2)); + } + + public static Integer getContentRangeLimit(final String contentRange) { + return parse(PATTERN_CONTENT_RANGE, contentRange, (bytes, limit) -> "*".equals(limit) ? null : Integer.parseInt(limit)); + } + + public static int getContentRangeStart(final String contentRange) { + return parse(PATTERN_CONTENT_RANGE, contentRange, + (bytes, limit) -> parse(PATTERN_CONTENT_RANGE_BYTES, bytes, + (start, end) -> Integer.parseInt(start))); + } + + public static int getContentRangeEnd(final String contentRange) { + return parse(PATTERN_CONTENT_RANGE, contentRange, + (bytes, limit) -> parse(PATTERN_CONTENT_RANGE_BYTES, bytes, + (start, end) -> Integer.parseInt(end))); + } +}