Add docker-composed based test fixture for GCS (#48902)

Similarly to what has been done for Azure in #48636, this commit
adds a new :test:fixtures:gcs-fixture project which provides two
docker-compose based fixtures that emulate a Google Cloud
Storage service.

Some code has been extracted from existing tests and placed
into this new project so that it can be easily reused in other
projects.
This commit is contained in:
Tanguy Leroux 2019-11-07 13:27:22 -05:00 committed by GitHub
parent 293902c6a5
commit 8a14ea5567
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 559 additions and 1009 deletions

View File

@ -54,6 +54,8 @@ dependencies {
compile 'io.opencensus:opencensus-api:0.18.0'
compile 'io.opencensus:opencensus-contrib-http-util:0.18.0'
compile 'com.google.apis:google-api-services-storage:v1-rev20190426-1.28.0'
testCompile project(':test:fixtures:gcs-fixture')
}
dependencyLicenses {

View File

@ -20,7 +20,6 @@
import org.elasticsearch.gradle.MavenFilteringHack
import org.elasticsearch.gradle.info.BuildParams
import org.elasticsearch.gradle.test.AntFixture
import java.nio.file.Files
import java.security.KeyPair
@ -30,12 +29,14 @@ import static org.elasticsearch.gradle.PropertyNormalization.IGNORE_VALUE
apply plugin: 'elasticsearch.standalone-rest-test'
apply plugin: 'elasticsearch.rest-test'
apply plugin: 'elasticsearch.test.fixtures'
// TODO think about flattening qa:google-cloud-storage project into parent
dependencies {
testCompile project(path: ':plugins:repository-gcs')
}
testFixtures.useFixture(':test:fixtures:gcs-fixture')
boolean useFixture = false
String gcsServiceAccount = System.getenv("google_storage_service_account")
@ -45,7 +46,7 @@ String gcsBasePath = System.getenv("google_storage_base_path")
File serviceAccountFile = null
if (!gcsServiceAccount && !gcsBucket && !gcsBasePath) {
serviceAccountFile = new File(project.buildDir, 'generated-resources/service_account_test.json')
gcsBucket = 'bucket_test'
gcsBucket = 'bucket'
gcsBasePath = 'integration_test'
useFixture = true
} else if (!gcsServiceAccount || !gcsBucket || !gcsBasePath) {
@ -58,12 +59,11 @@ def encodedCredentials = {
Base64.encoder.encodeToString(Files.readAllBytes(serviceAccountFile.toPath()))
}
/** A task to start the GoogleCloudStorageFixture which emulates a Google Cloud Storage service **/
task googleCloudStorageFixture(type: AntFixture) {
dependsOn testClasses
env 'CLASSPATH', "${ -> project.sourceSets.test.runtimeClasspath.asPath }"
executable = "${BuildParams.runtimeJavaHome}/bin/java"
args 'org.elasticsearch.repositories.gcs.GoogleCloudStorageFixture', baseDir, 'bucket_test'
def fixtureAddress = { fixture ->
assert useFixture : 'closure should not be used without a fixture'
int ephemeralPort = project(':test:fixtures:gcs-fixture').postProcessFixture.ext."test.fixtures.${fixture}.tcp.80"
assert ephemeralPort > 0
'http://127.0.0.1:' + ephemeralPort
}
/** A service account file that points to the Google Cloud Storage service emulated by the fixture **/
@ -87,6 +87,19 @@ task createServiceAccountFile() {
}
task thirdPartyTest (type: Test) {
if (useFixture) {
thirdPartyTest.dependsOn createServiceAccountFile
nonInputProperties.systemProperty 'test.google.endpoint', "${ -> fixtureAddress('gcs-fixture-third-party') }"
nonInputProperties.systemProperty 'test.google.tokenURI', "${ -> fixtureAddress('gcs-fixture-third-party') }/o/oauth2/token"
gradle.taskGraph.whenReady {
if (it.hasTask(gcsThirdPartyTests)) {
throw new IllegalStateException("Tried to run third party tests but not all of the necessary environment variables " +
"'google_storage_service_account', 'google_storage_bucket', 'google_storage_base_path' are set.")
}
}
}
include '**/GoogleCloudStorageThirdPartyTests.class'
systemProperty 'tests.security.manager', false
systemProperty 'test.google.bucket', gcsBucket
@ -98,32 +111,6 @@ task gcsThirdPartyTests {
dependsOn check
}
if (useFixture) {
// TODO think about running the fixture in the same JVM as tests
thirdPartyTest.dependsOn createServiceAccountFile, googleCloudStorageFixture
thirdPartyTest.finalizedBy googleCloudStorageFixture.getStopTask()
def fixtureEndpoint = {
"http://${googleCloudStorageFixture.addressAndPort}"
}
def tokenURI = {
"http://${googleCloudStorageFixture.addressAndPort}/o/oauth2/token"
}
thirdPartyTest {
nonInputProperties.systemProperty 'test.google.endpoint', "${ -> fixtureEndpoint.call() }"
nonInputProperties.systemProperty 'test.google.tokenURI', "${ -> tokenURI.call() }"
}
gradle.taskGraph.whenReady {
if (it.hasTask(gcsThirdPartyTests)) {
throw new IllegalStateException("Tried to run third party tests but not all of the necessary environment variables 'google_storage_service_account', " +
"'google_storage_bucket', 'google_storage_base_path' are set.")
}
}
}
integTest.mustRunAfter(thirdPartyTest)
check.dependsOn thirdPartyTest
@ -147,10 +134,10 @@ testClusters.integTest {
keystore 'gcs.client.integration_test.credentials_file', serviceAccountFile, IGNORE_VALUE
if (useFixture) {
tasks.integTest.dependsOn createServiceAccountFile, googleCloudStorageFixture
tasks.integTest.dependsOn createServiceAccountFile
/* Use a closure on the string to delay evaluation until tests are executed */
setting 'gcs.client.integration_test.endpoint', { "http://${googleCloudStorageFixture.addressAndPort}" }, IGNORE_VALUE
setting 'gcs.client.integration_test.token_uri', { "http://${googleCloudStorageFixture.addressAndPort}/o/oauth2/token" }, IGNORE_VALUE
setting 'gcs.client.integration_test.endpoint', { "${ -> fixtureAddress('gcs-fixture') }" }, IGNORE_VALUE
setting 'gcs.client.integration_test.token_uri', { "${ -> fixtureAddress('gcs-fixture') }/o/oauth2/token" }, IGNORE_VALUE
} else {
println "Using an external service to test the repository-gcs plugin"
}

View File

@ -1,656 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.gcs;
import org.elasticsearch.test.fixture.AbstractHttpFixture;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.path.PathTrie;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.RestUtils;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
/**
* {@link GoogleCloudStorageFixture} emulates a Google Cloud Storage service.
*
* The implementation is based on official documentation available at https://cloud.google.com/storage/docs/json_api/v1/.
*/
public class GoogleCloudStorageFixture extends AbstractHttpFixture {
/** List of the buckets stored on this test server **/
private final Map<String, Bucket> buckets = ConcurrentCollections.newConcurrentMap();
/** Request handlers for the requests made by the Google Cloud Storage client **/
private final PathTrie<RequestHandler> handlers;
/**
* Creates a {@link GoogleCloudStorageFixture}
*/
private GoogleCloudStorageFixture(final String workingDir, final String bucket) {
super(workingDir);
this.buckets.put(bucket, new Bucket(bucket));
this.handlers = defaultHandlers(buckets);
}
@Override
protected Response handle(final Request request) throws IOException {
final RequestHandler handler = handlers.retrieve(request.getMethod() + " " + request.getPath(), request.getParameters());
if (handler != null) {
return handler.handle(request);
}
return null;
}
public static void main(final String[] args) throws Exception {
if (args == null || args.length != 2) {
throw new IllegalArgumentException("GoogleCloudStorageFixture <working directory> <bucket>");
}
final GoogleCloudStorageFixture fixture = new GoogleCloudStorageFixture(args[0], args[1]);
fixture.listen();
}
/** Builds the default request handlers **/
private static PathTrie<RequestHandler> defaultHandlers(final Map<String, Bucket> buckets) {
final PathTrie<RequestHandler> handlers = new PathTrie<>(RestUtils.REST_DECODER);
// GET Bucket
//
// https://cloud.google.com/storage/docs/json_api/v1/buckets/get
handlers.insert("GET /storage/v1/b/{bucket}", (request) -> {
final String name = request.getParam("bucket");
if (Strings.hasText(name) == false) {
return newError(RestStatus.INTERNAL_SERVER_ERROR, "bucket name is missing");
}
if (buckets.containsKey(name)) {
return newResponse(RestStatus.OK, emptyMap(), buildBucketResource(name));
} else {
return newError(RestStatus.NOT_FOUND, "bucket not found");
}
});
// GET Object
//
// https://cloud.google.com/storage/docs/json_api/v1/objects/get
handlers.insert("GET /storage/v1/b/{bucket}/o/{object}", (request) -> {
final String objectName = request.getParam("object");
if (Strings.hasText(objectName) == false) {
return newError(RestStatus.INTERNAL_SERVER_ERROR, "object name is missing");
}
final Bucket bucket = buckets.get(request.getParam("bucket"));
if (bucket == null) {
return newError(RestStatus.NOT_FOUND, "bucket not found");
}
for (final Map.Entry<String, Item> object : bucket.objects.entrySet()) {
if (object.getKey().equals(objectName)) {
return newResponse(RestStatus.OK, emptyMap(), buildObjectResource(bucket.name, object.getValue()));
}
}
return newError(RestStatus.NOT_FOUND, "object not found");
});
// Delete Object
//
// https://cloud.google.com/storage/docs/json_api/v1/objects/delete
handlers.insert("DELETE /storage/v1/b/{bucket}/o/{object}", (request) -> {
final String objectName = request.getParam("object");
if (Strings.hasText(objectName) == false) {
return newError(RestStatus.INTERNAL_SERVER_ERROR, "object name is missing");
}
final Bucket bucket = buckets.get(request.getParam("bucket"));
if (bucket == null) {
return newError(RestStatus.NOT_FOUND, "bucket not found");
}
final Item item = bucket.objects.remove(objectName);
if (item != null) {
return new Response(RestStatus.NO_CONTENT.getStatus(), TEXT_PLAIN_CONTENT_TYPE, EMPTY_BYTE);
}
return newError(RestStatus.NOT_FOUND, "object not found");
});
// Insert Object (initialization)
//
// https://cloud.google.com/storage/docs/json_api/v1/objects/insert
handlers.insert("POST /upload/storage/v1/b/{bucket}/o", (request) -> {
final String ifGenerationMatch = request.getParam("ifGenerationMatch");
final String uploadType = request.getParam("uploadType");
if ("resumable".equals(uploadType)) {
final String objectName = request.getParam("name");
if (Strings.hasText(objectName) == false) {
return newError(RestStatus.INTERNAL_SERVER_ERROR, "object name is missing");
}
final Bucket bucket = buckets.get(request.getParam("bucket"));
if (bucket == null) {
return newError(RestStatus.NOT_FOUND, "bucket not found");
}
if ("0".equals(ifGenerationMatch)) {
if (bucket.objects.putIfAbsent(objectName, Item.empty(objectName)) == null) {
final String location = /*endpoint +*/ "/upload/storage/v1/b/" + bucket.name + "/o?uploadType=resumable&upload_id="
+ objectName;
return newResponse(RestStatus.CREATED, singletonMap("Location", location), jsonBuilder());
} else {
return newError(RestStatus.PRECONDITION_FAILED, "object already exist");
}
} else {
bucket.objects.put(objectName, Item.empty(objectName));
final String location = /*endpoint +*/ "/upload/storage/v1/b/" + bucket.name + "/o?uploadType=resumable&upload_id="
+ objectName;
return newResponse(RestStatus.CREATED, singletonMap("Location", location), jsonBuilder());
}
} else if ("multipart".equals(uploadType)) {
/*
* A multipart/related request body looks like this (note the binary dump inside a text blob! nice!):
* --__END_OF_PART__
* Content-Length: 135
* Content-Type: application/json; charset=UTF-8
* content-transfer-encoding: binary
*
* {"bucket":"bucket_test","crc32c":"7XacHQ==","md5Hash":"fVztGkklMlUamsSmJK7W+w==",
* "name":"tests-KEwE3bU4TuyetBgQIghmUw/master.dat-temp"}
* --__END_OF_PART__
* content-transfer-encoding: binary
*
* KEwE3bU4TuyetBgQIghmUw
* --__END_OF_PART__--
*/
String boundary = "__END_OF_PART__";
// Determine the multipart boundary
final String contentType = request.getContentType();
if ((contentType != null) && contentType.contains("multipart/related; boundary=")) {
boundary = contentType.replace("multipart/related; boundary=", "");
}
InputStream inputStreamBody = new ByteArrayInputStream(request.getBody());
final String contentEncoding = request.getHeader("Content-Encoding");
if (contentEncoding != null) {
if ("gzip".equalsIgnoreCase(contentEncoding)) {
inputStreamBody = new GZIPInputStream(inputStreamBody);
}
}
// Read line by line ?both? parts of the multipart. Decoding headers as
// IS_8859_1 is safe.
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStreamBody, StandardCharsets.ISO_8859_1))) {
String line;
// read first part delimiter
line = reader.readLine();
if ((line == null) || (line.equals("--" + boundary) == false)) {
return newError(RestStatus.INTERNAL_SERVER_ERROR,
"Error parsing multipart request. Does not start with the part delimiter.");
}
final Map<String, List<String>> firstPartHeaders = new HashMap<>();
// Reads the first part's headers, if any
while ((line = reader.readLine()) != null) {
if (line.equals("\r\n") || (line.length() == 0)) {
// end of headers
break;
} else {
final String[] header = line.split(":", 2);
firstPartHeaders.put(header[0], singletonList(header[1]));
}
}
final List<String> firstPartContentTypes = firstPartHeaders.getOrDefault("Content-Type",
firstPartHeaders.get("Content-type"));
if ((firstPartContentTypes == null)
|| (firstPartContentTypes.stream().noneMatch(x -> x.contains("application/json")))) {
return newError(RestStatus.INTERNAL_SERVER_ERROR,
"Error parsing multipart request. Metadata part expected to have the \"application/json\" content type.");
}
// read metadata part, a single line
line = reader.readLine();
final byte[] metadata = line.getBytes(StandardCharsets.ISO_8859_1);
if ((firstPartContentTypes != null) && (firstPartContentTypes.stream().anyMatch((x -> x.contains("charset=utf-8"))))) {
// decode as utf-8
line = new String(metadata, StandardCharsets.UTF_8);
}
final Matcher objectNameMatcher = Pattern.compile("\"name\":\"([^\"]*)\"").matcher(line);
objectNameMatcher.find();
final String objectName = objectNameMatcher.group(1);
final Matcher bucketNameMatcher = Pattern.compile("\"bucket\":\"([^\"]*)\"").matcher(line);
bucketNameMatcher.find();
final String bucketName = bucketNameMatcher.group(1);
// read second part delimiter
line = reader.readLine();
if ((line == null) || (line.equals("--" + boundary) == false)) {
return newError(RestStatus.INTERNAL_SERVER_ERROR,
"Error parsing multipart request. Second part does not start with delimiter. "
+ "Is the metadata multi-line?");
}
final Map<String, List<String>> secondPartHeaders = new HashMap<>();
// Reads the second part's headers, if any
while ((line = reader.readLine()) != null) {
if (line.equals("\r\n") || (line.length() == 0)) {
// end of headers
break;
} else {
final String[] header = line.split(":", 2);
secondPartHeaders.put(header[0], singletonList(header[1]));
}
}
final List<String> secondPartTransferEncoding = secondPartHeaders.getOrDefault("Content-Transfer-Encoding",
secondPartHeaders.get("content-transfer-encoding"));
if ((secondPartTransferEncoding == null)
|| (secondPartTransferEncoding.stream().noneMatch(x -> x.contains("binary")))) {
return newError(RestStatus.INTERNAL_SERVER_ERROR,
"Error parsing multipart request. Data part expected to have the \"binary\" content transfer encoding.");
}
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
int c;
while ((c = reader.read()) != -1) {
// one char to one byte, because of the ISO_8859_1 encoding
baos.write(c);
}
final byte[] temp = baos.toByteArray();
final byte[] trailingEnding = ("\r\n--" + boundary + "--\r\n").getBytes(StandardCharsets.ISO_8859_1);
// check trailing
for (int i = trailingEnding.length - 1; i >= 0; i--) {
if (trailingEnding[i] != temp[(temp.length - trailingEnding.length) + i]) {
return newError(RestStatus.INTERNAL_SERVER_ERROR, "Error parsing multipart request.");
}
}
final Bucket bucket = buckets.get(bucketName);
if (bucket == null) {
return newError(RestStatus.NOT_FOUND, "bucket not found");
}
final byte[] objectData = Arrays.copyOf(temp, temp.length - trailingEnding.length);
if ((objectName != null) && (bucketName != null) && (objectData != null)) {
bucket.objects.put(objectName, new Item(objectName, objectData));
return new Response(RestStatus.OK.getStatus(), JSON_CONTENT_TYPE, metadata);
} else {
return newError(RestStatus.INTERNAL_SERVER_ERROR, "error parsing multipart request");
}
}
} else {
return newError(RestStatus.INTERNAL_SERVER_ERROR, "upload type must be resumable or multipart");
}
});
// Insert Object (upload)
//
// https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
handlers.insert("PUT /upload/storage/v1/b/{bucket}/o", (request) -> {
final String objectId = request.getParam("upload_id");
if (Strings.hasText(objectId) == false) {
return newError(RestStatus.INTERNAL_SERVER_ERROR, "upload id is missing");
}
final Bucket bucket = buckets.get(request.getParam("bucket"));
if (bucket == null) {
return newError(RestStatus.NOT_FOUND, "bucket not found");
}
if (bucket.objects.containsKey(objectId) == false) {
return newError(RestStatus.NOT_FOUND, "object name not found");
}
final Item item = new Item(objectId, request.getBody());
bucket.objects.put(objectId, item);
return newResponse(RestStatus.OK, emptyMap(), buildObjectResource(bucket.name, item));
});
// List Objects
//
// https://cloud.google.com/storage/docs/json_api/v1/objects/list
handlers.insert("GET /storage/v1/b/{bucket}/o", (request) -> {
final Bucket bucket = buckets.get(request.getParam("bucket"));
if (bucket == null) {
return newError(RestStatus.NOT_FOUND, "bucket not found");
}
final XContentBuilder builder = jsonBuilder();
builder.startObject();
builder.field("kind", "storage#objects");
final Set<String> prefixes = new HashSet<>();
{
builder.startArray("items");
final String prefixParam = request.getParam("prefix");
final String delimiter = request.getParam("delimiter");
for (final Map.Entry<String, Item> object : bucket.objects.entrySet()) {
String objectKey = object.getKey();
if ((prefixParam != null) && (objectKey.startsWith(prefixParam) == false)) {
continue;
}
if (Strings.isNullOrEmpty(delimiter)) {
buildObjectResource(builder, bucket.name, object.getValue());
} else {
int prefixLength = prefixParam.length();
String rest = objectKey.substring(prefixLength);
int delimiterPos;
if ((delimiterPos = rest.indexOf(delimiter)) != -1) {
String key = objectKey.substring(0, prefixLength + delimiterPos + 1);
prefixes.add(key);
} else {
buildObjectResource(builder, bucket.name, object.getValue());
}
}
}
builder.endArray();
}
{
if (prefixes.isEmpty() == false) {
builder.array("prefixes", prefixes.toArray());
}
}
builder.endObject();
return newResponse(RestStatus.OK, emptyMap(), builder);
});
// Download Object
//
// https://cloud.google.com/storage/docs/request-body
handlers.insert("GET /download/storage/v1/b/{bucket}/o/{object}", (request) -> {
final String object = request.getParam("object");
if (Strings.hasText(object) == false) {
return newError(RestStatus.INTERNAL_SERVER_ERROR, "object id is missing");
}
final Bucket bucket = buckets.get(request.getParam("bucket"));
if (bucket == null) {
return newError(RestStatus.NOT_FOUND, "bucket not found");
}
if (bucket.objects.containsKey(object) == false) {
return newError(RestStatus.NOT_FOUND, "object name not found");
}
return new Response(RestStatus.OK.getStatus(), contentType("application/octet-stream"), bucket.objects.get(object).bytes);
});
// Batch
//
// https://cloud.google.com/storage/docs/json_api/v1/how-tos/batch
handlers.insert("POST /batch/storage/v1", (request) -> {
final List<Response> batchedResponses = new ArrayList<>();
// A batch request body looks like this:
//
// --__END_OF_PART__
// Content-Length: 71
// Content-Type: application/http
// content-id: 1
// content-transfer-encoding: binary
//
// DELETE https://www.googleapis.com/storage/v1/b/ohifkgu/o/foo%2Ftest HTTP/1.1
//
//
// --__END_OF_PART__
// Content-Length: 71
// Content-Type: application/http
// content-id: 2
// content-transfer-encoding: binary
//
// DELETE https://www.googleapis.com/storage/v1/b/ohifkgu/o/bar%2Ftest HTTP/1.1
//
//
// --__END_OF_PART__--
// Default multipart boundary
String boundary = "__END_OF_PART__";
// Determine the multipart boundary
final String contentType = request.getContentType();
if ((contentType != null) && contentType.contains("multipart/mixed; boundary=")) {
boundary = contentType.replace("multipart/mixed; boundary=", "");
}
long batchedRequests = 0L;
// Read line by line the batched requests
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(
new ByteArrayInputStream(request.getBody()), StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
// Start of a batched request
if (line.equals("--" + boundary)) {
final Map<String, String> batchedHeaders = new HashMap<>();
// Reads the headers, if any
while ((line = reader.readLine()) != null) {
if (line.equals("\r\n") || (line.length() == 0)) {
// end of headers
break;
} else {
final String[] header = line.split(":", 2);
batchedHeaders.put(header[0], header[1]);
}
}
// Reads the method and URL
line = reader.readLine();
final String batchedMethod = line.substring(0, line.indexOf(' '));
final URI batchedUri = URI.create(line.substring(batchedMethod.length() + 1, line.lastIndexOf(' ')));
// Reads the body
line = reader.readLine();
byte[] batchedBody = new byte[0];
if ((line != null) || (line.startsWith("--" + boundary) == false)) {
batchedBody = line.getBytes(StandardCharsets.UTF_8);
}
final Request batchedRequest = new Request(batchedRequests, batchedMethod, batchedUri, batchedHeaders, batchedBody);
batchedRequests = batchedRequests + 1;
// Executes the batched request
final RequestHandler handler =
handlers.retrieve(batchedRequest.getMethod() + " " + batchedRequest.getPath(), batchedRequest.getParameters());
if (handler != null) {
try {
batchedResponses.add(handler.handle(batchedRequest));
} catch (final IOException e) {
batchedResponses.add(newError(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
}
}
}
}
}
// Now we can build the response
final String sep = "--";
final String line = "\r\n";
final StringBuilder builder = new StringBuilder();
for (final Response response : batchedResponses) {
builder.append(sep).append(boundary).append(line);
builder.append("Content-Type: application/http").append(line);
builder.append(line);
builder.append("HTTP/1.1 ")
.append(response.getStatus())
.append(' ')
.append(RestStatus.fromCode(response.getStatus()).toString())
.append(line);
builder.append("Content-Length: ").append(response.getBody().length).append(line);
builder.append("Content-Type: ").append(response.getContentType()).append(line);
response.getHeaders().forEach((k, v) -> builder.append(k).append(": ").append(v).append(line));
builder.append(line);
builder.append(new String(response.getBody(), StandardCharsets.UTF_8)).append(line);
builder.append(line);
}
builder.append(line);
builder.append(sep).append(boundary).append(sep);
final byte[] content = builder.toString().getBytes(StandardCharsets.UTF_8);
return new Response(RestStatus.OK.getStatus(), contentType("multipart/mixed; boundary=" + boundary), content);
});
// Fake refresh of an OAuth2 token
//
handlers.insert("POST /o/oauth2/token", (request) ->
newResponse(RestStatus.OK, emptyMap(), jsonBuilder()
.startObject()
.field("access_token", "unknown")
.field("token_type", "Bearer")
.field("expires_in", 3600)
.endObject())
);
return handlers;
}
/**
* Represents a Storage bucket as if it was created on Google Cloud Storage.
*/
static class Bucket {
/** Bucket name **/
final String name;
/** Blobs contained in the bucket **/
final Map<String, Item> objects;
Bucket(final String name) {
this.name = Objects.requireNonNull(name);
this.objects = ConcurrentCollections.newConcurrentMap();
}
}
static class Item {
final String name;
final LocalDateTime created;
final byte[] bytes;
Item(String name, byte[] bytes) {
this.name = name;
this.bytes = bytes;
this.created = LocalDateTime.now(ZoneOffset.UTC);
}
public static Item empty(String name) {
return new Item(name, EMPTY_BYTE);
}
}
/**
* Builds a JSON response
*/
private static Response newResponse(final RestStatus status, final Map<String, String> headers, final XContentBuilder xContentBuilder) {
final Map<String, String> responseHeaders = new HashMap<>(JSON_CONTENT_TYPE);
responseHeaders.putAll(headers);
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
BytesReference.bytes(xContentBuilder).writeTo(out);
return new Response(status.getStatus(), responseHeaders, out.toByteArray());
} catch (final IOException e) {
return newError(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage());
}
}
/**
* Storage Error JSON representation
*/
private static Response newError(final RestStatus status, final String message) {
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
try (XContentBuilder builder = jsonBuilder()) {
builder.startObject()
.startObject("error")
.field("code", status.getStatus())
.field("message", message)
.startArray("errors")
.startObject()
.field("domain", "global")
.field("reason", status.toString())
.field("message", message)
.endObject()
.endArray()
.endObject()
.endObject();
BytesReference.bytes(builder).writeTo(out);
}
return new Response(status.getStatus(), JSON_CONTENT_TYPE, out.toByteArray());
} catch (final IOException e) {
final byte[] bytes = (message != null ? message : "something went wrong").getBytes(StandardCharsets.UTF_8);
return new Response(RestStatus.INTERNAL_SERVER_ERROR.getStatus(), TEXT_PLAIN_CONTENT_TYPE, bytes);
}
}
/**
* Storage Bucket JSON representation as defined in
* https://cloud.google.com/storage/docs/json_api/v1/bucket#resource
*/
private static XContentBuilder buildBucketResource(final String name) throws IOException {
return jsonBuilder().startObject()
.field("kind", "storage#bucket")
.field("name", name)
.field("id", name)
.endObject();
}
/**
* Storage Object JSON representation as defined in
* https://cloud.google.com/storage/docs/json_api/v1/objects#resource
*/
private static XContentBuilder buildObjectResource(final String bucket, final Item item) throws IOException {
return buildObjectResource(jsonBuilder(), bucket, item);
}
/**
* Storage Object JSON representation as defined in
* https://cloud.google.com/storage/docs/json_api/v1/objects#resource
*/
private static XContentBuilder buildObjectResource(final XContentBuilder builder,
final String bucket,
final Item item) throws IOException {
return builder.startObject()
.field("kind", "storage#object")
.field("id", String.join("/", bucket, item.name))
.field("name", item.name)
.field("timeCreated", DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(item.created))
.field("bucket", bucket)
.field("size", String.valueOf(item.bytes.length))
.endObject();
}
}

View File

@ -24,6 +24,7 @@ import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;
import com.sun.net.httpserver.HttpContext;
import com.sun.net.httpserver.HttpServer;
import fixture.gcs.FakeOAuth2HttpHandler;
import org.apache.http.HttpStatus;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
@ -68,6 +69,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static fixture.gcs.GoogleCloudStorageHttpHandler.getContentRangeEnd;
import static fixture.gcs.GoogleCloudStorageHttpHandler.getContentRangeLimit;
import static fixture.gcs.GoogleCloudStorageHttpHandler.getContentRangeStart;
import static fixture.gcs.GoogleCloudStorageHttpHandler.parseMultipartRequestBody;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.elasticsearch.repositories.ESBlobStoreTestCase.randomBytes;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING;
@ -144,13 +149,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends ESTestCase {
final List<HttpContext> httpContexts = Arrays.asList(
// Auth
httpServer.createContext("/token", exchange -> {
byte[] response = ("{\"access_token\":\"foo\",\"token_type\":\"Bearer\",\"expires_in\":3600}").getBytes(UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/json");
exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length);
exchange.getResponseBody().write(response);
exchange.close();
}),
httpServer.createContext("/token", new FakeOAuth2HttpHandler()),
// Does bucket exists?
httpServer.createContext("/storage/v1/b/bucket", exchange -> {
byte[] response = ("{\"kind\":\"storage#bucket\",\"name\":\"bucket\",\"id\":\"0\"}").getBytes(UTF_8);
@ -244,7 +243,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends ESTestCase {
httpServer.createContext("/upload/storage/v1/b/bucket/o", exchange -> {
assertThat(exchange.getRequestURI().getQuery(), containsString("uploadType=multipart"));
if (countDown.countDown()) {
Optional<Tuple<String, BytesArray>> content = TestUtils.parseMultipartRequestBody(exchange.getRequestBody());
Optional<Tuple<String, BytesArray>> content = parseMultipartRequestBody(exchange.getRequestBody());
assertThat(content.isPresent(), is(true));
assertThat(content.get().v1(), equalTo("write_blob_max_retries"));
if (Objects.deepEquals(bytes, content.get().v2().array())) {
@ -387,12 +386,12 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends ESTestCase {
final long bytesRead = Streams.copy(exchange.getRequestBody(), requestBody);
assertThat(Math.toIntExact(bytesRead), anyOf(equalTo(defaultChunkSize), equalTo(lastChunkSize)));
final int rangeStart = TestUtils.getContentRangeStart(range);
final int rangeEnd = TestUtils.getContentRangeEnd(range);
final int rangeStart = getContentRangeStart(range);
final int rangeEnd = getContentRangeEnd(range);
assertThat(rangeEnd + 1 - rangeStart, equalTo(Math.toIntExact(bytesRead)));
assertArrayEquals(Arrays.copyOfRange(data, rangeStart, rangeEnd + 1), requestBody.toByteArray());
final Integer limit = TestUtils.getContentRangeLimit(range);
final Integer limit = getContentRangeLimit(range);
if (limit != null) {
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
exchange.close();

View File

@ -24,16 +24,10 @@ import com.google.cloud.http.HttpTransportOptions;
import com.google.cloud.storage.StorageOptions;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import org.apache.http.HttpStatus;
import org.apache.lucene.util.ArrayUtil;
import fixture.gcs.GoogleCloudStorageHttpHandler;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
@ -43,37 +37,20 @@ import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.threeten.bp.Duration;
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.ENDPOINT_SETTING;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.TOKEN_URI_SETTING;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.BUCKET;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.CLIENT_NAME;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@SuppressForbidden(reason = "this test uses a HttpServer to emulate a Google Cloud Storage endpoint")
public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase {
@ -100,8 +77,8 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
@Override
protected Map<String, HttpHandler> createHttpHandlers() {
final Map<String, HttpHandler> handlers = new HashMap<>(2);
handlers.put("/", new InternalHttpHandler());
handlers.put("/token", new FakeOAuth2HttpHandler());
handlers.put("/", new GoogleCloudStorageHttpHandler("bucket"));
handlers.put("/token", new fixture.gcs.FakeOAuth2HttpHandler());
return Collections.unmodifiableMap(handlers);
}
@ -209,187 +186,6 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
}
}
/**
* Minimal HTTP handler that acts as a Google Cloud Storage compliant server
*/
@SuppressForbidden(reason = "this test uses a HttpServer to emulate a Google Cloud Storage endpoint")
private static class InternalHttpHandler implements HttpHandler {
private final ConcurrentMap<String, BytesArray> blobs = new ConcurrentHashMap<>();
@Override
public void handle(final HttpExchange exchange) throws IOException {
final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString();
try {
if (Regex.simpleMatch("GET /storage/v1/b/bucket/o*", request)) {
final Map<String, String> params = new HashMap<>();
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
final String prefix = params.get("prefix");
final List<Map.Entry<String, BytesArray>> listOfBlobs = blobs.entrySet().stream()
.filter(blob -> prefix == null || blob.getKey().startsWith(prefix)).collect(Collectors.toList());
final StringBuilder list = new StringBuilder();
list.append("{\"kind\":\"storage#objects\",\"items\":[");
for (Iterator<Map.Entry<String, BytesArray>> it = listOfBlobs.iterator(); it.hasNext(); ) {
Map.Entry<String, BytesArray> blob = it.next();
list.append("{\"kind\":\"storage#object\",");
list.append("\"bucket\":\"bucket\",");
list.append("\"name\":\"").append(blob.getKey()).append("\",");
list.append("\"id\":\"").append(blob.getKey()).append("\",");
list.append("\"size\":\"").append(blob.getValue().length()).append("\"");
list.append('}');
if (it.hasNext()) {
list.append(',');
}
}
list.append("]}");
byte[] response = list.toString().getBytes(UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/json; charset=utf-8");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
} else if (Regex.simpleMatch("GET /storage/v1/b/bucket*", request)) {
byte[] response = ("{\"kind\":\"storage#bucket\",\"name\":\"bucket\",\"id\":\"0\"}").getBytes(UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/json; charset=utf-8");
exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length);
exchange.getResponseBody().write(response);
} else if (Regex.simpleMatch("GET /download/storage/v1/b/bucket/o/*", request)) {
BytesArray blob = blobs.get(exchange.getRequestURI().getPath().replace("/download/storage/v1/b/bucket/o/", ""));
if (blob != null) {
final String range = exchange.getRequestHeaders().getFirst("Range");
Matcher matcher = Pattern.compile("bytes=([0-9]*)-([0-9]*)").matcher(range);
assert matcher.find();
byte[] response = Integer.parseInt(matcher.group(1)) == 0 ? blob.array() : new byte[0];
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
} else {
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
}
} else if (Regex.simpleMatch("DELETE /storage/v1/b/bucket/o/*", request)) {
int deletions = 0;
for (Iterator<Map.Entry<String, BytesArray>> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<String, BytesArray> blob = iterator.next();
if (blob.getKey().equals(exchange.getRequestURI().toString())) {
iterator.remove();
deletions++;
}
}
exchange.sendResponseHeaders((deletions > 0 ? RestStatus.OK : RestStatus.NO_CONTENT).getStatus(), -1);
} else if (Regex.simpleMatch("POST /batch/storage/v1", request)) {
final String uri = "/storage/v1/b/bucket/o/";
final StringBuilder batch = new StringBuilder();
for (String line : Streams.readAllLines(new BufferedInputStream(exchange.getRequestBody()))) {
if (line.length() == 0 || line.startsWith("--") || line.toLowerCase(Locale.ROOT).startsWith("content")) {
batch.append(line).append('\n');
} else if (line.startsWith("DELETE")) {
final String name = line.substring(line.indexOf(uri) + uri.length(), line.lastIndexOf(" HTTP"));
if (Strings.hasText(name)) {
try {
final String blobName = URLDecoder.decode(name, UTF_8.name());
if (blobs.entrySet().removeIf(blob -> blob.getKey().equals(blobName))) {
batch.append("HTTP/1.1 204 NO_CONTENT").append('\n');
batch.append('\n');
}
} catch (UnsupportedEncodingException e) {
batch.append("HTTP/1.1 404 NOT_FOUND").append('\n');
batch.append('\n');
}
}
}
}
byte[] response = batch.toString().getBytes(UTF_8);
exchange.getResponseHeaders().add("Content-Type", exchange.getRequestHeaders().getFirst("Content-Type"));
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
} else if (Regex.simpleMatch("POST /upload/storage/v1/b/bucket/*uploadType=multipart*", request)) {
Optional<Tuple<String, BytesArray>> content = TestUtils.parseMultipartRequestBody(exchange.getRequestBody());
if (content.isPresent()) {
blobs.put(content.get().v1(), content.get().v2());
byte[] response = ("{\"bucket\":\"bucket\",\"name\":\"" + content.get().v1() + "\"}").getBytes(UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/json");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
} else {
exchange.sendResponseHeaders(RestStatus.BAD_REQUEST.getStatus(), -1);
}
} else if (Regex.simpleMatch("POST /upload/storage/v1/b/bucket/*uploadType=resumable*", request)) {
final Map<String, String> params = new HashMap<>();
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
final String blobName = params.get("name");
blobs.put(blobName, BytesArray.EMPTY);
byte[] response = Streams.readFully(exchange.getRequestBody()).utf8ToString().getBytes(UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/json");
exchange.getResponseHeaders().add("Location", httpServerUrl() + "/upload/storage/v1/b/bucket/o?"
+ "uploadType=resumable"
+ "&upload_id=" + UUIDs.randomBase64UUID()
+ "&test_blob_name=" + blobName); // not a Google Storage parameter, but it allows to pass the blob name
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
} else if (Regex.simpleMatch("PUT /upload/storage/v1/b/bucket/o?*uploadType=resumable*", request)) {
final Map<String, String> params = new HashMap<>();
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
final String blobName = params.get("test_blob_name");
byte[] blob = blobs.get(blobName).array();
assertNotNull(blob);
final String range = exchange.getRequestHeaders().getFirst("Content-Range");
final Integer limit = TestUtils.getContentRangeLimit(range);
final int start = TestUtils.getContentRangeStart(range);
final int end = TestUtils.getContentRangeEnd(range);
final ByteArrayOutputStream out = new ByteArrayOutputStream();
long bytesRead = Streams.copy(exchange.getRequestBody(), out);
int length = Math.max(end + 1, limit != null ? limit : 0);
assertThat((int) bytesRead, lessThanOrEqualTo(length));
if (length > blob.length) {
blob = ArrayUtil.growExact(blob, length);
}
System.arraycopy(out.toByteArray(), 0, blob, start, Math.toIntExact(bytesRead));
blobs.put(blobName, new BytesArray(blob));
if (limit == null) {
exchange.getResponseHeaders().add("Range", String.format(Locale.ROOT, "bytes=%d/%d", start, end));
exchange.getResponseHeaders().add("Content-Length", "0");
exchange.sendResponseHeaders(308 /* Resume Incomplete */, -1);
} else {
assertThat(limit, lessThanOrEqualTo(blob.length));
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
}
} else {
exchange.sendResponseHeaders(RestStatus.INTERNAL_SERVER_ERROR.getStatus(), -1);
}
} finally {
exchange.close();
}
}
}
@SuppressForbidden(reason = "this test uses a HttpServer to emulate a fake OAuth2 authentication service")
private static class FakeOAuth2HttpHandler implements HttpHandler {
@Override
public void handle(final HttpExchange exchange) throws IOException {
byte[] response = ("{\"access_token\":\"foo\",\"token_type\":\"Bearer\",\"expires_in\":3600}").getBytes(UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/json");
exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length);
exchange.getResponseBody().write(response);
exchange.close();
}
}
/**
* HTTP handler that injects random Google Cloud Storage service errors
*

View File

@ -18,28 +18,14 @@
*/
package org.elasticsearch.repositories.gcs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.KeyPairGenerator;
import java.util.Arrays;
import java.util.Base64;
import java.util.Locale;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.function.BiFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;
import static java.nio.charset.StandardCharsets.UTF_8;
final class TestUtils {
@ -72,87 +58,4 @@ final class TestUtils {
throw new AssertionError("Unable to create service account file", e);
}
}
static Optional<Tuple<String, BytesArray>> parseMultipartRequestBody(final InputStream requestBody) throws IOException {
Tuple<String, BytesArray> content = null;
try (BufferedInputStream in = new BufferedInputStream(new GZIPInputStream(requestBody))) {
String name = null;
int read;
while ((read = in.read()) != -1) {
boolean markAndContinue = false;
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
do { // search next consecutive {carriage return, new line} chars and stop
if ((char) read == '\r') {
int next = in.read();
if (next != -1) {
if (next == '\n') {
break;
}
out.write(read);
out.write(next);
continue;
}
}
out.write(read);
} while ((read = in.read()) != -1);
final String line = new String(out.toByteArray(), UTF_8);
if (line.length() == 0 || line.equals("\r\n") || line.startsWith("--")
|| line.toLowerCase(Locale.ROOT).startsWith("content")) {
markAndContinue = true;
} else if (line.startsWith("{\"bucket\":\"bucket\"")) {
markAndContinue = true;
Matcher matcher = Pattern.compile("\"name\":\"([^\"]*)\"").matcher(line);
if (matcher.find()) {
name = matcher.group(1);
}
}
if (markAndContinue) {
in.mark(Integer.MAX_VALUE);
continue;
}
}
if (name != null) {
in.reset();
try (ByteArrayOutputStream binary = new ByteArrayOutputStream()) {
while ((read = in.read()) != -1) {
binary.write(read);
}
binary.flush();
byte[] tmp = binary.toByteArray();
// removes the trailing end "\r\n--__END_OF_PART__--\r\n" which is 23 bytes long
content = Tuple.tuple(name, new BytesArray(Arrays.copyOf(tmp, tmp.length - 23)));
}
}
}
}
return Optional.ofNullable(content);
}
private static final Pattern PATTERN_CONTENT_RANGE = Pattern.compile("bytes ([^/]*)/([0-9\\*]*)");
private static final Pattern PATTERN_CONTENT_RANGE_BYTES = Pattern.compile("([0-9]*)-([0-9]*)");
private static Integer parse(final Pattern pattern, final String contentRange, final BiFunction<String, String, Integer> fn) {
final Matcher matcher = pattern.matcher(contentRange);
if (matcher.matches() == false || matcher.groupCount() != 2) {
throw new IllegalArgumentException("Unable to parse content range header");
}
return fn.apply(matcher.group(1), matcher.group(2));
}
static Integer getContentRangeLimit(final String contentRange) {
return parse(PATTERN_CONTENT_RANGE, contentRange, (bytes, limit) -> "*".equals(limit) ? null : Integer.parseInt(limit));
}
static int getContentRangeStart(final String contentRange) {
return parse(PATTERN_CONTENT_RANGE, contentRange,
(bytes, limit) -> parse(PATTERN_CONTENT_RANGE_BYTES, bytes,
(start, end) -> Integer.parseInt(start)));
}
static int getContentRangeEnd(final String contentRange) {
return parse(PATTERN_CONTENT_RANGE, contentRange,
(bytes, limit) -> parse(PATTERN_CONTENT_RANGE_BYTES, bytes,
(start, end) -> Integer.parseInt(end)));
}
}

View File

@ -52,10 +52,11 @@ List projects = [
'server',
'server:cli',
'test:framework',
'test:fixtures:azure-fixture',
'test:fixtures:gcs-fixture',
'test:fixtures:hdfs-fixture',
'test:fixtures:krb5kdc-fixture',
'test:fixtures:old-elasticsearch',
'test:fixtures:azure-fixture',
'test:logger-usage'
]

17
test/fixtures/gcs-fixture/Dockerfile vendored Normal file
View File

@ -0,0 +1,17 @@
FROM ubuntu:19.04
RUN apt-get update -qqy
RUN apt-get install -qqy openjdk-12-jre-headless
ARG port
ARG bucket
ARG token
ENV GCS_FIXTURE_PORT=${port}
ENV GCS_FIXTURE_BUCKET=${bucket}
ENV GCS_FIXTURE_TOKEN=${token}
ENTRYPOINT exec java -classpath "/fixture/shared/*" \
fixture.gcs.GoogleCloudStorageHttpFixture 0.0.0.0 "$GCS_FIXTURE_PORT" "$GCS_FIXTURE_BUCKET" "$GCS_FIXTURE_TOKEN"
EXPOSE $port

39
test/fixtures/gcs-fixture/build.gradle vendored Normal file
View File

@ -0,0 +1,39 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
apply plugin: 'elasticsearch.build'
apply plugin: 'elasticsearch.test.fixtures'
description = 'Fixture for Google Cloud Storage service'
test.enabled = false
dependencies {
compile project(':server')
}
preProcessFixture {
dependsOn jar
doLast {
file("${testFixturesDir}/shared").mkdirs()
project.copy {
from jar
from configurations.runtimeClasspath
into "${testFixturesDir}/shared"
}
}
}

View File

@ -0,0 +1,26 @@
version: '3'
services:
gcs-fixture:
build:
context: .
args:
port: 80
bucket: "bucket"
token: "o/oauth2/token"
dockerfile: Dockerfile
volumes:
- ./testfixtures_shared/shared:/fixture/shared
ports:
- "80"
gcs-fixture-third-party:
build:
context: .
args:
port: 80
bucket: "bucket"
token: "o/oauth2/token"
dockerfile: Dockerfile
volumes:
- ./testfixtures_shared/shared:/fixture/shared
ports:
- "80"

View File

@ -0,0 +1,41 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package fixture.gcs;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import static java.nio.charset.StandardCharsets.UTF_8;
@SuppressForbidden(reason = "Uses a HttpServer to emulate a fake OAuth2 authentication service")
public class FakeOAuth2HttpHandler implements HttpHandler {
@Override
public void handle(final HttpExchange exchange) throws IOException {
byte[] response = ("{\"access_token\":\"foo\",\"token_type\":\"Bearer\",\"expires_in\":3600}").getBytes(UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/json");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
exchange.close();
}
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package fixture.gcs;
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
public class GoogleCloudStorageHttpFixture {
private final HttpServer server;
private GoogleCloudStorageHttpFixture(final String address, final int port,
final String bucket, final String token) throws IOException {
this.server = HttpServer.create(new InetSocketAddress(InetAddress.getByName(address), port), 0);
server.createContext("/" + token, new FakeOAuth2HttpHandler());
server.createContext("/", new GoogleCloudStorageHttpHandler(bucket));
}
private void start() throws Exception {
try {
server.start();
// wait to be killed
Thread.sleep(Long.MAX_VALUE);
} finally {
server.stop(0);
}
}
public static void main(final String[] args) throws Exception {
if (args == null || args.length != 4) {
throw new IllegalArgumentException("GoogleCloudStorageHttpFixture expects 4 arguments [address, port, bucket, token]");
}
GoogleCloudStorageHttpFixture fixture = new GoogleCloudStorageHttpFixture(args[0], Integer.parseInt(args[1]), args[2], args[3]);
fixture.start();
}
}

View File

@ -0,0 +1,340 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package fixture.gcs;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import org.apache.lucene.util.ArrayUtil;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.RestUtils;
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;
import static java.nio.charset.StandardCharsets.UTF_8;
/**
* Minimal HTTP handler that acts as a Google Cloud Storage compliant server
*/
@SuppressForbidden(reason = "Uses a HttpServer to emulate a Google Cloud Storage endpoint")
public class GoogleCloudStorageHttpHandler implements HttpHandler {
private final ConcurrentMap<String, BytesArray> blobs;
private final String bucket;
public GoogleCloudStorageHttpHandler(final String bucket) {
this.bucket = Objects.requireNonNull(bucket);
this.blobs = new ConcurrentHashMap<>();
}
@Override
public void handle(final HttpExchange exchange) throws IOException {
final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString();
try {
if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "/o*", request)) {
// List Objects https://cloud.google.com/storage/docs/json_api/v1/objects/list
final Map<String, String> params = new HashMap<>();
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
final String prefix = params.getOrDefault("prefix", "");
final String delimiter = params.get("delimiter");
final Set<String> prefixes = new HashSet<>();
final List<String> listOfBlobs = new ArrayList<>();
for (final Map.Entry<String, BytesArray> blob : blobs.entrySet()) {
final String blobName = blob.getKey();
if (prefix.isEmpty() || blobName.startsWith(prefix)) {
int delimiterPos = (delimiter != null) ? blobName.substring(prefix.length()).indexOf(delimiter) : -1;
if (delimiterPos > -1) {
prefixes.add("\"" + blobName.substring(0, prefix.length() + delimiterPos + 1) + "\"");
} else {
listOfBlobs.add("{\"kind\":\"storage#object\","
+ "\"bucket\":\"" + bucket + "\","
+ "\"name\":\"" + blobName + "\","
+ "\"id\":\"" + blobName + "\","
+ "\"size\":\"" + blob.getValue().length() + "\""
+ "}");
}
}
}
byte[] response = ("{\"kind\":\"storage#objects\",\"items\":[" +
String.join(",", listOfBlobs) +
"],\"prefixes\":[" +
String.join(",", prefixes) +
"]}").getBytes(UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/json; charset=utf-8");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
} else if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "*", request)) {
// GET Bucket https://cloud.google.com/storage/docs/json_api/v1/buckets/get
byte[] response = ("{\"kind\":\"storage#bucket\",\"name\":\""+ bucket + "\",\"id\":\"0\"}").getBytes(UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/json; charset=utf-8");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
} else if (Regex.simpleMatch("GET /download/storage/v1/b/" + bucket + "/o/*", request)) {
// Download Object https://cloud.google.com/storage/docs/request-body
BytesArray blob = blobs.get(exchange.getRequestURI().getPath().replace("/download/storage/v1/b/" + bucket + "/o/", ""));
if (blob != null) {
final String range = exchange.getRequestHeaders().getFirst("Range");
Matcher matcher = Pattern.compile("bytes=([0-9]*)-([0-9]*)").matcher(range);
if (matcher.find() == false) {
throw new AssertionError("Range bytes header does not match expected format: " + range);
}
byte[] response = Integer.parseInt(matcher.group(1)) == 0 ? blob.array() : new byte[0];
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
} else {
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
}
} else if (Regex.simpleMatch("DELETE /storage/v1/b/" + bucket + "/o/*", request)) {
// Delete Object https://cloud.google.com/storage/docs/json_api/v1/objects/delete
int deletions = 0;
for (Iterator<Map.Entry<String, BytesArray>> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) {
Map.Entry<String, BytesArray> blob = iterator.next();
if (blob.getKey().equals(exchange.getRequestURI().toString())) {
iterator.remove();
deletions++;
}
}
exchange.sendResponseHeaders((deletions > 0 ? RestStatus.OK : RestStatus.NO_CONTENT).getStatus(), -1);
} else if (Regex.simpleMatch("POST /batch/storage/v1", request)) {
// Batch https://cloud.google.com/storage/docs/json_api/v1/how-tos/batch
final String uri = "/storage/v1/b/" + bucket + "/o/";
final StringBuilder batch = new StringBuilder();
for (String line : Streams.readAllLines(new BufferedInputStream(exchange.getRequestBody()))) {
if (line.length() == 0 || line.startsWith("--") || line.toLowerCase(Locale.ROOT).startsWith("content")) {
batch.append(line).append('\n');
} else if (line.startsWith("DELETE")) {
final String name = line.substring(line.indexOf(uri) + uri.length(), line.lastIndexOf(" HTTP"));
if (Strings.hasText(name)) {
String blobName = URLDecoder.decode(name, UTF_8.name());
if (blobs.entrySet().removeIf(blob -> blob.getKey().equals(blobName))) {
batch.append("HTTP/1.1 204 NO_CONTENT").append('\n');
batch.append('\n');
}
}
}
}
byte[] response = batch.toString().getBytes(UTF_8);
exchange.getResponseHeaders().add("Content-Type", exchange.getRequestHeaders().getFirst("Content-Type"));
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
} else if (Regex.simpleMatch("POST /upload/storage/v1/b/" + bucket + "/*uploadType=multipart*", request)) {
// Multipart upload
Optional<Tuple<String, BytesArray>> content = parseMultipartRequestBody(exchange.getRequestBody());
if (content.isPresent()) {
blobs.put(content.get().v1(), content.get().v2());
byte[] response = ("{\"bucket\":\"" + bucket + "\",\"name\":\"" + content.get().v1() + "\"}").getBytes(UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/json");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
} else {
exchange.sendResponseHeaders(RestStatus.BAD_REQUEST.getStatus(), -1);
}
} else if (Regex.simpleMatch("POST /upload/storage/v1/b/" + bucket + "/*uploadType=resumable*", request)) {
// Resumable upload initialization https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
final Map<String, String> params = new HashMap<>();
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
final String blobName = params.get("name");
blobs.put(blobName, BytesArray.EMPTY);
byte[] response = Streams.readFully(exchange.getRequestBody()).utf8ToString().getBytes(UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/json");
exchange.getResponseHeaders().add("Location", httpServerUrl(exchange) + "/upload/storage/v1/b/" + bucket + "/o?"
+ "uploadType=resumable"
+ "&upload_id=" + UUIDs.randomBase64UUID()
+ "&test_blob_name=" + blobName); // not a Google Storage parameter, but it allows to pass the blob name
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
} else if (Regex.simpleMatch("PUT /upload/storage/v1/b/" + bucket + "/o?*uploadType=resumable*", request)) {
// Resumable upload https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
final Map<String, String> params = new HashMap<>();
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
final String blobName = params.get("test_blob_name");
byte[] blob = blobs.get(blobName).array();
if (blob == null) {
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
return;
}
final String range = exchange.getRequestHeaders().getFirst("Content-Range");
final Integer limit = getContentRangeLimit(range);
final int start = getContentRangeStart(range);
final int end = getContentRangeEnd(range);
final ByteArrayOutputStream out = new ByteArrayOutputStream();
long bytesRead = Streams.copy(exchange.getRequestBody(), out);
int length = Math.max(end + 1, limit != null ? limit : 0);
if ((int) bytesRead > length) {
throw new AssertionError("Requesting more bytes than available for blob");
}
if (length > blob.length) {
blob = ArrayUtil.growExact(blob, length);
}
System.arraycopy(out.toByteArray(), 0, blob, start, Math.toIntExact(bytesRead));
blobs.put(blobName, new BytesArray(blob));
if (limit == null) {
exchange.getResponseHeaders().add("Range", String.format(Locale.ROOT, "bytes=%d/%d", start, end));
exchange.getResponseHeaders().add("Content-Length", "0");
exchange.sendResponseHeaders(308 /* Resume Incomplete */, -1);
} else {
if (limit > blob.length) {
throw new AssertionError("Requesting more bytes than available for blob");
}
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
}
} else {
exchange.sendResponseHeaders(RestStatus.INTERNAL_SERVER_ERROR.getStatus(), -1);
}
} finally {
exchange.close();
}
}
private String httpServerUrl(final HttpExchange exchange) {
final InetSocketAddress address = exchange.getLocalAddress();
return "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort();
}
    /**
     * Parses the gzip-compressed body of a multipart upload request and extracts the uploaded blob.
     *
     * The body is expected to contain a JSON metadata part (a line starting with
     * {@code {"bucket":}) from which the blob name is taken, followed by the binary content of the
     * blob itself, terminated by a multipart end boundary.
     *
     * @param requestBody the gzip-compressed request body
     * @return the blob name and its bytes, or an empty {@link Optional} when no {@code "name"}
     *         attribute was found in the metadata part
     * @throws IOException if reading or decompressing the request body fails
     */
    public static Optional<Tuple<String, BytesArray>> parseMultipartRequestBody(final InputStream requestBody) throws IOException {
        Tuple<String, BytesArray> content = null;
        // the multipart request body is gzip compressed by the client
        try (BufferedInputStream in = new BufferedInputStream(new GZIPInputStream(requestBody))) {
            String name = null;
            int read;
            while ((read = in.read()) != -1) {
                boolean markAndContinue = false;
                try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
                    do { // search next consecutive {carriage return, new line} chars and stop
                        if ((char) read == '\r') {
                            int next = in.read();
                            if (next != -1) {
                                if (next == '\n') {
                                    break;
                                }
                                out.write(read);
                                out.write(next);
                                continue;
                            }
                        }
                        out.write(read);
                    } while ((read = in.read()) != -1);
                    final String line = new String(out.toByteArray(), UTF_8);
                    // skip blank lines, multipart boundaries ("--...") and Content-* part headers
                    if (line.length() == 0 || line.equals("\r\n") || line.startsWith("--")
                        || line.toLowerCase(Locale.ROOT).startsWith("content")) {
                        markAndContinue = true;
                    } else if (line.startsWith("{\"bucket\":")) {
                        // JSON metadata part: extract the blob name from its "name" attribute
                        markAndContinue = true;
                        Matcher matcher = Pattern.compile("\"name\":\"([^\"]*)\"").matcher(line);
                        if (matcher.find()) {
                            name = matcher.group(1);
                        }
                    }
                    if (markAndContinue) {
                        // remember this position: the blob content starts right after the last skipped line
                        in.mark(Integer.MAX_VALUE);
                        continue;
                    }
                }
                if (name != null) {
                    // rewind to the start of the blob content and read it fully
                    in.reset();
                    try (ByteArrayOutputStream binary = new ByteArrayOutputStream()) {
                        while ((read = in.read()) != -1) {
                            binary.write(read);
                        }
                        binary.flush();
                        byte[] tmp = binary.toByteArray();
                        // removes the trailing end "\r\n--__END_OF_PART__--\r\n" which is 23 bytes long
                        // NOTE(review): assumes this exact boundary string; a shorter tail would
                        // throw from Arrays.copyOf — acceptable for a test fixture
                        content = Tuple.tuple(name, new BytesArray(Arrays.copyOf(tmp, tmp.length - 23)));
                    }
                }
            }
        }
        return Optional.ofNullable(content);
    }
private static final Pattern PATTERN_CONTENT_RANGE = Pattern.compile("bytes ([^/]*)/([0-9\\*]*)");
private static final Pattern PATTERN_CONTENT_RANGE_BYTES = Pattern.compile("([0-9]*)-([0-9]*)");
private static Integer parse(final Pattern pattern, final String contentRange, final BiFunction<String, String, Integer> fn) {
final Matcher matcher = pattern.matcher(contentRange);
if (matcher.matches() == false || matcher.groupCount() != 2) {
throw new IllegalArgumentException("Unable to parse content range header");
}
return fn.apply(matcher.group(1), matcher.group(2));
}
public static Integer getContentRangeLimit(final String contentRange) {
return parse(PATTERN_CONTENT_RANGE, contentRange, (bytes, limit) -> "*".equals(limit) ? null : Integer.parseInt(limit));
}
public static int getContentRangeStart(final String contentRange) {
return parse(PATTERN_CONTENT_RANGE, contentRange,
(bytes, limit) -> parse(PATTERN_CONTENT_RANGE_BYTES, bytes,
(start, end) -> Integer.parseInt(start)));
}
public static int getContentRangeEnd(final String contentRange) {
return parse(PATTERN_CONTENT_RANGE, contentRange,
(bytes, limit) -> parse(PATTERN_CONTENT_RANGE_BYTES, bytes,
(start, end) -> Integer.parseInt(end)));
}
}