Mutualize code in cloud-based repository integration tests (#46483)
This commit factors out some common code between the cloud-based repository integration tests that were recently improved. Relates #46376
This commit is contained in:
parent
8428f8e6e8
commit
88bed09119
|
@ -23,29 +23,18 @@ import com.microsoft.azure.storage.RetryExponentialRetry;
|
|||
import com.microsoft.azure.storage.RetryPolicyFactory;
|
||||
import com.sun.net.httpserver.HttpExchange;
|
||||
import com.sun.net.httpserver.HttpHandler;
|
||||
import com.sun.net.httpserver.HttpServer;
|
||||
import org.apache.http.HttpStatus;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.Streams;
|
||||
import org.elasticsearch.common.network.InetAddresses;
|
||||
import org.elasticsearch.common.regex.Regex;
|
||||
import org.elasticsearch.common.settings.MockSecureSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.mocksocket.MockHttpServer;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase;
|
||||
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.rest.RestUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Base64;
|
||||
import java.util.Collection;
|
||||
|
@ -53,38 +42,9 @@ import java.util.Collections;
|
|||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
@SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint")
|
||||
public class AzureBlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCase {
|
||||
|
||||
private static HttpServer httpServer;
|
||||
|
||||
@BeforeClass
|
||||
public static void startHttpServer() throws Exception {
|
||||
httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
|
||||
httpServer.start();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUpHttpServer() {
|
||||
HttpHandler handler = new InternalHttpHandler();
|
||||
if (randomBoolean()) {
|
||||
handler = new ErroneousHttpHandler(handler, randomIntBetween(2, 3));
|
||||
}
|
||||
httpServer.createContext("/container", handler);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void stopHttpServer() {
|
||||
httpServer.stop(0);
|
||||
httpServer = null;
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDownHttpServer() {
|
||||
httpServer.removeContext("/container");
|
||||
}
|
||||
public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase {
|
||||
|
||||
@Override
|
||||
protected String repositoryType() {
|
||||
|
@ -104,6 +64,16 @@ public class AzureBlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTes
|
|||
return Collections.singletonList(TestAzureRepositoryPlugin.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<String, HttpHandler> createHttpHandlers() {
|
||||
return Collections.singletonMap("/container", new InternalHttpHandler());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected HttpHandler createErroneousHttpHandler(final HttpHandler delegate) {
|
||||
return new AzureErroneousHttpHandler(delegate, randomIntBetween(2, 3));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
final String key = Base64.getEncoder().encodeToString(randomAlphaOfLength(10).getBytes(StandardCharsets.UTF_8));
|
||||
|
@ -111,10 +81,7 @@ public class AzureBlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTes
|
|||
secureSettings.setString(AzureStorageSettings.ACCOUNT_SETTING.getConcreteSettingForNamespace("test").getKey(), "account");
|
||||
secureSettings.setString(AzureStorageSettings.KEY_SETTING.getConcreteSettingForNamespace("test").getKey(), key);
|
||||
|
||||
final InetSocketAddress address = httpServer.getAddress();
|
||||
final String endpoint = "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=http://"
|
||||
+ InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort();
|
||||
|
||||
final String endpoint = "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=" + httpServerUrl();
|
||||
return Settings.builder()
|
||||
.put(super.nodeSettings(nodeOrdinal))
|
||||
.put(AzureStorageSettings.ENDPOINT_SUFFIX_SETTING.getConcreteSettingForNamespace("test").getKey(), endpoint)
|
||||
|
@ -218,7 +185,6 @@ public class AzureBlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTes
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* HTTP handler that injects random Azure service errors
|
||||
*
|
||||
|
@ -226,40 +192,16 @@ public class AzureBlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTes
|
|||
* slow down the test suite.
|
||||
*/
|
||||
@SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint")
|
||||
private static class ErroneousHttpHandler implements HttpHandler {
|
||||
private static class AzureErroneousHttpHandler extends ErroneousHttpHandler {
|
||||
|
||||
// first key is the remote address, second key is the HTTP request unique id provided by the SDK client,
|
||||
// value is the number of times the request has been seen
|
||||
private final Map<String, AtomicInteger> requests;
|
||||
private final HttpHandler delegate;
|
||||
private final int maxErrorsPerRequest;
|
||||
|
||||
private ErroneousHttpHandler(final HttpHandler delegate, final int maxErrorsPerRequest) {
|
||||
this.requests = new ConcurrentHashMap<>();
|
||||
this.delegate = delegate;
|
||||
this.maxErrorsPerRequest = maxErrorsPerRequest;
|
||||
assert maxErrorsPerRequest > 1;
|
||||
AzureErroneousHttpHandler(final HttpHandler delegate, final int maxErrorsPerRequest) {
|
||||
super(delegate, maxErrorsPerRequest);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(final HttpExchange exchange) throws IOException {
|
||||
final String requestId = exchange.getRequestHeaders().getFirst(Constants.HeaderConstants.CLIENT_REQUEST_ID_HEADER);
|
||||
assert Strings.hasText(requestId);
|
||||
|
||||
final int count = requests.computeIfAbsent(requestId, req -> new AtomicInteger(0)).incrementAndGet();
|
||||
if (count >= maxErrorsPerRequest || randomBoolean()) {
|
||||
requests.remove(requestId);
|
||||
delegate.handle(exchange);
|
||||
} else {
|
||||
handleAsError(exchange, requestId);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleAsError(final HttpExchange exchange, final String requestId) throws IOException {
|
||||
Streams.readFully(exchange.getRequestBody());
|
||||
exchange.getResponseHeaders().add(Constants.HeaderConstants.CLIENT_REQUEST_ID_HEADER, requestId);
|
||||
exchange.sendResponseHeaders(HttpStatus.SC_INTERNAL_SERVER_ERROR, -1);
|
||||
exchange.close();
|
||||
protected String requestUniqueId(final HttpExchange exchange) {
|
||||
// Azure SDK client provides a unique ID per request
|
||||
return exchange.getRequestHeaders().getFirst(Constants.HeaderConstants.CLIENT_REQUEST_ID_HEADER);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,7 +24,6 @@ import com.google.cloud.http.HttpTransportOptions;
|
|||
import com.google.cloud.storage.StorageOptions;
|
||||
import com.sun.net.httpserver.HttpExchange;
|
||||
import com.sun.net.httpserver.HttpHandler;
|
||||
import com.sun.net.httpserver.HttpServer;
|
||||
import org.apache.http.HttpStatus;
|
||||
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
||||
import org.elasticsearch.common.Strings;
|
||||
|
@ -32,7 +31,6 @@ import org.elasticsearch.common.SuppressForbidden;
|
|||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.Streams;
|
||||
import org.elasticsearch.common.network.InetAddresses;
|
||||
import org.elasticsearch.common.regex.Regex;
|
||||
import org.elasticsearch.common.settings.MockSecureSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -40,23 +38,16 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
|
|||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.mocksocket.MockHttpServer;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase;
|
||||
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.rest.RestUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.threeten.bp.Duration;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URLDecoder;
|
||||
import java.security.KeyPairGenerator;
|
||||
import java.util.Arrays;
|
||||
|
@ -71,7 +62,6 @@ import java.util.Map;
|
|||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
|
@ -85,42 +75,10 @@ import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.BU
|
|||
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.CLIENT_NAME;
|
||||
|
||||
@SuppressForbidden(reason = "this test uses a HttpServer to emulate a Google Cloud Storage endpoint")
|
||||
public class GoogleCloudStorageBlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCase {
|
||||
public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase {
|
||||
|
||||
private static HttpServer httpServer;
|
||||
private static boolean randomServerErrors;
|
||||
private static byte[] serviceAccount;
|
||||
|
||||
@BeforeClass
|
||||
public static void startHttpServer() throws Exception {
|
||||
httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
|
||||
httpServer.start();
|
||||
randomServerErrors = randomBoolean();
|
||||
serviceAccount = createServiceAccount();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUpHttpServer() {
|
||||
HttpHandler handler = new InternalHttpHandler();
|
||||
if (randomServerErrors) {
|
||||
handler = new ErroneousHttpHandler(handler, randomIntBetween(2, 3));
|
||||
}
|
||||
httpServer.createContext("/", handler);
|
||||
httpServer.createContext("/token", new FakeOAuth2HttpHandler());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void stopHttpServer() {
|
||||
httpServer.stop(0);
|
||||
httpServer = null;
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDownHttpServer() {
|
||||
httpServer.removeContext("/");
|
||||
httpServer.removeContext("/token");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String repositoryType() {
|
||||
return GoogleCloudStorageRepository.TYPE;
|
||||
|
@ -140,15 +98,29 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESBlobStoreRepos
|
|||
return Collections.singletonList(TestGoogleCloudStoragePlugin.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<String, HttpHandler> createHttpHandlers() {
|
||||
final Map<String, HttpHandler> handlers = new HashMap<>(2);
|
||||
handlers.put("/", new InternalHttpHandler());
|
||||
handlers.put("/token", new FakeOAuth2HttpHandler());
|
||||
return Collections.unmodifiableMap(handlers);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected HttpHandler createErroneousHttpHandler(final HttpHandler delegate) {
|
||||
return new GoogleErroneousHttpHandler(delegate, randomIntBetween(2, 3));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
if (serviceAccount == null) {
|
||||
serviceAccount = createServiceAccount();
|
||||
}
|
||||
|
||||
final Settings.Builder settings = Settings.builder();
|
||||
settings.put(super.nodeSettings(nodeOrdinal));
|
||||
|
||||
final InetSocketAddress address = httpServer.getAddress();
|
||||
final String endpoint = "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort();
|
||||
settings.put(ENDPOINT_SETTING.getConcreteSettingForNamespace("test").getKey(), endpoint);
|
||||
settings.put(TOKEN_URI_SETTING.getConcreteSettingForNamespace("test").getKey(), endpoint + "/token");
|
||||
settings.put(ENDPOINT_SETTING.getConcreteSettingForNamespace("test").getKey(), httpServerUrl());
|
||||
settings.put(TOKEN_URI_SETTING.getConcreteSettingForNamespace("test").getKey(), httpServerUrl() + "/token");
|
||||
|
||||
final MockSecureSettings secureSettings = new MockSecureSettings();
|
||||
secureSettings.setFile(CREDENTIALS_FILE_SETTING.getConcreteSettingForNamespace("test").getKey(), serviceAccount);
|
||||
|
@ -206,47 +178,48 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESBlobStoreRepos
|
|||
|
||||
@Override
|
||||
protected GoogleCloudStorageService createStorageService() {
|
||||
if (randomServerErrors) {
|
||||
return new GoogleCloudStorageService() {
|
||||
@Override
|
||||
StorageOptions createStorageOptions(final GoogleCloudStorageClientSettings clientSettings,
|
||||
final HttpTransportOptions httpTransportOptions) {
|
||||
return super.createStorageOptions(clientSettings, httpTransportOptions)
|
||||
.toBuilder()
|
||||
.setRetrySettings(RetrySettings.newBuilder()
|
||||
.setMaxAttempts(10)
|
||||
.setInitialRetryDelay(Duration.ofMillis(10L))
|
||||
.setRetryDelayMultiplier(2.0d)
|
||||
.setMaxRetryDelay(Duration.ofSeconds(1L))
|
||||
.setTotalTimeout(Duration.ofSeconds(30L))
|
||||
.build())
|
||||
.build();
|
||||
}
|
||||
};
|
||||
}
|
||||
return super.createStorageService();
|
||||
return new GoogleCloudStorageService() {
|
||||
@Override
|
||||
StorageOptions createStorageOptions(final GoogleCloudStorageClientSettings clientSettings,
|
||||
final HttpTransportOptions httpTransportOptions) {
|
||||
return super.createStorageOptions(clientSettings, httpTransportOptions)
|
||||
.toBuilder()
|
||||
.setRetrySettings(RetrySettings.newBuilder()
|
||||
.setMaxAttempts(10)
|
||||
.setInitialRetryDelay(Duration.ofMillis(10L))
|
||||
.setRetryDelayMultiplier(2.0d)
|
||||
.setMaxRetryDelay(Duration.ofSeconds(1L))
|
||||
.setTotalTimeout(Duration.ofSeconds(30L))
|
||||
.build())
|
||||
.build();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
private static byte[] createServiceAccount() throws Exception {
|
||||
final KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance("RSA");
|
||||
keyPairGenerator.initialize(1024);
|
||||
final String privateKey = Base64.getEncoder().encodeToString(keyPairGenerator.generateKeyPair().getPrivate().getEncoded());
|
||||
private static byte[] createServiceAccount() {
|
||||
try {
|
||||
final KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance("RSA");
|
||||
keyPairGenerator.initialize(1024);
|
||||
final String privateKey = Base64.getEncoder().encodeToString(keyPairGenerator.generateKeyPair().getPrivate().getEncoded());
|
||||
|
||||
final ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
try (XContentBuilder builder = new XContentBuilder(XContentType.JSON.xContent(), out)) {
|
||||
builder.startObject();
|
||||
{
|
||||
builder.field("type", "service_account");
|
||||
builder.field("project_id", getTestClass().getName().toLowerCase(Locale.ROOT));
|
||||
builder.field("private_key_id", UUID.randomUUID().toString());
|
||||
builder.field("private_key", "-----BEGIN PRIVATE KEY-----\n" + privateKey + "\n-----END PRIVATE KEY-----\n");
|
||||
builder.field("client_email", "elastic@appspot.gserviceaccount.com");
|
||||
builder.field("client_id", String.valueOf(randomNonNegativeLong()));
|
||||
final ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
try (XContentBuilder builder = new XContentBuilder(XContentType.JSON.xContent(), out)) {
|
||||
builder.startObject();
|
||||
{
|
||||
builder.field("type", "service_account");
|
||||
builder.field("project_id", getTestClass().getName().toLowerCase(Locale.ROOT));
|
||||
builder.field("private_key_id", UUID.randomUUID().toString());
|
||||
builder.field("private_key", "-----BEGIN PRIVATE KEY-----\n" + privateKey + "\n-----END PRIVATE KEY-----\n");
|
||||
builder.field("client_email", "elastic@appspot.gserviceaccount.com");
|
||||
builder.field("client_id", String.valueOf(randomNonNegativeLong()));
|
||||
}
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endObject();
|
||||
return out.toByteArray();
|
||||
} catch (Exception e) {
|
||||
throw new AssertionError("Unable to create service account file", e);
|
||||
}
|
||||
return out.toByteArray();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -435,45 +408,24 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESBlobStoreRepos
|
|||
* slow down the test suite.
|
||||
*/
|
||||
@SuppressForbidden(reason = "this test uses a HttpServer to emulate a Google Cloud Storage endpoint")
|
||||
private static class ErroneousHttpHandler implements HttpHandler {
|
||||
private static class GoogleErroneousHttpHandler extends ErroneousHttpHandler {
|
||||
|
||||
// first key is the remote address, second key is the HTTP request unique id provided by the AWS SDK client,
|
||||
// value is the number of times the request has been seen
|
||||
private final Map<String, AtomicInteger> requests;
|
||||
private final HttpHandler delegate;
|
||||
private final int maxErrorsPerRequest;
|
||||
|
||||
private ErroneousHttpHandler(final HttpHandler delegate, final int maxErrorsPerRequest) {
|
||||
this.requests = new ConcurrentHashMap<>();
|
||||
this.delegate = delegate;
|
||||
this.maxErrorsPerRequest = maxErrorsPerRequest;
|
||||
assert maxErrorsPerRequest > 1;
|
||||
GoogleErroneousHttpHandler(final HttpHandler delegate, final int maxErrorsPerRequest) {
|
||||
super(delegate, maxErrorsPerRequest);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(final HttpExchange exchange) throws IOException {
|
||||
final String requestId = exchange.getRemoteAddress().toString()
|
||||
protected String requestUniqueId(HttpExchange exchange) {
|
||||
return exchange.getRemoteAddress().toString()
|
||||
+ " " + exchange.getRequestMethod()
|
||||
+ " " + exchange.getRequestURI();
|
||||
assert Strings.hasText(requestId);
|
||||
|
||||
// Batch requests are not retried so we don't want to fail them
|
||||
// The batched request are supposed to be retried (not tested here)
|
||||
final boolean noError = exchange.getRequestURI().toString().startsWith("/batch/") || randomBoolean();
|
||||
|
||||
final int count = requests.computeIfAbsent(requestId, req -> new AtomicInteger(0)).incrementAndGet();
|
||||
if (count >= maxErrorsPerRequest || noError) {
|
||||
requests.remove(requestId);
|
||||
delegate.handle(exchange);
|
||||
} else {
|
||||
handleAsError(exchange);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleAsError(final HttpExchange exchange) throws IOException {
|
||||
Streams.readFully(exchange.getRequestBody());
|
||||
exchange.sendResponseHeaders(HttpStatus.SC_INTERNAL_SERVER_ERROR, -1);
|
||||
exchange.close();
|
||||
@Override
|
||||
protected boolean canFailRequest(final HttpExchange exchange) {
|
||||
// Batch requests are not retried so we don't want to fail them
|
||||
// The batched request are supposed to be retried (not tested here)
|
||||
return exchange.getRequestURI().toString().startsWith("/batch/") == false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,34 +19,22 @@
|
|||
package org.elasticsearch.repositories.s3;
|
||||
|
||||
import com.amazonaws.http.AmazonHttpClient;
|
||||
import com.amazonaws.services.s3.Headers;
|
||||
import com.sun.net.httpserver.HttpExchange;
|
||||
import com.sun.net.httpserver.HttpHandler;
|
||||
import com.sun.net.httpserver.HttpServer;
|
||||
import org.apache.http.HttpStatus;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.Streams;
|
||||
import org.elasticsearch.common.network.InetAddresses;
|
||||
import org.elasticsearch.common.regex.Regex;
|
||||
import org.elasticsearch.common.settings.MockSecureSettings;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.mocksocket.MockHttpServer;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase;
|
||||
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.rest.RestUtils;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
@ -57,41 +45,12 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
@SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
|
||||
public class S3BlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCase {
|
||||
|
||||
private static HttpServer httpServer;
|
||||
|
||||
@BeforeClass
|
||||
public static void startHttpServer() throws Exception {
|
||||
httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
|
||||
httpServer.start();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUpHttpServer() {
|
||||
HttpHandler handler = new InternalHttpHandler();
|
||||
if (randomBoolean()) {
|
||||
handler = new ErroneousHttpHandler(handler, randomIntBetween(2, 3));
|
||||
}
|
||||
httpServer.createContext("/bucket", handler);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void stopHttpServer() {
|
||||
httpServer.stop(0);
|
||||
httpServer = null;
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDownHttpServer() {
|
||||
httpServer.removeContext("/bucket");
|
||||
}
|
||||
public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase {
|
||||
|
||||
@Override
|
||||
protected String repositoryType() {
|
||||
|
@ -111,17 +70,24 @@ public class S3BlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCa
|
|||
return Collections.singletonList(TestS3RepositoryPlugin.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<String, HttpHandler> createHttpHandlers() {
|
||||
return Collections.singletonMap("/bucket", new InternalHttpHandler());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected HttpHandler createErroneousHttpHandler(final HttpHandler delegate) {
|
||||
return new S3ErroneousHttpHandler(delegate, randomIntBetween(2, 3));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
final MockSecureSettings secureSettings = new MockSecureSettings();
|
||||
secureSettings.setString(S3ClientSettings.ACCESS_KEY_SETTING.getConcreteSettingForNamespace("test").getKey(), "access");
|
||||
secureSettings.setString(S3ClientSettings.SECRET_KEY_SETTING.getConcreteSettingForNamespace("test").getKey(), "secret");
|
||||
|
||||
final InetSocketAddress address = httpServer.getAddress();
|
||||
final String endpoint = "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort();
|
||||
|
||||
return Settings.builder()
|
||||
.put(S3ClientSettings.ENDPOINT_SETTING.getConcreteSettingForNamespace("test").getKey(), endpoint)
|
||||
.put(S3ClientSettings.ENDPOINT_SETTING.getConcreteSettingForNamespace("test").getKey(), httpServerUrl())
|
||||
// Disable chunked encoding as it simplifies a lot the request parsing on the httpServer side
|
||||
.put(S3ClientSettings.DISABLE_CHUNKED_ENCODING.getConcreteSettingForNamespace("test").getKey(), true)
|
||||
// Disable request throttling because some random values in tests might generate too many failures for the S3 client
|
||||
|
@ -243,45 +209,19 @@ public class S3BlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCa
|
|||
* HTTP handler that injects random S3 service errors
|
||||
*
|
||||
* Note: it is not a good idea to allow this handler to simulate too many errors as it would
|
||||
* slow down the test suite and/or could trigger SDK client request throttling (and request
|
||||
* would fail before reaching the max retry attempts - this can be mitigated by disabling
|
||||
* {@link S3ClientSettings#USE_THROTTLE_RETRIES_SETTING})
|
||||
* slow down the test suite.
|
||||
*/
|
||||
@SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
|
||||
private static class ErroneousHttpHandler implements HttpHandler {
|
||||
private static class S3ErroneousHttpHandler extends ErroneousHttpHandler {
|
||||
|
||||
// first key is the remote address, second key is the HTTP request unique id provided by the AWS SDK client,
|
||||
// value is the number of times the request has been seen
|
||||
private final Map<String, AtomicInteger> requests;
|
||||
private final HttpHandler delegate;
|
||||
private final int maxErrorsPerRequest;
|
||||
|
||||
private ErroneousHttpHandler(final HttpHandler delegate, final int maxErrorsPerRequest) {
|
||||
this.requests = new ConcurrentHashMap<>();
|
||||
this.delegate = delegate;
|
||||
this.maxErrorsPerRequest = maxErrorsPerRequest;
|
||||
assert maxErrorsPerRequest > 1;
|
||||
S3ErroneousHttpHandler(final HttpHandler delegate, final int maxErrorsPerRequest) {
|
||||
super(delegate, maxErrorsPerRequest);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(final HttpExchange exchange) throws IOException {
|
||||
final String requestId = exchange.getRequestHeaders().getFirst(AmazonHttpClient.HEADER_SDK_TRANSACTION_ID);
|
||||
assert Strings.hasText(requestId);
|
||||
|
||||
final int count = requests.computeIfAbsent(requestId, req -> new AtomicInteger(0)).incrementAndGet();
|
||||
if (count >= maxErrorsPerRequest || randomBoolean()) {
|
||||
requests.remove(requestId);
|
||||
delegate.handle(exchange);
|
||||
} else {
|
||||
handleAsError(exchange, requestId);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleAsError(final HttpExchange exchange, final String requestId) throws IOException {
|
||||
Streams.readFully(exchange.getRequestBody());
|
||||
exchange.getResponseHeaders().add(Headers.REQUEST_ID, requestId);
|
||||
exchange.sendResponseHeaders(HttpStatus.SC_INTERNAL_SERVER_ERROR, -1);
|
||||
exchange.close();
|
||||
protected String requestUniqueId(final HttpExchange exchange) {
|
||||
// Amazon SDK client provides a unique ID per request
|
||||
return exchange.getRequestHeaders().getFirst(AmazonHttpClient.HEADER_SDK_TRANSACTION_ID);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,141 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.repositories.blobstore;
|
||||
|
||||
import com.sun.net.httpserver.HttpExchange;
|
||||
import com.sun.net.httpserver.HttpHandler;
|
||||
import com.sun.net.httpserver.HttpServer;
|
||||
import org.apache.http.HttpStatus;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.io.Streams;
|
||||
import org.elasticsearch.common.network.InetAddresses;
|
||||
import org.elasticsearch.mocksocket.MockHttpServer;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* Integration tests for {@link BlobStoreRepository} implementations rely on mock APIs that emulate cloud-based services.
|
||||
*/
|
||||
@SuppressForbidden(reason = "this test uses a HttpServer to emulate a cloud-based storage service")
|
||||
public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreRepositoryIntegTestCase {
|
||||
|
||||
private static HttpServer httpServer;
|
||||
private Map<String, HttpHandler> handlers;
|
||||
|
||||
@BeforeClass
|
||||
public static void startHttpServer() throws Exception {
|
||||
httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
|
||||
httpServer.start();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUpHttpServer() {
|
||||
handlers = createHttpHandlers();
|
||||
handlers.forEach((c, h) -> {
|
||||
HttpHandler handler = h;
|
||||
if (randomBoolean()) {
|
||||
handler = createErroneousHttpHandler(handler);
|
||||
}
|
||||
httpServer.createContext(c, handler);
|
||||
});
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void stopHttpServer() {
|
||||
httpServer.stop(0);
|
||||
httpServer = null;
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDownHttpServer() {
|
||||
if (handlers != null) {
|
||||
handlers.keySet().forEach(context -> httpServer.removeContext(context));
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract Map<String, HttpHandler> createHttpHandlers();
|
||||
|
||||
protected abstract HttpHandler createErroneousHttpHandler(HttpHandler delegate);
|
||||
|
||||
protected static String httpServerUrl() {
|
||||
InetSocketAddress address = httpServer.getAddress();
|
||||
return "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort();
|
||||
}
|
||||
|
||||
/**
|
||||
* HTTP handler that injects random service errors
|
||||
*
|
||||
* Note: it is not a good idea to allow this handler to simulate too many errors as it would
|
||||
* slow down the test suite.
|
||||
*/
|
||||
@SuppressForbidden(reason = "this test uses a HttpServer to emulate a cloud-based storage service")
|
||||
protected abstract static class ErroneousHttpHandler implements HttpHandler {
|
||||
|
||||
// first key is a unique identifier for the incoming HTTP request,
|
||||
// value is the number of times the request has been seen
|
||||
private final Map<String, AtomicInteger> requests;
|
||||
private final HttpHandler delegate;
|
||||
private final int maxErrorsPerRequest;
|
||||
|
||||
@SuppressForbidden(reason = "this test uses a HttpServer to emulate a cloud-based storage service")
|
||||
protected ErroneousHttpHandler(final HttpHandler delegate, final int maxErrorsPerRequest) {
|
||||
this.requests = new ConcurrentHashMap<>();
|
||||
this.delegate = delegate;
|
||||
this.maxErrorsPerRequest = maxErrorsPerRequest;
|
||||
assert maxErrorsPerRequest > 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(final HttpExchange exchange) throws IOException {
|
||||
final String requestId = requestUniqueId(exchange);
|
||||
assert Strings.hasText(requestId);
|
||||
|
||||
final boolean canFailRequest = canFailRequest(exchange);
|
||||
final int count = requests.computeIfAbsent(requestId, req -> new AtomicInteger(0)).incrementAndGet();
|
||||
if (count >= maxErrorsPerRequest || canFailRequest == false || randomBoolean()) {
|
||||
requests.remove(requestId);
|
||||
delegate.handle(exchange);
|
||||
} else {
|
||||
handleAsError(exchange);
|
||||
}
|
||||
}
|
||||
|
||||
private void handleAsError(final HttpExchange exchange) throws IOException {
|
||||
Streams.readFully(exchange.getRequestBody());
|
||||
exchange.sendResponseHeaders(HttpStatus.SC_INTERNAL_SERVER_ERROR, -1);
|
||||
exchange.close();
|
||||
}
|
||||
|
||||
protected abstract String requestUniqueId(HttpExchange exchange);
|
||||
|
||||
protected boolean canFailRequest(final HttpExchange exchange) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue