Track GET/LIST Azure Storage API calls (#56937)
Adds tracking for the API calls performed by the Azure Storage underlying SDK. It relies on the ability to hook a request listener into the OperationContext. Backport of #56773
This commit is contained in:
parent
644ae49817
commit
9e870ec3af
|
@ -63,7 +63,7 @@ public class AzureBlobContainer extends AbstractBlobContainer {
|
|||
logger.trace("blobExists({})", blobName);
|
||||
try {
|
||||
return blobStore.blobExists(buildKey(blobName));
|
||||
} catch (URISyntaxException | StorageException | IOException e) {
|
||||
} catch (URISyntaxException | StorageException e) {
|
||||
logger.warn("can not access [{}] in container {{}}: {}", blobName, blobStore, e.getMessage());
|
||||
}
|
||||
return false;
|
||||
|
|
|
@ -22,7 +22,9 @@ package org.elasticsearch.repositories.azure;
|
|||
import com.microsoft.azure.storage.AccessCondition;
|
||||
import com.microsoft.azure.storage.LocationMode;
|
||||
import com.microsoft.azure.storage.OperationContext;
|
||||
import com.microsoft.azure.storage.RequestCompletedEvent;
|
||||
import com.microsoft.azure.storage.StorageErrorCodeStrings;
|
||||
import com.microsoft.azure.storage.StorageEvent;
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
import com.microsoft.azure.storage.blob.BlobInputStream;
|
||||
import com.microsoft.azure.storage.blob.BlobListingDetails;
|
||||
|
@ -68,6 +70,7 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
@ -85,6 +88,11 @@ public class AzureBlobStore implements BlobStore {
|
|||
private final String container;
|
||||
private final LocationMode locationMode;
|
||||
|
||||
private final Stats stats = new Stats();
|
||||
|
||||
private final Consumer<String> getMetricsCollector;
|
||||
private final Consumer<String> listMetricsCollector;
|
||||
|
||||
public AzureBlobStore(RepositoryMetadata metadata, AzureStorageService service, ThreadPool threadPool) {
|
||||
this.container = Repository.CONTAINER_SETTING.get(metadata.settings());
|
||||
this.clientName = Repository.CLIENT_NAME.get(metadata.settings());
|
||||
|
@ -95,6 +103,15 @@ public class AzureBlobStore implements BlobStore {
|
|||
final Map<String, AzureStorageSettings> prevSettings = this.service.refreshAndClearCache(emptyMap());
|
||||
final Map<String, AzureStorageSettings> newSettings = AzureStorageSettings.overrideLocationMode(prevSettings, this.locationMode);
|
||||
this.service.refreshAndClearCache(newSettings);
|
||||
this.getMetricsCollector = (requestMethod) -> {
|
||||
if (requestMethod.equalsIgnoreCase("HEAD")) {
|
||||
stats.headOperations.incrementAndGet();
|
||||
return;
|
||||
}
|
||||
|
||||
stats.getOperations.incrementAndGet();
|
||||
};
|
||||
this.listMetricsCollector = (requestMethod) -> stats.listOperations.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -122,7 +139,7 @@ public class AzureBlobStore implements BlobStore {
|
|||
public void close() {
|
||||
}
|
||||
|
||||
public boolean blobExists(String blob) throws URISyntaxException, StorageException, IOException {
|
||||
public boolean blobExists(String blob) throws URISyntaxException, StorageException {
|
||||
// Container name must be lower case.
|
||||
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client();
|
||||
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
|
||||
|
@ -132,7 +149,7 @@ public class AzureBlobStore implements BlobStore {
|
|||
});
|
||||
}
|
||||
|
||||
public void deleteBlob(String blob) throws URISyntaxException, StorageException, IOException {
|
||||
public void deleteBlob(String blob) throws URISyntaxException, StorageException {
|
||||
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client();
|
||||
// Container name must be lower case.
|
||||
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
|
||||
|
@ -147,6 +164,7 @@ public class AzureBlobStore implements BlobStore {
|
|||
public DeleteResult deleteBlobDirectory(String path, Executor executor)
|
||||
throws URISyntaxException, StorageException, IOException {
|
||||
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client();
|
||||
final OperationContext context = hookMetricCollector(client.v2().get(), listMetricsCollector);
|
||||
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
|
||||
final Collection<Exception> exceptions = Collections.synchronizedList(new ArrayList<>());
|
||||
final AtomicLong outstanding = new AtomicLong(1L);
|
||||
|
@ -154,7 +172,8 @@ public class AzureBlobStore implements BlobStore {
|
|||
final AtomicLong blobsDeleted = new AtomicLong();
|
||||
final AtomicLong bytesDeleted = new AtomicLong();
|
||||
SocketAccess.doPrivilegedVoidException(() -> {
|
||||
for (final ListBlobItem blobItem : blobContainer.listBlobs(path, true)) {
|
||||
for (final ListBlobItem blobItem : blobContainer.listBlobs(path, true,
|
||||
EnumSet.noneOf(BlobListingDetails.class), null, context)) {
|
||||
// uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
|
||||
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /
|
||||
final String blobPath = blobItem.getUri().getPath().substring(1 + container.length() + 1);
|
||||
|
@ -203,26 +222,28 @@ public class AzureBlobStore implements BlobStore {
|
|||
|
||||
public InputStream getInputStream(String blob, long position, @Nullable Long length) throws URISyntaxException, StorageException {
|
||||
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client();
|
||||
final OperationContext context = hookMetricCollector(client.v2().get(), getMetricsCollector);
|
||||
final CloudBlockBlob blockBlobReference = client.v1().getContainerReference(container).getBlockBlobReference(blob);
|
||||
logger.trace(() -> new ParameterizedMessage("reading container [{}], blob [{}]", container, blob));
|
||||
final BlobInputStream is = SocketAccess.doPrivilegedException(() ->
|
||||
blockBlobReference.openInputStream(position, length, null, null, client.v2().get()));
|
||||
blockBlobReference.openInputStream(position, length, null, null, context));
|
||||
return giveSocketPermissionsToStream(is);
|
||||
}
|
||||
|
||||
public Map<String, BlobMetadata> listBlobsByPrefix(String keyPath, String prefix)
|
||||
throws URISyntaxException, StorageException, IOException {
|
||||
throws URISyntaxException, StorageException {
|
||||
// NOTE: this should be here: if (prefix == null) prefix = "";
|
||||
// however, this is really inefficient since deleteBlobsByPrefix enumerates everything and
|
||||
// then does a prefix match on the result; it should just call listBlobsByPrefix with the prefix!
|
||||
final Map<String, BlobMetadata> blobsBuilder = new HashMap<String, BlobMetadata>();
|
||||
final EnumSet<BlobListingDetails> enumBlobListingDetails = EnumSet.of(BlobListingDetails.METADATA);
|
||||
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client();
|
||||
final OperationContext context = hookMetricCollector(client.v2().get(), listMetricsCollector);
|
||||
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
|
||||
logger.trace(() -> new ParameterizedMessage("listing container [{}], keyPath [{}], prefix [{}]", container, keyPath, prefix));
|
||||
SocketAccess.doPrivilegedVoidException(() -> {
|
||||
for (final ListBlobItem blobItem : blobContainer.listBlobs(keyPath + (prefix == null ? "" : prefix), false,
|
||||
enumBlobListingDetails, null, client.v2().get())) {
|
||||
enumBlobListingDetails, null, context)) {
|
||||
final URI uri = blobItem.getUri();
|
||||
logger.trace(() -> new ParameterizedMessage("blob url [{}]", uri));
|
||||
// uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
|
||||
|
@ -239,15 +260,16 @@ public class AzureBlobStore implements BlobStore {
|
|||
return MapBuilder.newMapBuilder(blobsBuilder).immutableMap();
|
||||
}
|
||||
|
||||
public Map<String, BlobContainer> children(BlobPath path) throws URISyntaxException, StorageException, IOException {
|
||||
public Map<String, BlobContainer> children(BlobPath path) throws URISyntaxException, StorageException {
|
||||
final Set<String> blobsBuilder = new HashSet<String>();
|
||||
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client();
|
||||
final OperationContext context = hookMetricCollector(client.v2().get(), listMetricsCollector);
|
||||
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
|
||||
final String keyPath = path.buildAsString();
|
||||
final EnumSet<BlobListingDetails> enumBlobListingDetails = EnumSet.of(BlobListingDetails.METADATA);
|
||||
|
||||
SocketAccess.doPrivilegedVoidException(() -> {
|
||||
for (ListBlobItem blobItem : blobContainer.listBlobs(keyPath, false, enumBlobListingDetails, null, client.v2().get())) {
|
||||
for (ListBlobItem blobItem : blobContainer.listBlobs(keyPath, false, enumBlobListingDetails, null, context)) {
|
||||
if (blobItem instanceof CloudBlobDirectory) {
|
||||
final URI uri = blobItem.getUri();
|
||||
logger.trace(() -> new ParameterizedMessage("blob url [{}]", uri));
|
||||
|
@ -291,6 +313,20 @@ public class AzureBlobStore implements BlobStore {
|
|||
return service.client(clientName);
|
||||
}
|
||||
|
||||
private OperationContext hookMetricCollector(OperationContext context, Consumer<String> metricCollector) {
|
||||
context.getRequestCompletedEventHandler().addListener(new StorageEvent<RequestCompletedEvent>() {
|
||||
@Override
|
||||
public void eventOccurred(RequestCompletedEvent eventArg) {
|
||||
int statusCode = eventArg.getRequestResult().getStatusCode();
|
||||
HttpURLConnection httpURLConnection = (HttpURLConnection) eventArg.getConnectionObject();
|
||||
if (statusCode < 300) {
|
||||
metricCollector.accept(httpURLConnection.getRequestMethod());
|
||||
}
|
||||
}
|
||||
});
|
||||
return context;
|
||||
}
|
||||
|
||||
static InputStream giveSocketPermissionsToStream(final InputStream stream) {
|
||||
return new InputStream() {
|
||||
@Override
|
||||
|
@ -309,4 +345,26 @@ public class AzureBlobStore implements BlobStore {
|
|||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Long> stats() {
|
||||
return stats.toMap();
|
||||
}
|
||||
|
||||
private static class Stats {
|
||||
|
||||
private final AtomicLong getOperations = new AtomicLong();
|
||||
|
||||
private final AtomicLong listOperations = new AtomicLong();
|
||||
|
||||
private final AtomicLong headOperations = new AtomicLong();
|
||||
|
||||
private Map<String, Long> toMap() {
|
||||
return org.elasticsearch.common.collect.Map.of(
|
||||
"GET", getOperations.get(),
|
||||
"LIST", listOperations.get(),
|
||||
"HEAD", headOperations.get()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,10 +22,12 @@ import com.microsoft.azure.storage.Constants;
|
|||
import com.microsoft.azure.storage.RetryExponentialRetry;
|
||||
import com.microsoft.azure.storage.RetryPolicyFactory;
|
||||
import com.microsoft.azure.storage.blob.BlobRequestOptions;
|
||||
import com.sun.net.httpserver.Headers;
|
||||
import com.sun.net.httpserver.HttpExchange;
|
||||
import com.sun.net.httpserver.HttpHandler;
|
||||
import fixture.azure.AzureHttpHandler;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.regex.Regex;
|
||||
import org.elasticsearch.common.settings.MockSecureSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
|
@ -40,6 +42,7 @@ import java.util.Collection;
|
|||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
@SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint")
|
||||
public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase {
|
||||
|
@ -65,7 +68,7 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
|
|||
|
||||
@Override
|
||||
protected Map<String, HttpHandler> createHttpHandlers() {
|
||||
return Collections.singletonMap("/container", new AzureBlobStoreHttpHandler("container"));
|
||||
return Collections.singletonMap("/container", new AzureHTTPStatsCollectorHandler(new AzureBlobStoreHttpHandler("container")));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -75,7 +78,7 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
|
|||
|
||||
@Override
|
||||
protected List<String> requestTypesTracked() {
|
||||
return org.elasticsearch.common.collect.List.of();
|
||||
return org.elasticsearch.common.collect.List.of("GET", "LIST", "HEAD");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -161,4 +164,28 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
|
|||
+ (range != null ? " " + range : "");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* HTTP handler that keeps track of requests performed against Azure Storage.
|
||||
*/
|
||||
@SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint")
|
||||
private static class AzureHTTPStatsCollectorHandler extends HttpStatsCollectorHandler {
|
||||
|
||||
private static final Pattern listPattern = Pattern.compile("GET /[a-zA-Z0-9]+\\??.+");
|
||||
|
||||
private AzureHTTPStatsCollectorHandler(HttpHandler delegate) {
|
||||
super(delegate);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void maybeTrack(String request, Headers headers) {
|
||||
if (Regex.simpleMatch("GET /*/*", request)) {
|
||||
trackRequest("GET");
|
||||
} else if (Regex.simpleMatch("HEAD /*/*", request)) {
|
||||
trackRequest("HEAD");
|
||||
} else if (listPattern.matcher(request).matches()) {
|
||||
trackRequest("LIST");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue