Track PUT/PUT_BLOCK operations on AzureBlobStore. (#57121)
Backport of #56936
This commit is contained in:
parent
c5f61fe24c
commit
42a15c9b80
|
@ -90,8 +90,9 @@ public class AzureBlobStore implements BlobStore {
|
|||
|
||||
private final Stats stats = new Stats();
|
||||
|
||||
private final Consumer<String> getMetricsCollector;
|
||||
private final Consumer<String> listMetricsCollector;
|
||||
private final Consumer<HttpURLConnection> getMetricsCollector;
|
||||
private final Consumer<HttpURLConnection> listMetricsCollector;
|
||||
private final Consumer<HttpURLConnection> uploadMetricsCollector;
|
||||
|
||||
public AzureBlobStore(RepositoryMetadata metadata, AzureStorageService service, ThreadPool threadPool) {
|
||||
this.container = Repository.CONTAINER_SETTING.get(metadata.settings());
|
||||
|
@ -103,15 +104,35 @@ public class AzureBlobStore implements BlobStore {
|
|||
final Map<String, AzureStorageSettings> prevSettings = this.service.refreshAndClearCache(emptyMap());
|
||||
final Map<String, AzureStorageSettings> newSettings = AzureStorageSettings.overrideLocationMode(prevSettings, this.locationMode);
|
||||
this.service.refreshAndClearCache(newSettings);
|
||||
this.getMetricsCollector = (requestMethod) -> {
|
||||
if (requestMethod.equalsIgnoreCase("HEAD")) {
|
||||
this.getMetricsCollector = (httpURLConnection) -> {
|
||||
if (httpURLConnection.getRequestMethod().equals("HEAD")) {
|
||||
stats.headOperations.incrementAndGet();
|
||||
return;
|
||||
}
|
||||
assert httpURLConnection.getRequestMethod().equals("GET");
|
||||
|
||||
stats.getOperations.incrementAndGet();
|
||||
};
|
||||
this.listMetricsCollector = (requestMethod) -> stats.listOperations.incrementAndGet();
|
||||
this.listMetricsCollector = (httpURLConnection) -> {
|
||||
assert httpURLConnection.getRequestMethod().equals("GET");
|
||||
stats.listOperations.incrementAndGet();
|
||||
};
|
||||
this.uploadMetricsCollector = (httpURLConnection -> {
|
||||
assert httpURLConnection.getRequestMethod().equals("PUT");
|
||||
String queryParams = httpURLConnection.getURL().getQuery();
|
||||
if (queryParams != null && isBlockUpload(queryParams)) {
|
||||
stats.putBlockOperations.incrementAndGet();
|
||||
} else {
|
||||
stats.putOperations.incrementAndGet();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private boolean isBlockUpload(String queryParams) {
|
||||
// https://docs.microsoft.com/en-us/rest/api/storageservices/put-block
|
||||
// https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list
|
||||
return (queryParams.contains("comp=block") && queryParams.contains("blockid="))
|
||||
|| queryParams.contains("comp=blocklist");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -292,13 +313,14 @@ public class AzureBlobStore implements BlobStore {
|
|||
: "Should not be used with non-mark supporting streams as their retry handling in the SDK is broken";
|
||||
logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {})", blobName, blobSize));
|
||||
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client();
|
||||
final OperationContext operationContext = hookMetricCollector(client().v2().get(), uploadMetricsCollector);
|
||||
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
|
||||
final CloudBlockBlob blob = blobContainer.getBlockBlobReference(blobName);
|
||||
try {
|
||||
final AccessCondition accessCondition =
|
||||
failIfAlreadyExists ? AccessCondition.generateIfNotExistsCondition() : AccessCondition.generateEmptyCondition();
|
||||
SocketAccess.doPrivilegedVoidException(() ->
|
||||
blob.upload(inputStream, blobSize, accessCondition, service.getBlobRequestOptionsForWriteBlob(), client.v2().get()));
|
||||
blob.upload(inputStream, blobSize, accessCondition, service.getBlobRequestOptionsForWriteBlob(), operationContext));
|
||||
} catch (final StorageException se) {
|
||||
if (failIfAlreadyExists && se.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT &&
|
||||
StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(se.getErrorCode())) {
|
||||
|
@ -313,14 +335,13 @@ public class AzureBlobStore implements BlobStore {
|
|||
return service.client(clientName);
|
||||
}
|
||||
|
||||
private OperationContext hookMetricCollector(OperationContext context, Consumer<String> metricCollector) {
|
||||
private OperationContext hookMetricCollector(OperationContext context, Consumer<HttpURLConnection> metricCollector) {
|
||||
context.getRequestCompletedEventHandler().addListener(new StorageEvent<RequestCompletedEvent>() {
|
||||
@Override
|
||||
public void eventOccurred(RequestCompletedEvent eventArg) {
|
||||
int statusCode = eventArg.getRequestResult().getStatusCode();
|
||||
HttpURLConnection httpURLConnection = (HttpURLConnection) eventArg.getConnectionObject();
|
||||
if (statusCode < 300) {
|
||||
metricCollector.accept(httpURLConnection.getRequestMethod());
|
||||
metricCollector.accept((HttpURLConnection) eventArg.getConnectionObject());
|
||||
}
|
||||
}
|
||||
});
|
||||
|
@ -359,11 +380,17 @@ public class AzureBlobStore implements BlobStore {
|
|||
|
||||
private final AtomicLong headOperations = new AtomicLong();
|
||||
|
||||
private final AtomicLong putOperations = new AtomicLong();
|
||||
|
||||
private final AtomicLong putBlockOperations = new AtomicLong();
|
||||
|
||||
private Map<String, Long> toMap() {
|
||||
return org.elasticsearch.common.collect.Map.of(
|
||||
"GET", getOperations.get(),
|
||||
"LIST", listOperations.get(),
|
||||
"HEAD", headOperations.get()
|
||||
"HEAD", headOperations.get(),
|
||||
"PUT", putOperations.get(),
|
||||
"PUT_BLOCK", putBlockOperations.get()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -78,7 +78,7 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
|
|||
|
||||
@Override
|
||||
protected List<String> requestTypesTracked() {
|
||||
return org.elasticsearch.common.collect.List.of("GET", "LIST", "HEAD");
|
||||
return org.elasticsearch.common.collect.List.of("GET", "LIST", "HEAD", "PUT", "PUT_BLOCK");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -185,7 +185,18 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
|
|||
trackRequest("HEAD");
|
||||
} else if (listPattern.matcher(request).matches()) {
|
||||
trackRequest("LIST");
|
||||
} else if (isBlockUpload(request)) {
|
||||
trackRequest("PUT_BLOCK");
|
||||
} else if (Regex.simpleMatch("PUT /*/*", request)) {
|
||||
trackRequest("PUT");
|
||||
}
|
||||
}
|
||||
|
||||
// https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list
|
||||
// https://docs.microsoft.com/en-us/rest/api/storageservices/put-block
|
||||
private boolean isBlockUpload(String request) {
|
||||
return Regex.simpleMatch("PUT /*/*?*comp=blocklist*", request)
|
||||
|| (Regex.simpleMatch("PUT /*/*?*comp=block*", request) && request.contains("blockid="));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue