diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java index a9b236a48a0..8815e738f9c 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureRepositoryPlugin.java @@ -50,7 +50,12 @@ public class AzureRepositoryPlugin extends Plugin implements RepositoryPlugin, R public AzureRepositoryPlugin(Settings settings) { // eagerly load client settings so that secure settings are read - this.azureStoreService = new AzureStorageService(settings); + this.azureStoreService = createAzureStoreService(settings); + } + + // non-static, package private for testing + AzureStorageService createAzureStoreService(final Settings settings) { + return new AzureStorageService(settings); } @Override diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java index ef34c533501..c9f820ff09f 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java @@ -24,6 +24,7 @@ import com.microsoft.azure.storage.CloudStorageAccount; import com.microsoft.azure.storage.OperationContext; import com.microsoft.azure.storage.RetryExponentialRetry; import com.microsoft.azure.storage.RetryPolicy; +import com.microsoft.azure.storage.RetryPolicyFactory; import com.microsoft.azure.storage.StorageErrorCodeStrings; import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.blob.BlobInputStream; @@ -111,7 +112,7 @@ public class AzureStorageService { } } - private static CloudBlobClient buildClient(AzureStorageSettings azureStorageSettings) throws InvalidKeyException, URISyntaxException { + private CloudBlobClient buildClient(AzureStorageSettings azureStorageSettings) throws InvalidKeyException, URISyntaxException { final CloudBlobClient client = createClient(azureStorageSettings); // Set timeout option if the user sets cloud.azure.storage.timeout or // cloud.azure.storage.xxx.timeout (it's negative by default) @@ -123,12 +124,16 @@ public class AzureStorageService { client.getDefaultRequestOptions().setTimeoutIntervalInMs((int) timeout); } // We define a default exponential retry policy - client.getDefaultRequestOptions() - .setRetryPolicyFactory(new RetryExponentialRetry(RetryPolicy.DEFAULT_CLIENT_BACKOFF, azureStorageSettings.getMaxRetries())); + client.getDefaultRequestOptions().setRetryPolicyFactory(createRetryPolicy(azureStorageSettings)); client.getDefaultRequestOptions().setLocationMode(azureStorageSettings.getLocationMode()); return client; } + // non-static, package private for testing + RetryPolicyFactory createRetryPolicy(final AzureStorageSettings azureStorageSettings) { + return new RetryExponentialRetry(RetryPolicy.DEFAULT_CLIENT_BACKOFF, azureStorageSettings.getMaxRetries()); + } + private static CloudBlobClient createClient(AzureStorageSettings azureStorageSettings) throws InvalidKeyException, URISyntaxException { final String connectionString = azureStorageSettings.getConnectString(); return CloudStorageAccount.parse(connectionString).createCloudBlobClient(); diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java index 6e3de990151..cecd8cba1f2 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java @@ -18,9 +18,14 @@ */ package org.elasticsearch.repositories.azure; +import com.microsoft.azure.storage.Constants; +import com.microsoft.azure.storage.RetryExponentialRetry; +import com.microsoft.azure.storage.RetryPolicyFactory; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; import com.sun.net.httpserver.HttpServer; +import org.apache.http.HttpStatus; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Streams; @@ -48,6 +53,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; @SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint") public class AzureBlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCase { @@ -62,7 +68,11 @@ public class AzureBlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTes @Before public void setUpHttpServer() { - httpServer.createContext("/container", new InternalHttpHandler()); + HttpHandler handler = new InternalHttpHandler(); + if (randomBoolean()) { + handler = new ErroneousHttpHandler(handler, randomIntBetween(2, 3)); + } + httpServer.createContext("/container", handler); } @AfterClass @@ -91,7 +101,7 @@ public class AzureBlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTes @Override protected Collection> nodePlugins() { - return Collections.singletonList(AzureRepositoryPlugin.class); + return Collections.singletonList(TestAzureRepositoryPlugin.class); } @Override @@ -112,6 +122,26 @@ public class AzureBlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTes .build(); } + /** + * AzureRepositoryPlugin that allows to set very low values for the Azure's client retry policy + */ + public static class TestAzureRepositoryPlugin extends AzureRepositoryPlugin { + + public TestAzureRepositoryPlugin(Settings settings) { + super(settings); + } + + @Override + AzureStorageService createAzureStoreService(final Settings settings) { + return new AzureStorageService(settings) { + @Override + RetryPolicyFactory createRetryPolicy(final AzureStorageSettings azureStorageSettings) { + return new RetryExponentialRetry(1, 100, 500, azureStorageSettings.getMaxRetries()); + } + }; + } + } + /** * Minimal HTTP handler that acts as an Azure compliant server */ @@ -187,4 +217,49 @@ public class AzureBlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTes } } } + + + /** + * HTTP handler that injects random Azure service errors + * + * Note: it is not a good idea to allow this handler to simulate too many errors as it would + * slow down the test suite. + */ + @SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint") + private static class ErroneousHttpHandler implements HttpHandler { + + // first key is the remote address, second key is the HTTP request unique id provided by the SDK client, + // value is the number of times the request has been seen + private final Map requests; + private final HttpHandler delegate; + private final int maxErrorsPerRequest; + + private ErroneousHttpHandler(final HttpHandler delegate, final int maxErrorsPerRequest) { + this.requests = new ConcurrentHashMap<>(); + this.delegate = delegate; + this.maxErrorsPerRequest = maxErrorsPerRequest; + assert maxErrorsPerRequest > 1; + } + + @Override + public void handle(final HttpExchange exchange) throws IOException { + final String requestId = exchange.getRequestHeaders().getFirst(Constants.HeaderConstants.CLIENT_REQUEST_ID_HEADER); + assert Strings.hasText(requestId); + + final int count = requests.computeIfAbsent(requestId, req -> new AtomicInteger(0)).incrementAndGet(); + if (count >= maxErrorsPerRequest || randomBoolean()) { + requests.remove(requestId); + delegate.handle(exchange); + } else { + handleAsError(exchange, requestId); + } + } + + private void handleAsError(final HttpExchange exchange, final String requestId) throws IOException { + Streams.readFully(exchange.getRequestBody()); + exchange.getResponseHeaders().add(Constants.HeaderConstants.CLIENT_REQUEST_ID_HEADER, requestId); + exchange.sendResponseHeaders(HttpStatus.SC_INTERNAL_SERVER_ERROR, -1); + exchange.close(); + } + } }