diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java index cecd8cba1f2..12d212cc1bb 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java @@ -23,29 +23,18 @@ import com.microsoft.azure.storage.RetryExponentialRetry; import com.microsoft.azure.storage.RetryPolicyFactory; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; -import com.sun.net.httpserver.HttpServer; -import org.apache.http.HttpStatus; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Streams; -import org.elasticsearch.common.network.InetAddresses; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.mocksocket.MockHttpServer; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase; +import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestUtils; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.util.Base64; import java.util.Collection; @@ -53,38 +42,9 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; @SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint") -public class AzureBlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCase { - - private static HttpServer httpServer; - - @BeforeClass - public static void startHttpServer() throws Exception { - httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0); - httpServer.start(); - } - - @Before - public void setUpHttpServer() { - HttpHandler handler = new InternalHttpHandler(); - if (randomBoolean()) { - handler = new ErroneousHttpHandler(handler, randomIntBetween(2, 3)); - } - httpServer.createContext("/container", handler); - } - - @AfterClass - public static void stopHttpServer() { - httpServer.stop(0); - httpServer = null; - } - - @After - public void tearDownHttpServer() { - httpServer.removeContext("/container"); - } +public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase { @Override protected String repositoryType() { @@ -104,6 +64,16 @@ public class AzureBlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTes return Collections.singletonList(TestAzureRepositoryPlugin.class); } + @Override + protected Map createHttpHandlers() { + return Collections.singletonMap("/container", new InternalHttpHandler()); + } + + @Override + protected HttpHandler createErroneousHttpHandler(final HttpHandler delegate) { + return new AzureErroneousHttpHandler(delegate, randomIntBetween(2, 3)); + } + @Override protected Settings nodeSettings(int nodeOrdinal) { final String key = Base64.getEncoder().encodeToString(randomAlphaOfLength(10).getBytes(StandardCharsets.UTF_8)); @@ -111,10 +81,7 @@ public class AzureBlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTes secureSettings.setString(AzureStorageSettings.ACCOUNT_SETTING.getConcreteSettingForNamespace("test").getKey(), "account"); secureSettings.setString(AzureStorageSettings.KEY_SETTING.getConcreteSettingForNamespace("test").getKey(), key); - final InetSocketAddress address = httpServer.getAddress(); - final String endpoint = "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=http://" - + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort(); - + final String endpoint = "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=" + httpServerUrl(); return Settings.builder() .put(super.nodeSettings(nodeOrdinal)) .put(AzureStorageSettings.ENDPOINT_SUFFIX_SETTING.getConcreteSettingForNamespace("test").getKey(), endpoint) @@ -218,7 +185,6 @@ public class AzureBlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTes } } - /** * HTTP handler that injects random Azure service errors * @@ -226,40 +192,16 @@ public class AzureBlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTes * slow down the test suite. */ @SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint") - private static class ErroneousHttpHandler implements HttpHandler { + private static class AzureErroneousHttpHandler extends ErroneousHttpHandler { - // first key is the remote address, second key is the HTTP request unique id provided by the SDK client, - // value is the number of times the request has been seen - private final Map requests; - private final HttpHandler delegate; - private final int maxErrorsPerRequest; - - private ErroneousHttpHandler(final HttpHandler delegate, final int maxErrorsPerRequest) { - this.requests = new ConcurrentHashMap<>(); - this.delegate = delegate; - this.maxErrorsPerRequest = maxErrorsPerRequest; - assert maxErrorsPerRequest > 1; + AzureErroneousHttpHandler(final HttpHandler delegate, final int maxErrorsPerRequest) { + super(delegate, maxErrorsPerRequest); } @Override - public void handle(final HttpExchange exchange) throws IOException { - final String requestId = exchange.getRequestHeaders().getFirst(Constants.HeaderConstants.CLIENT_REQUEST_ID_HEADER); - assert Strings.hasText(requestId); - - final int count = requests.computeIfAbsent(requestId, req -> new AtomicInteger(0)).incrementAndGet(); - if (count >= maxErrorsPerRequest || randomBoolean()) { - requests.remove(requestId); - delegate.handle(exchange); - } else { - handleAsError(exchange, requestId); - } - } - - private void handleAsError(final HttpExchange exchange, final String requestId) throws IOException { - Streams.readFully(exchange.getRequestBody()); - exchange.getResponseHeaders().add(Constants.HeaderConstants.CLIENT_REQUEST_ID_HEADER, requestId); - exchange.sendResponseHeaders(HttpStatus.SC_INTERNAL_SERVER_ERROR, -1); - exchange.close(); + protected String requestUniqueId(final HttpExchange exchange) { + // Azure SDK client provides a unique ID per request + return exchange.getRequestHeaders().getFirst(Constants.HeaderConstants.CLIENT_REQUEST_ID_HEADER); } } } diff --git a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java index ffedab60969..3f899525a73 100644 --- a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java +++ b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java @@ -24,7 +24,6 @@ import com.google.cloud.http.HttpTransportOptions; import com.google.cloud.storage.StorageOptions; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; -import com.sun.net.httpserver.HttpServer; import org.apache.http.HttpStatus; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.common.Strings; @@ -32,7 +31,6 @@ import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Streams; -import org.elasticsearch.common.network.InetAddresses; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Settings; @@ -40,23 +38,16 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.mocksocket.MockHttpServer; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase; +import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestUtils; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; import org.threeten.bp.Duration; import java.io.BufferedInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; -import java.net.InetAddress; -import java.net.InetSocketAddress; import java.net.URLDecoder; import java.security.KeyPairGenerator; import java.util.Arrays; @@ -71,7 +62,6 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -85,42 +75,10 @@ import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.BU import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.CLIENT_NAME; @SuppressForbidden(reason = "this test uses a HttpServer to emulate a Google Cloud Storage endpoint") -public class GoogleCloudStorageBlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCase { +public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase { - private static HttpServer httpServer; - private static boolean randomServerErrors; private static byte[] serviceAccount; - @BeforeClass - public static void startHttpServer() throws Exception { - httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0); - httpServer.start(); - randomServerErrors = randomBoolean(); - serviceAccount = createServiceAccount(); - } - - @Before - public void setUpHttpServer() { - HttpHandler handler = new InternalHttpHandler(); - if (randomServerErrors) { - handler = new ErroneousHttpHandler(handler, randomIntBetween(2, 3)); - } - httpServer.createContext("/", handler); - httpServer.createContext("/token", new FakeOAuth2HttpHandler()); - } - - @AfterClass - public static void stopHttpServer() { - httpServer.stop(0); - httpServer = null; - } - - @After - public void tearDownHttpServer() { - httpServer.removeContext("/"); - httpServer.removeContext("/token"); - } - @Override protected String repositoryType() { return GoogleCloudStorageRepository.TYPE; @@ -140,15 +98,29 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESBlobStoreRepos return Collections.singletonList(TestGoogleCloudStoragePlugin.class); } + @Override + protected Map createHttpHandlers() { + final Map handlers = new HashMap<>(2); + handlers.put("/", new InternalHttpHandler()); + handlers.put("/token", new FakeOAuth2HttpHandler()); + return Collections.unmodifiableMap(handlers); + } + + @Override + protected HttpHandler createErroneousHttpHandler(final HttpHandler delegate) { + return new GoogleErroneousHttpHandler(delegate, randomIntBetween(2, 3)); + } + @Override protected Settings nodeSettings(int nodeOrdinal) { + if (serviceAccount == null) { + serviceAccount = createServiceAccount(); + } + final Settings.Builder settings = Settings.builder(); settings.put(super.nodeSettings(nodeOrdinal)); - - final InetSocketAddress address = httpServer.getAddress(); - final String endpoint = "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort(); - settings.put(ENDPOINT_SETTING.getConcreteSettingForNamespace("test").getKey(), endpoint); - settings.put(TOKEN_URI_SETTING.getConcreteSettingForNamespace("test").getKey(), endpoint + "/token"); + settings.put(ENDPOINT_SETTING.getConcreteSettingForNamespace("test").getKey(), httpServerUrl()); + settings.put(TOKEN_URI_SETTING.getConcreteSettingForNamespace("test").getKey(), httpServerUrl() + "/token"); final MockSecureSettings secureSettings = new MockSecureSettings(); secureSettings.setFile(CREDENTIALS_FILE_SETTING.getConcreteSettingForNamespace("test").getKey(), serviceAccount); @@ -206,47 +178,48 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESBlobStoreRepos @Override protected GoogleCloudStorageService createStorageService() { - if (randomServerErrors) { - return new GoogleCloudStorageService() { - @Override - StorageOptions createStorageOptions(final GoogleCloudStorageClientSettings clientSettings, - final HttpTransportOptions httpTransportOptions) { - return super.createStorageOptions(clientSettings, httpTransportOptions) - .toBuilder() - .setRetrySettings(RetrySettings.newBuilder() - .setMaxAttempts(10) - .setInitialRetryDelay(Duration.ofMillis(10L)) - .setRetryDelayMultiplier(2.0d) - .setMaxRetryDelay(Duration.ofSeconds(1L)) - .setTotalTimeout(Duration.ofSeconds(30L)) - .build()) - .build(); - } - }; - } - return super.createStorageService(); + return new GoogleCloudStorageService() { + @Override + StorageOptions createStorageOptions(final GoogleCloudStorageClientSettings clientSettings, + final HttpTransportOptions httpTransportOptions) { + return super.createStorageOptions(clientSettings, httpTransportOptions) + .toBuilder() + .setRetrySettings(RetrySettings.newBuilder() + .setMaxAttempts(10) + .setInitialRetryDelay(Duration.ofMillis(10L)) + .setRetryDelayMultiplier(2.0d) + .setMaxRetryDelay(Duration.ofSeconds(1L)) + .setTotalTimeout(Duration.ofSeconds(30L)) + .build()) + .build(); + } + }; } } - private static byte[] createServiceAccount() throws Exception { - final KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance("RSA"); - keyPairGenerator.initialize(1024); - final String privateKey = Base64.getEncoder().encodeToString(keyPairGenerator.generateKeyPair().getPrivate().getEncoded()); + private static byte[] createServiceAccount() { + try { + final KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance("RSA"); + keyPairGenerator.initialize(1024); + final String privateKey = Base64.getEncoder().encodeToString(keyPairGenerator.generateKeyPair().getPrivate().getEncoded()); - final ByteArrayOutputStream out = new ByteArrayOutputStream(); - try (XContentBuilder builder = new XContentBuilder(XContentType.JSON.xContent(), out)) { - builder.startObject(); - { - builder.field("type", "service_account"); - builder.field("project_id", getTestClass().getName().toLowerCase(Locale.ROOT)); - builder.field("private_key_id", UUID.randomUUID().toString()); - builder.field("private_key", "-----BEGIN PRIVATE KEY-----\n" + privateKey + "\n-----END PRIVATE KEY-----\n"); - builder.field("client_email", "elastic@appspot.gserviceaccount.com"); - builder.field("client_id", String.valueOf(randomNonNegativeLong())); + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + try (XContentBuilder builder = new XContentBuilder(XContentType.JSON.xContent(), out)) { + builder.startObject(); + { + builder.field("type", "service_account"); + builder.field("project_id", getTestClass().getName().toLowerCase(Locale.ROOT)); + builder.field("private_key_id", UUID.randomUUID().toString()); + builder.field("private_key", "-----BEGIN PRIVATE KEY-----\n" + privateKey + "\n-----END PRIVATE KEY-----\n"); + builder.field("client_email", "elastic@appspot.gserviceaccount.com"); + builder.field("client_id", String.valueOf(randomNonNegativeLong())); + } + builder.endObject(); } - builder.endObject(); + return out.toByteArray(); + } catch (Exception e) { + throw new AssertionError("Unable to create service account file", e); } - return out.toByteArray(); } /** @@ -435,45 +408,24 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESBlobStoreRepos * slow down the test suite. */ @SuppressForbidden(reason = "this test uses a HttpServer to emulate a Google Cloud Storage endpoint") - private static class ErroneousHttpHandler implements HttpHandler { + private static class GoogleErroneousHttpHandler extends ErroneousHttpHandler { - // first key is the remote address, second key is the HTTP request unique id provided by the AWS SDK client, - // value is the number of times the request has been seen - private final Map requests; - private final HttpHandler delegate; - private final int maxErrorsPerRequest; - - private ErroneousHttpHandler(final HttpHandler delegate, final int maxErrorsPerRequest) { - this.requests = new ConcurrentHashMap<>(); - this.delegate = delegate; - this.maxErrorsPerRequest = maxErrorsPerRequest; - assert maxErrorsPerRequest > 1; + GoogleErroneousHttpHandler(final HttpHandler delegate, final int maxErrorsPerRequest) { + super(delegate, maxErrorsPerRequest); } @Override - public void handle(final HttpExchange exchange) throws IOException { - final String requestId = exchange.getRemoteAddress().toString() + protected String requestUniqueId(HttpExchange exchange) { + return exchange.getRemoteAddress().toString() + " " + exchange.getRequestMethod() + " " + exchange.getRequestURI(); - assert Strings.hasText(requestId); - - // Batch requests are not retried so we don't want to fail them - // The batched request are supposed to be retried (not tested here) - final boolean noError = exchange.getRequestURI().toString().startsWith("/batch/") || randomBoolean(); - - final int count = requests.computeIfAbsent(requestId, req -> new AtomicInteger(0)).incrementAndGet(); - if (count >= maxErrorsPerRequest || noError) { - requests.remove(requestId); - delegate.handle(exchange); - } else { - handleAsError(exchange); - } } - private void handleAsError(final HttpExchange exchange) throws IOException { - Streams.readFully(exchange.getRequestBody()); - exchange.sendResponseHeaders(HttpStatus.SC_INTERNAL_SERVER_ERROR, -1); - exchange.close(); + @Override + protected boolean canFailRequest(final HttpExchange exchange) { + // Batch requests are not retried so we don't want to fail them + // The batched request are supposed to be retried (not tested here) + return exchange.getRequestURI().toString().startsWith("/batch/") == false; } } } diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index ff35e55fd64..23749bd4d2a 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -19,34 +19,22 @@ package org.elasticsearch.repositories.s3; import com.amazonaws.http.AmazonHttpClient; -import com.amazonaws.services.s3.Headers; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; -import com.sun.net.httpserver.HttpServer; -import org.apache.http.HttpStatus; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Streams; -import org.elasticsearch.common.network.InetAddresses; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.mocksocket.MockHttpServer; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase; +import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestUtils; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; import java.io.IOException; import java.io.InputStreamReader; -import java.net.InetAddress; -import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; @@ -57,41 +45,12 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; import static java.nio.charset.StandardCharsets.UTF_8; import static org.hamcrest.Matchers.nullValue; @SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint") -public class S3BlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCase { - - private static HttpServer httpServer; - - @BeforeClass - public static void startHttpServer() throws Exception { - httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0); - httpServer.start(); - } - - @Before - public void setUpHttpServer() { - HttpHandler handler = new InternalHttpHandler(); - if (randomBoolean()) { - handler = new ErroneousHttpHandler(handler, randomIntBetween(2, 3)); - } - httpServer.createContext("/bucket", handler); - } - - @AfterClass - public static void stopHttpServer() { - httpServer.stop(0); - httpServer = null; - } - - @After - public void tearDownHttpServer() { - httpServer.removeContext("/bucket"); - } +public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase { @Override protected String repositoryType() { @@ -111,17 +70,24 @@ public class S3BlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCa return Collections.singletonList(TestS3RepositoryPlugin.class); } + @Override + protected Map createHttpHandlers() { + return Collections.singletonMap("/bucket", new InternalHttpHandler()); + } + + @Override + protected HttpHandler createErroneousHttpHandler(final HttpHandler delegate) { + return new S3ErroneousHttpHandler(delegate, randomIntBetween(2, 3)); + } + @Override protected Settings nodeSettings(int nodeOrdinal) { final MockSecureSettings secureSettings = new MockSecureSettings(); secureSettings.setString(S3ClientSettings.ACCESS_KEY_SETTING.getConcreteSettingForNamespace("test").getKey(), "access"); secureSettings.setString(S3ClientSettings.SECRET_KEY_SETTING.getConcreteSettingForNamespace("test").getKey(), "secret"); - final InetSocketAddress address = httpServer.getAddress(); - final String endpoint = "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort(); - return Settings.builder() - .put(S3ClientSettings.ENDPOINT_SETTING.getConcreteSettingForNamespace("test").getKey(), endpoint) + .put(S3ClientSettings.ENDPOINT_SETTING.getConcreteSettingForNamespace("test").getKey(), httpServerUrl()) // Disable chunked encoding as it simplifies a lot the request parsing on the httpServer side .put(S3ClientSettings.DISABLE_CHUNKED_ENCODING.getConcreteSettingForNamespace("test").getKey(), true) // Disable request throttling because some random values in tests might generate too many failures for the S3 client @@ -243,45 +209,19 @@ public class S3BlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCa * HTTP handler that injects random S3 service errors * * Note: it is not a good idea to allow this handler to simulate too many errors as it would - * slow down the test suite and/or could trigger SDK client request throttling (and request - * would fail before reaching the max retry attempts - this can be mitigated by disabling - * {@link S3ClientSettings#USE_THROTTLE_RETRIES_SETTING}) + * slow down the test suite. */ @SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint") - private static class ErroneousHttpHandler implements HttpHandler { + private static class S3ErroneousHttpHandler extends ErroneousHttpHandler { - // first key is the remote address, second key is the HTTP request unique id provided by the AWS SDK client, - // value is the number of times the request has been seen - private final Map requests; - private final HttpHandler delegate; - private final int maxErrorsPerRequest; - - private ErroneousHttpHandler(final HttpHandler delegate, final int maxErrorsPerRequest) { - this.requests = new ConcurrentHashMap<>(); - this.delegate = delegate; - this.maxErrorsPerRequest = maxErrorsPerRequest; - assert maxErrorsPerRequest > 1; + S3ErroneousHttpHandler(final HttpHandler delegate, final int maxErrorsPerRequest) { + super(delegate, maxErrorsPerRequest); } @Override - public void handle(final HttpExchange exchange) throws IOException { - final String requestId = exchange.getRequestHeaders().getFirst(AmazonHttpClient.HEADER_SDK_TRANSACTION_ID); - assert Strings.hasText(requestId); - - final int count = requests.computeIfAbsent(requestId, req -> new AtomicInteger(0)).incrementAndGet(); - if (count >= maxErrorsPerRequest || randomBoolean()) { - requests.remove(requestId); - delegate.handle(exchange); - } else { - handleAsError(exchange, requestId); - } - } - - private void handleAsError(final HttpExchange exchange, final String requestId) throws IOException { - Streams.readFully(exchange.getRequestBody()); - exchange.getResponseHeaders().add(Headers.REQUEST_ID, requestId); - exchange.sendResponseHeaders(HttpStatus.SC_INTERNAL_SERVER_ERROR, -1); - exchange.close(); + protected String requestUniqueId(final HttpExchange exchange) { + // Amazon SDK client provides a unique ID per request + return exchange.getRequestHeaders().getFirst(AmazonHttpClient.HEADER_SDK_TRANSACTION_ID); } } } diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java new file mode 100644 index 00000000000..68097dd0d71 --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java @@ -0,0 +1,141 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.repositories.blobstore; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; +import org.apache.http.HttpStatus; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.network.InetAddresses; +import org.elasticsearch.mocksocket.MockHttpServer; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Integration tests for {@link BlobStoreRepository} implementations rely on mock APIs that emulate cloud-based services. + */ +@SuppressForbidden(reason = "this test uses a HttpServer to emulate a cloud-based storage service") +public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreRepositoryIntegTestCase { + + private static HttpServer httpServer; + private Map handlers; + + @BeforeClass + public static void startHttpServer() throws Exception { + httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0); + httpServer.start(); + } + + @Before + public void setUpHttpServer() { + handlers = createHttpHandlers(); + handlers.forEach((c, h) -> { + HttpHandler handler = h; + if (randomBoolean()) { + handler = createErroneousHttpHandler(handler); + } + httpServer.createContext(c, handler); + }); + } + + @AfterClass + public static void stopHttpServer() { + httpServer.stop(0); + httpServer = null; + } + + @After + public void tearDownHttpServer() { + if (handlers != null) { + handlers.keySet().forEach(context -> httpServer.removeContext(context)); + } + } + + protected abstract Map createHttpHandlers(); + + protected abstract HttpHandler createErroneousHttpHandler(HttpHandler delegate); + + protected static String httpServerUrl() { + InetSocketAddress address = httpServer.getAddress(); + return "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort(); + } + + /** + * HTTP handler that injects random service errors + * + * Note: it is not a good idea to allow this handler to simulate too many errors as it would + * slow down the test suite. + */ + @SuppressForbidden(reason = "this test uses a HttpServer to emulate a cloud-based storage service") + protected abstract static class ErroneousHttpHandler implements HttpHandler { + + // first key is a unique identifier for the incoming HTTP request, + // value is the number of times the request has been seen + private final Map requests; + private final HttpHandler delegate; + private final int maxErrorsPerRequest; + + @SuppressForbidden(reason = "this test uses a HttpServer to emulate a cloud-based storage service") + protected ErroneousHttpHandler(final HttpHandler delegate, final int maxErrorsPerRequest) { + this.requests = new ConcurrentHashMap<>(); + this.delegate = delegate; + this.maxErrorsPerRequest = maxErrorsPerRequest; + assert maxErrorsPerRequest > 1; + } + + @Override + public void handle(final HttpExchange exchange) throws IOException { + final String requestId = requestUniqueId(exchange); + assert Strings.hasText(requestId); + + final boolean canFailRequest = canFailRequest(exchange); + final int count = requests.computeIfAbsent(requestId, req -> new AtomicInteger(0)).incrementAndGet(); + if (count >= maxErrorsPerRequest || canFailRequest == false || randomBoolean()) { + requests.remove(requestId); + delegate.handle(exchange); + } else { + handleAsError(exchange); + } + } + + private void handleAsError(final HttpExchange exchange) throws IOException { + Streams.readFully(exchange.getRequestBody()); + exchange.sendResponseHeaders(HttpStatus.SC_INTERNAL_SERVER_ERROR, -1); + exchange.close(); + } + + protected abstract String requestUniqueId(HttpExchange exchange); + + protected boolean canFailRequest(final HttpExchange exchange) { + return true; + } + } +}