diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java index de803dd8ada..7443c670bca 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java @@ -63,7 +63,7 @@ public class AzureBlobContainer extends AbstractBlobContainer { logger.trace("blobExists({})", blobName); try { return blobStore.blobExists(buildKey(blobName)); - } catch (URISyntaxException | StorageException | IOException e) { + } catch (URISyntaxException | StorageException e) { logger.warn("can not access [{}] in container {{}}: {}", blobName, blobStore, e.getMessage()); } return false; diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java index 26a45c24f18..677ebe77ce3 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java @@ -22,7 +22,9 @@ package org.elasticsearch.repositories.azure; import com.microsoft.azure.storage.AccessCondition; import com.microsoft.azure.storage.LocationMode; import com.microsoft.azure.storage.OperationContext; +import com.microsoft.azure.storage.RequestCompletedEvent; import com.microsoft.azure.storage.StorageErrorCodeStrings; +import com.microsoft.azure.storage.StorageEvent; import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.blob.BlobInputStream; import com.microsoft.azure.storage.blob.BlobListingDetails; @@ -68,6 +70,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -85,6 +88,11 @@ public class AzureBlobStore implements BlobStore { private final String container; private final LocationMode locationMode; + private final Stats stats = new Stats(); + + private final Consumer getMetricsCollector; + private final Consumer listMetricsCollector; + public AzureBlobStore(RepositoryMetadata metadata, AzureStorageService service, ThreadPool threadPool) { this.container = Repository.CONTAINER_SETTING.get(metadata.settings()); this.clientName = Repository.CLIENT_NAME.get(metadata.settings()); @@ -95,6 +103,15 @@ public class AzureBlobStore implements BlobStore { final Map prevSettings = this.service.refreshAndClearCache(emptyMap()); final Map newSettings = AzureStorageSettings.overrideLocationMode(prevSettings, this.locationMode); this.service.refreshAndClearCache(newSettings); + this.getMetricsCollector = (requestMethod) -> { + if (requestMethod.equalsIgnoreCase("HEAD")) { + stats.headOperations.incrementAndGet(); + return; + } + + stats.getOperations.incrementAndGet(); + }; + this.listMetricsCollector = (requestMethod) -> stats.listOperations.incrementAndGet(); } @Override @@ -122,7 +139,7 @@ public class AzureBlobStore implements BlobStore { public void close() { } - public boolean blobExists(String blob) throws URISyntaxException, StorageException, IOException { + public boolean blobExists(String blob) throws URISyntaxException, StorageException { // Container name must be lower case. final Tuple> client = client(); final CloudBlobContainer blobContainer = client.v1().getContainerReference(container); @@ -132,7 +149,7 @@ public class AzureBlobStore implements BlobStore { }); } - public void deleteBlob(String blob) throws URISyntaxException, StorageException, IOException { + public void deleteBlob(String blob) throws URISyntaxException, StorageException { final Tuple> client = client(); // Container name must be lower case. final CloudBlobContainer blobContainer = client.v1().getContainerReference(container); @@ -147,6 +164,7 @@ public class AzureBlobStore implements BlobStore { public DeleteResult deleteBlobDirectory(String path, Executor executor) throws URISyntaxException, StorageException, IOException { final Tuple> client = client(); + final OperationContext context = hookMetricCollector(client.v2().get(), listMetricsCollector); final CloudBlobContainer blobContainer = client.v1().getContainerReference(container); final Collection exceptions = Collections.synchronizedList(new ArrayList<>()); final AtomicLong outstanding = new AtomicLong(1L); @@ -154,7 +172,8 @@ public class AzureBlobStore implements BlobStore { final AtomicLong blobsDeleted = new AtomicLong(); final AtomicLong bytesDeleted = new AtomicLong(); SocketAccess.doPrivilegedVoidException(() -> { - for (final ListBlobItem blobItem : blobContainer.listBlobs(path, true)) { + for (final ListBlobItem blobItem : blobContainer.listBlobs(path, true, + EnumSet.noneOf(BlobListingDetails.class), null, context)) { // uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/ // this requires 1 + container.length() + 1, with each 1 corresponding to one of the / final String blobPath = blobItem.getUri().getPath().substring(1 + container.length() + 1); @@ -203,26 +222,28 @@ public class AzureBlobStore implements BlobStore { public InputStream getInputStream(String blob, long position, @Nullable Long length) throws URISyntaxException, StorageException { final Tuple> client = client(); + final OperationContext context = hookMetricCollector(client.v2().get(), getMetricsCollector); final CloudBlockBlob blockBlobReference = client.v1().getContainerReference(container).getBlockBlobReference(blob); logger.trace(() -> new ParameterizedMessage("reading container [{}], blob [{}]", container, blob)); final BlobInputStream is = SocketAccess.doPrivilegedException(() -> - blockBlobReference.openInputStream(position, length, null, null, client.v2().get())); + blockBlobReference.openInputStream(position, length, null, null, context)); return giveSocketPermissionsToStream(is); } public Map listBlobsByPrefix(String keyPath, String prefix) - throws URISyntaxException, StorageException, IOException { + throws URISyntaxException, StorageException { // NOTE: this should be here: if (prefix == null) prefix = ""; // however, this is really inefficient since deleteBlobsByPrefix enumerates everything and // then does a prefix match on the result; it should just call listBlobsByPrefix with the prefix! final Map blobsBuilder = new HashMap(); final EnumSet enumBlobListingDetails = EnumSet.of(BlobListingDetails.METADATA); final Tuple> client = client(); + final OperationContext context = hookMetricCollector(client.v2().get(), listMetricsCollector); final CloudBlobContainer blobContainer = client.v1().getContainerReference(container); logger.trace(() -> new ParameterizedMessage("listing container [{}], keyPath [{}], prefix [{}]", container, keyPath, prefix)); SocketAccess.doPrivilegedVoidException(() -> { for (final ListBlobItem blobItem : blobContainer.listBlobs(keyPath + (prefix == null ? "" : prefix), false, - enumBlobListingDetails, null, client.v2().get())) { + enumBlobListingDetails, null, context)) { final URI uri = blobItem.getUri(); logger.trace(() -> new ParameterizedMessage("blob url [{}]", uri)); // uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/ @@ -239,15 +260,16 @@ public class AzureBlobStore implements BlobStore { return MapBuilder.newMapBuilder(blobsBuilder).immutableMap(); } - public Map children(BlobPath path) throws URISyntaxException, StorageException, IOException { + public Map children(BlobPath path) throws URISyntaxException, StorageException { final Set blobsBuilder = new HashSet(); final Tuple> client = client(); + final OperationContext context = hookMetricCollector(client.v2().get(), listMetricsCollector); final CloudBlobContainer blobContainer = client.v1().getContainerReference(container); final String keyPath = path.buildAsString(); final EnumSet enumBlobListingDetails = EnumSet.of(BlobListingDetails.METADATA); SocketAccess.doPrivilegedVoidException(() -> { - for (ListBlobItem blobItem : blobContainer.listBlobs(keyPath, false, enumBlobListingDetails, null, client.v2().get())) { + for (ListBlobItem blobItem : blobContainer.listBlobs(keyPath, false, enumBlobListingDetails, null, context)) { if (blobItem instanceof CloudBlobDirectory) { final URI uri = blobItem.getUri(); logger.trace(() -> new ParameterizedMessage("blob url [{}]", uri)); @@ -291,6 +313,20 @@ public class AzureBlobStore implements BlobStore { return service.client(clientName); } + private OperationContext hookMetricCollector(OperationContext context, Consumer metricCollector) { + context.getRequestCompletedEventHandler().addListener(new StorageEvent() { + @Override + public void eventOccurred(RequestCompletedEvent eventArg) { + int statusCode = eventArg.getRequestResult().getStatusCode(); + HttpURLConnection httpURLConnection = (HttpURLConnection) eventArg.getConnectionObject(); + if (statusCode < 300) { + metricCollector.accept(httpURLConnection.getRequestMethod()); + } + } + }); + return context; + } + static InputStream giveSocketPermissionsToStream(final InputStream stream) { return new InputStream() { @Override @@ -309,4 +345,26 @@ public class AzureBlobStore implements BlobStore { } }; } + + @Override + public Map stats() { + return stats.toMap(); + } + + private static class Stats { + + private final AtomicLong getOperations = new AtomicLong(); + + private final AtomicLong listOperations = new AtomicLong(); + + private final AtomicLong headOperations = new AtomicLong(); + + private Map toMap() { + return org.elasticsearch.common.collect.Map.of( + "GET", getOperations.get(), + "LIST", listOperations.get(), + "HEAD", headOperations.get() + ); + } + } } diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java index b8e8bc6fac5..69b3e6630c8 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java @@ -22,10 +22,12 @@ import com.microsoft.azure.storage.Constants; import com.microsoft.azure.storage.RetryExponentialRetry; import com.microsoft.azure.storage.RetryPolicyFactory; import com.microsoft.azure.storage.blob.BlobRequestOptions; +import com.sun.net.httpserver.Headers; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; import fixture.azure.AzureHttpHandler; import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -40,6 +42,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.regex.Pattern; @SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint") public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase { @@ -65,7 +68,7 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg @Override protected Map createHttpHandlers() { - return Collections.singletonMap("/container", new AzureBlobStoreHttpHandler("container")); + return Collections.singletonMap("/container", new AzureHTTPStatsCollectorHandler(new AzureBlobStoreHttpHandler("container"))); } @Override @@ -75,7 +78,7 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg @Override protected List requestTypesTracked() { - return org.elasticsearch.common.collect.List.of(); + return org.elasticsearch.common.collect.List.of("GET", "LIST", "HEAD"); } @Override @@ -161,4 +164,28 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg + (range != null ? " " + range : ""); } } + + /** + * HTTP handler that keeps track of requests performed against Azure Storage. + */ + @SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint") + private static class AzureHTTPStatsCollectorHandler extends HttpStatsCollectorHandler { + + private static final Pattern listPattern = Pattern.compile("GET /[a-zA-Z0-9]+\\??.+"); + + private AzureHTTPStatsCollectorHandler(HttpHandler delegate) { + super(delegate); + } + + @Override + protected void maybeTrack(String request, Headers headers) { + if (Regex.simpleMatch("GET /*/*", request)) { + trackRequest("GET"); + } else if (Regex.simpleMatch("HEAD /*/*", request)) { + trackRequest("HEAD"); + } else if (listPattern.matcher(request).matches()) { + trackRequest("LIST"); + } + } + } }