Replace MockAmazonS3 usage in S3BlobStoreRepositoryTests by a HTTP server (#46081)
This commit removes the usage of MockAmazonS3 in S3BlobStoreRepositoryTests and replaces it by a HttpServer that emulates the S3 service. This allows the repository tests to use the real Amazon's S3 client under the hood in tests and will allow to test the behavior of the snapshot/restore feature for S3 repositories by simulating random server-side internal errors. The HTTP server used to emulate the S3 service is intentionally simple and minimal to keep things understandable and maintainable. Testing full client options on the server side (like authentication, chunked encoding etc) remains the responsibility of the AmazonS3Fixture.
This commit is contained in:
parent
93ede78b66
commit
b526309fbd
|
@ -32,6 +32,7 @@ import org.elasticsearch.common.bytes.BytesReference;
|
|||
import org.elasticsearch.common.io.Streams;
|
||||
import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
|
||||
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
|
||||
import org.elasticsearch.common.network.InetAddresses;
|
||||
import org.elasticsearch.common.settings.MockSecureSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
|
@ -46,7 +47,6 @@ import org.junit.Before;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.Inet6Address;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketTimeoutException;
|
||||
|
@ -99,13 +99,10 @@ public class S3BlobContainerRetriesTests extends ESTestCase {
|
|||
final Settings.Builder clientSettings = Settings.builder();
|
||||
final String clientName = randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
|
||||
|
||||
final String endpoint;
|
||||
if (httpServer.getAddress().getAddress() instanceof Inet6Address) {
|
||||
endpoint = "http://[" + httpServer.getAddress().getHostString() + "]:" + httpServer.getAddress().getPort();
|
||||
} else {
|
||||
endpoint = "http://" + httpServer.getAddress().getHostString() + ":" + httpServer.getAddress().getPort();
|
||||
}
|
||||
final InetSocketAddress address = httpServer.getAddress();
|
||||
final String endpoint = "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort();
|
||||
clientSettings.put(ENDPOINT_SETTING.getConcreteSettingForNamespace(clientName).getKey(), endpoint);
|
||||
|
||||
if (maxRetries != null) {
|
||||
clientSettings.put(MAX_RETRIES_SETTING.getConcreteSettingForNamespace(clientName).getKey(), maxRetries);
|
||||
}
|
||||
|
|
|
@ -18,53 +18,70 @@
|
|||
*/
|
||||
package org.elasticsearch.repositories.s3;
|
||||
|
||||
import com.amazonaws.services.s3.AmazonS3;
|
||||
import com.amazonaws.services.s3.model.CannedAccessControlList;
|
||||
import com.amazonaws.services.s3.model.StorageClass;
|
||||
import com.sun.net.httpserver.HttpExchange;
|
||||
import com.sun.net.httpserver.HttpHandler;
|
||||
import com.sun.net.httpserver.HttpServer;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.Streams;
|
||||
import org.elasticsearch.common.network.InetAddresses;
|
||||
import org.elasticsearch.common.regex.Regex;
|
||||
import org.elasticsearch.common.settings.MockSecureSettings;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.mocksocket.MockHttpServer;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.repositories.Repository;
|
||||
import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.rest.RestUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Locale;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
@SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
|
||||
public class S3BlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCase {
|
||||
|
||||
private static final ConcurrentMap<String, byte[]> blobs = new ConcurrentHashMap<>();
|
||||
private static String bucket;
|
||||
private static ByteSizeValue bufferSize;
|
||||
private static boolean serverSideEncryption;
|
||||
private static String cannedACL;
|
||||
private static String storageClass;
|
||||
private static HttpServer httpServer;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpRepositorySettings() {
|
||||
bucket = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT);
|
||||
bufferSize = new ByteSizeValue(randomIntBetween(5, 50), ByteSizeUnit.MB);
|
||||
serverSideEncryption = randomBoolean();
|
||||
if (randomBoolean()) {
|
||||
cannedACL = randomFrom(CannedAccessControlList.values()).toString();
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
storageClass = randomValueOtherThan(StorageClass.Glacier, () -> randomFrom(StorageClass.values())).toString();
|
||||
}
|
||||
public static void startHttpServer() throws Exception {
|
||||
httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
|
||||
httpServer.start();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUpHttpServer() {
|
||||
httpServer.createContext("/bucket", new InternalHttpHandler());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void stopHttpServer() {
|
||||
httpServer.stop(0);
|
||||
httpServer = null;
|
||||
}
|
||||
|
||||
@After
|
||||
public void wipeRepository() {
|
||||
blobs.clear();
|
||||
public void tearDownHttpServer() {
|
||||
httpServer.removeContext("/bucket");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -75,11 +92,8 @@ public class S3BlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCa
|
|||
@Override
|
||||
protected Settings repositorySettings() {
|
||||
return Settings.builder()
|
||||
.put(S3Repository.BUCKET_SETTING.getKey(), bucket)
|
||||
.put(S3Repository.BUFFER_SIZE_SETTING.getKey(), bufferSize)
|
||||
.put(S3Repository.SERVER_SIDE_ENCRYPTION_SETTING.getKey(), serverSideEncryption)
|
||||
.put(S3Repository.CANNED_ACL_SETTING.getKey(), cannedACL)
|
||||
.put(S3Repository.STORAGE_CLASS_SETTING.getKey(), storageClass)
|
||||
.put(S3Repository.BUCKET_SETTING.getKey(), "bucket")
|
||||
.put(S3Repository.CLIENT_NAME.getKey(), "test")
|
||||
.build();
|
||||
}
|
||||
|
||||
|
@ -88,6 +102,25 @@ public class S3BlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCa
|
|||
return Collections.singletonList(TestS3RepositoryPlugin.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
final MockSecureSettings secureSettings = new MockSecureSettings();
|
||||
secureSettings.setString(S3ClientSettings.ACCESS_KEY_SETTING.getConcreteSettingForNamespace("test").getKey(), "access");
|
||||
secureSettings.setString(S3ClientSettings.SECRET_KEY_SETTING.getConcreteSettingForNamespace("test").getKey(), "secret");
|
||||
|
||||
final InetSocketAddress address = httpServer.getAddress();
|
||||
final String endpoint = "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort();
|
||||
|
||||
return Settings.builder()
|
||||
.put(Settings.builder()
|
||||
.put(S3ClientSettings.ENDPOINT_SETTING.getConcreteSettingForNamespace("test").getKey(), endpoint)
|
||||
.put(S3ClientSettings.DISABLE_CHUNKED_ENCODING.getConcreteSettingForNamespace("test").getKey(), true)
|
||||
.build())
|
||||
.put(super.nodeSettings(nodeOrdinal))
|
||||
.setSecureSettings(secureSettings)
|
||||
.build();
|
||||
}
|
||||
|
||||
public static class TestS3RepositoryPlugin extends S3RepositoryPlugin {
|
||||
|
||||
public TestS3RepositoryPlugin(final Settings settings) {
|
||||
|
@ -95,15 +128,105 @@ public class S3BlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCa
|
|||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Repository.Factory> getRepositories(final Environment env, final NamedXContentRegistry registry,
|
||||
final ThreadPool threadPool) {
|
||||
return Collections.singletonMap(S3Repository.TYPE,
|
||||
metadata -> new S3Repository(metadata, registry, new S3Service() {
|
||||
@Override
|
||||
AmazonS3 buildClient(S3ClientSettings clientSettings) {
|
||||
return new MockAmazonS3(blobs, bucket, serverSideEncryption, cannedACL, storageClass);
|
||||
public List<Setting<?>> getSettings() {
|
||||
final List<Setting<?>> settings = new ArrayList<>(super.getSettings());
|
||||
// Disable chunked encoding as it simplifies a lot the request parsing on the httpServer side
|
||||
settings.add(S3ClientSettings.DISABLE_CHUNKED_ENCODING);
|
||||
return settings;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Minimal HTTP handler that acts as a S3 compliant server
|
||||
*/
|
||||
@SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
|
||||
private static class InternalHttpHandler implements HttpHandler {
|
||||
|
||||
private final ConcurrentMap<String, BytesReference> blobs = new ConcurrentHashMap<>();
|
||||
|
||||
@Override
|
||||
public void handle(final HttpExchange exchange) throws IOException {
|
||||
final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString();
|
||||
try {
|
||||
if (Regex.simpleMatch("PUT /bucket/*", request)) {
|
||||
blobs.put(exchange.getRequestURI().toString(), Streams.readFully(exchange.getRequestBody()));
|
||||
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
|
||||
|
||||
} else if (Regex.simpleMatch("GET /bucket/?prefix=*", request)) {
|
||||
final Map<String, String> params = new HashMap<>();
|
||||
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
|
||||
assertThat("Test must be adapted for GET Bucket (List Objects) Version 2", params.get("list-type"), nullValue());
|
||||
|
||||
final StringBuilder list = new StringBuilder();
|
||||
list.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
|
||||
list.append("<ListBucketResult>");
|
||||
final String prefix = params.get("prefix");
|
||||
if (prefix != null) {
|
||||
list.append("<Prefix>").append(prefix).append("</Prefix>");
|
||||
}
|
||||
for (Map.Entry<String, BytesReference> blob : blobs.entrySet()) {
|
||||
if (prefix == null || blob.getKey().startsWith("/bucket/" + prefix)) {
|
||||
list.append("<Contents>");
|
||||
list.append("<Key>").append(blob.getKey().replace("/bucket/", "")).append("</Key>");
|
||||
list.append("<Size>").append(blob.getValue().length()).append("</Size>");
|
||||
list.append("</Contents>");
|
||||
}
|
||||
}, threadPool));
|
||||
}
|
||||
list.append("</ListBucketResult>");
|
||||
|
||||
byte[] response = list.toString().getBytes(StandardCharsets.UTF_8);
|
||||
exchange.getResponseHeaders().add("Content-Type", "application/xml");
|
||||
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
|
||||
exchange.getResponseBody().write(response);
|
||||
|
||||
} else if (Regex.simpleMatch("GET /bucket/*", request)) {
|
||||
final BytesReference blob = blobs.get(exchange.getRequestURI().toString());
|
||||
if (blob != null) {
|
||||
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
|
||||
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), blob.length());
|
||||
blob.writeTo(exchange.getResponseBody());
|
||||
} else {
|
||||
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
|
||||
}
|
||||
|
||||
} else if (Regex.simpleMatch("DELETE /bucket/*", request)) {
|
||||
int deletions = 0;
|
||||
for (Iterator<Map.Entry<String, BytesReference>> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) {
|
||||
Map.Entry<String, BytesReference> blob = iterator.next();
|
||||
if (blob.getKey().startsWith(exchange.getRequestURI().toString())) {
|
||||
iterator.remove();
|
||||
deletions++;
|
||||
}
|
||||
}
|
||||
exchange.sendResponseHeaders((deletions > 0 ? RestStatus.OK : RestStatus.NO_CONTENT).getStatus(), -1);
|
||||
|
||||
} else if (Regex.simpleMatch("POST /bucket/?delete", request)) {
|
||||
final String requestBody = Streams.copyToString(new InputStreamReader(exchange.getRequestBody(), UTF_8));
|
||||
|
||||
final StringBuilder deletes = new StringBuilder();
|
||||
deletes.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
|
||||
deletes.append("<DeleteResult>");
|
||||
for (Iterator<Map.Entry<String, BytesReference>> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) {
|
||||
Map.Entry<String, BytesReference> blob = iterator.next();
|
||||
String key = blob.getKey().replace("/bucket/", "");
|
||||
if (requestBody.contains("<Key>" + key + "</Key>")) {
|
||||
deletes.append("<Deleted><Key>").append(key).append("</Key></Deleted>");
|
||||
iterator.remove();
|
||||
}
|
||||
}
|
||||
deletes.append("</DeleteResult>");
|
||||
|
||||
byte[] response = deletes.toString().getBytes(StandardCharsets.UTF_8);
|
||||
exchange.getResponseHeaders().add("Content-Type", "application/xml");
|
||||
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
|
||||
exchange.getResponseBody().write(response);
|
||||
|
||||
} else {
|
||||
exchange.sendResponseHeaders(RestStatus.INTERNAL_SERVER_ERROR.getStatus(), -1);
|
||||
}
|
||||
} finally {
|
||||
exchange.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue