Inject random server errors in GoogleCloudStorageBlobStoreRepositoryTests (#46376)
This commit modifies the HTTP server used in GoogleCloudStorageBlobStoreRepositoryTests so that it randomly returns server errors. The test does not inject server errors for the following types of request: batch request, resumable upload request.
This commit is contained in:
parent
cc092b1be1
commit
8e3dc68454
|
@ -100,7 +100,7 @@ public class GoogleCloudStorageService {
|
||||||
* @return a new client storage instance that can be used to manage objects
|
* @return a new client storage instance that can be used to manage objects
|
||||||
* (blobs)
|
* (blobs)
|
||||||
*/
|
*/
|
||||||
private static Storage createClient(String clientName, GoogleCloudStorageClientSettings clientSettings) throws IOException {
|
private Storage createClient(String clientName, GoogleCloudStorageClientSettings clientSettings) throws IOException {
|
||||||
logger.debug(() -> new ParameterizedMessage("creating GCS client with client_name [{}], endpoint [{}]", clientName,
|
logger.debug(() -> new ParameterizedMessage("creating GCS client with client_name [{}], endpoint [{}]", clientName,
|
||||||
clientSettings.getHost()));
|
clientSettings.getHost()));
|
||||||
final HttpTransport httpTransport = SocketAccess.doPrivilegedIOException(() -> {
|
final HttpTransport httpTransport = SocketAccess.doPrivilegedIOException(() -> {
|
||||||
|
@ -111,10 +111,16 @@ public class GoogleCloudStorageService {
|
||||||
return builder.build();
|
return builder.build();
|
||||||
});
|
});
|
||||||
final HttpTransportOptions httpTransportOptions = HttpTransportOptions.newBuilder()
|
final HttpTransportOptions httpTransportOptions = HttpTransportOptions.newBuilder()
|
||||||
.setConnectTimeout(toTimeout(clientSettings.getConnectTimeout()))
|
.setConnectTimeout(toTimeout(clientSettings.getConnectTimeout()))
|
||||||
.setReadTimeout(toTimeout(clientSettings.getReadTimeout()))
|
.setReadTimeout(toTimeout(clientSettings.getReadTimeout()))
|
||||||
.setHttpTransportFactory(() -> httpTransport)
|
.setHttpTransportFactory(() -> httpTransport)
|
||||||
.build();
|
.build();
|
||||||
|
final StorageOptions storageOptions = createStorageOptions(clientSettings, httpTransportOptions);
|
||||||
|
return storageOptions.getService();
|
||||||
|
}
|
||||||
|
|
||||||
|
StorageOptions createStorageOptions(final GoogleCloudStorageClientSettings clientSettings,
|
||||||
|
final HttpTransportOptions httpTransportOptions) {
|
||||||
final StorageOptions.Builder storageOptionsBuilder = StorageOptions.newBuilder()
|
final StorageOptions.Builder storageOptionsBuilder = StorageOptions.newBuilder()
|
||||||
.setTransportOptions(httpTransportOptions)
|
.setTransportOptions(httpTransportOptions)
|
||||||
.setHeaderProvider(() -> {
|
.setHeaderProvider(() -> {
|
||||||
|
@ -144,7 +150,7 @@ public class GoogleCloudStorageService {
|
||||||
}
|
}
|
||||||
storageOptionsBuilder.setCredentials(serviceAccountCredentials);
|
storageOptionsBuilder.setCredentials(serviceAccountCredentials);
|
||||||
}
|
}
|
||||||
return storageOptionsBuilder.build().getService();
|
return storageOptionsBuilder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -19,6 +19,9 @@
|
||||||
|
|
||||||
package org.elasticsearch.repositories.gcs;
|
package org.elasticsearch.repositories.gcs;
|
||||||
|
|
||||||
|
import com.google.api.gax.retrying.RetrySettings;
|
||||||
|
import com.google.cloud.http.HttpTransportOptions;
|
||||||
|
import com.google.cloud.storage.StorageOptions;
|
||||||
import com.sun.net.httpserver.HttpExchange;
|
import com.sun.net.httpserver.HttpExchange;
|
||||||
import com.sun.net.httpserver.HttpHandler;
|
import com.sun.net.httpserver.HttpHandler;
|
||||||
import com.sun.net.httpserver.HttpServer;
|
import com.sun.net.httpserver.HttpServer;
|
||||||
|
@ -46,6 +49,7 @@ import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
|
import org.threeten.bp.Duration;
|
||||||
|
|
||||||
import java.io.BufferedInputStream;
|
import java.io.BufferedInputStream;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
|
@ -67,6 +71,7 @@ import java.util.Map;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
@ -83,18 +88,24 @@ import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.CL
|
||||||
public class GoogleCloudStorageBlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCase {
|
public class GoogleCloudStorageBlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCase {
|
||||||
|
|
||||||
private static HttpServer httpServer;
|
private static HttpServer httpServer;
|
||||||
|
private static boolean randomServerErrors;
|
||||||
private static byte[] serviceAccount;
|
private static byte[] serviceAccount;
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void startHttpServer() throws Exception {
|
public static void startHttpServer() throws Exception {
|
||||||
httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
|
httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
|
||||||
httpServer.start();
|
httpServer.start();
|
||||||
|
randomServerErrors = randomBoolean();
|
||||||
serviceAccount = createServiceAccount();
|
serviceAccount = createServiceAccount();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUpHttpServer() {
|
public void setUpHttpServer() {
|
||||||
httpServer.createContext("/", new InternalHttpHandler());
|
HttpHandler handler = new InternalHttpHandler();
|
||||||
|
if (randomServerErrors) {
|
||||||
|
handler = new ErroneousHttpHandler(handler, randomIntBetween(2, 3));
|
||||||
|
}
|
||||||
|
httpServer.createContext("/", handler);
|
||||||
httpServer.createContext("/token", new FakeOAuth2HttpHandler());
|
httpServer.createContext("/token", new FakeOAuth2HttpHandler());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -126,7 +137,7 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESBlobStoreRepos
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||||
return Collections.singletonList(GoogleCloudStoragePlugin.class);
|
return Collections.singletonList(TestGoogleCloudStoragePlugin.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -184,6 +195,39 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESBlobStoreRepos
|
||||||
assertEquals("failed to parse value [101mb] for setting [chunk_size], must be <= [100mb]", e.getMessage());
|
assertEquals("failed to parse value [101mb] for setting [chunk_size], must be <= [100mb]", e.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* GoogleCloudStoragePlugin that allows to set low values for the client retry policy
|
||||||
|
*/
|
||||||
|
public static class TestGoogleCloudStoragePlugin extends GoogleCloudStoragePlugin {
|
||||||
|
|
||||||
|
public TestGoogleCloudStoragePlugin(Settings settings) {
|
||||||
|
super(settings);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected GoogleCloudStorageService createStorageService() {
|
||||||
|
if (randomServerErrors) {
|
||||||
|
return new GoogleCloudStorageService() {
|
||||||
|
@Override
|
||||||
|
StorageOptions createStorageOptions(final GoogleCloudStorageClientSettings clientSettings,
|
||||||
|
final HttpTransportOptions httpTransportOptions) {
|
||||||
|
return super.createStorageOptions(clientSettings, httpTransportOptions)
|
||||||
|
.toBuilder()
|
||||||
|
.setRetrySettings(RetrySettings.newBuilder()
|
||||||
|
.setMaxAttempts(10)
|
||||||
|
.setInitialRetryDelay(Duration.ofMillis(10L))
|
||||||
|
.setRetryDelayMultiplier(2.0d)
|
||||||
|
.setMaxRetryDelay(Duration.ofSeconds(1L))
|
||||||
|
.setTotalTimeout(Duration.ofSeconds(30L))
|
||||||
|
.build())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
return super.createStorageService();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static byte[] createServiceAccount() throws Exception {
|
private static byte[] createServiceAccount() throws Exception {
|
||||||
final KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance("RSA");
|
final KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance("RSA");
|
||||||
keyPairGenerator.initialize(1024);
|
keyPairGenerator.initialize(1024);
|
||||||
|
@ -383,4 +427,53 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESBlobStoreRepos
|
||||||
exchange.close();
|
exchange.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* HTTP handler that injects random Google Cloud Storage service errors
|
||||||
|
*
|
||||||
|
* Note: it is not a good idea to allow this handler to simulate too many errors as it would
|
||||||
|
* slow down the test suite.
|
||||||
|
*/
|
||||||
|
@SuppressForbidden(reason = "this test uses a HttpServer to emulate a Google Cloud Storage endpoint")
|
||||||
|
private static class ErroneousHttpHandler implements HttpHandler {
|
||||||
|
|
||||||
|
// first key is the remote address, second key is the HTTP request unique id provided by the AWS SDK client,
|
||||||
|
// value is the number of times the request has been seen
|
||||||
|
private final Map<String, AtomicInteger> requests;
|
||||||
|
private final HttpHandler delegate;
|
||||||
|
private final int maxErrorsPerRequest;
|
||||||
|
|
||||||
|
private ErroneousHttpHandler(final HttpHandler delegate, final int maxErrorsPerRequest) {
|
||||||
|
this.requests = new ConcurrentHashMap<>();
|
||||||
|
this.delegate = delegate;
|
||||||
|
this.maxErrorsPerRequest = maxErrorsPerRequest;
|
||||||
|
assert maxErrorsPerRequest > 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handle(final HttpExchange exchange) throws IOException {
|
||||||
|
final String requestId = exchange.getRemoteAddress().toString()
|
||||||
|
+ " " + exchange.getRequestMethod()
|
||||||
|
+ " " + exchange.getRequestURI();
|
||||||
|
assert Strings.hasText(requestId);
|
||||||
|
|
||||||
|
// Batch requests are not retried so we don't want to fail them
|
||||||
|
// The batched request are supposed to be retried (not tested here)
|
||||||
|
final boolean noError = exchange.getRequestURI().toString().startsWith("/batch/") || randomBoolean();
|
||||||
|
|
||||||
|
final int count = requests.computeIfAbsent(requestId, req -> new AtomicInteger(0)).incrementAndGet();
|
||||||
|
if (count >= maxErrorsPerRequest || noError) {
|
||||||
|
requests.remove(requestId);
|
||||||
|
delegate.handle(exchange);
|
||||||
|
} else {
|
||||||
|
handleAsError(exchange);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void handleAsError(final HttpExchange exchange) throws IOException {
|
||||||
|
Streams.readFully(exchange.getRequestBody());
|
||||||
|
exchange.sendResponseHeaders(HttpStatus.SC_INTERNAL_SERVER_ERROR, -1);
|
||||||
|
exchange.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue