Track multipart/resumable uploads GCS API calls (#56892)
Add tracking for multipart and resumable uploads for GoogleCloudStorage. For resumable uploads only the last request is taken into account for billing, so that's the only request that's tracked. Backport of #56821
This commit is contained in:
parent
52a329fa12
commit
8ab9fc10c1
|
@ -293,6 +293,11 @@ class GoogleCloudStorageBlobStore implements BlobStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
}));
|
}));
|
||||||
|
// We don't track this operation on the http layer as
|
||||||
|
// we do with the GET/LIST operations since this operations
|
||||||
|
// can trigger multiple underlying http requests but only one
|
||||||
|
// operation is billed.
|
||||||
|
stats.trackPutOperation();
|
||||||
return;
|
return;
|
||||||
} catch (final StorageException se) {
|
} catch (final StorageException se) {
|
||||||
final int errorCode = se.getCode();
|
final int errorCode = se.getCode();
|
||||||
|
@ -335,6 +340,11 @@ class GoogleCloudStorageBlobStore implements BlobStore {
|
||||||
new Storage.BlobTargetOption[0];
|
new Storage.BlobTargetOption[0];
|
||||||
SocketAccess.doPrivilegedVoidIOException(
|
SocketAccess.doPrivilegedVoidIOException(
|
||||||
() -> client().create(blobInfo, buffer, targetOptions));
|
() -> client().create(blobInfo, buffer, targetOptions));
|
||||||
|
// We don't track this operation on the http layer as
|
||||||
|
// we do with the GET/LIST operations since this operations
|
||||||
|
// can trigger multiple underlying http requests but only one
|
||||||
|
// operation is billed.
|
||||||
|
stats.trackPostOperation();
|
||||||
} catch (final StorageException se) {
|
} catch (final StorageException se) {
|
||||||
if (failIfAlreadyExists && se.getCode() == HTTP_PRECON_FAILED) {
|
if (failIfAlreadyExists && se.getCode() == HTTP_PRECON_FAILED) {
|
||||||
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
|
throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
|
||||||
|
|
|
@ -40,15 +40,15 @@ final class GoogleCloudStorageHttpStatsCollector implements HttpResponseIntercep
|
||||||
List.of(
|
List.of(
|
||||||
(bucket) ->
|
(bucket) ->
|
||||||
HttpRequestTracker.get(format(Locale.ROOT, "/download/storage/v1/b/%s/o/.+", bucket),
|
HttpRequestTracker.get(format(Locale.ROOT, "/download/storage/v1/b/%s/o/.+", bucket),
|
||||||
GoogleCloudStorageOperationsStats::trackGetObjectOperation),
|
GoogleCloudStorageOperationsStats::trackGetOperation),
|
||||||
|
|
||||||
(bucket) ->
|
(bucket) ->
|
||||||
HttpRequestTracker.get(format(Locale.ROOT, "/storage/v1/b/%s/o/.+", bucket),
|
HttpRequestTracker.get(format(Locale.ROOT, "/storage/v1/b/%s/o/.+", bucket),
|
||||||
GoogleCloudStorageOperationsStats::trackGetObjectOperation),
|
GoogleCloudStorageOperationsStats::trackGetOperation),
|
||||||
|
|
||||||
(bucket) ->
|
(bucket) ->
|
||||||
HttpRequestTracker.get(format(Locale.ROOT, "/storage/v1/b/%s/o", bucket),
|
HttpRequestTracker.get(format(Locale.ROOT, "/storage/v1/b/%s/o", bucket),
|
||||||
GoogleCloudStorageOperationsStats::trackListObjectsOperation)
|
GoogleCloudStorageOperationsStats::trackListOperation)
|
||||||
);
|
);
|
||||||
|
|
||||||
private final GoogleCloudStorageOperationsStats gcsOperationStats;
|
private final GoogleCloudStorageOperationsStats gcsOperationStats;
|
||||||
|
|
|
@ -25,8 +25,10 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
final class GoogleCloudStorageOperationsStats {
|
final class GoogleCloudStorageOperationsStats {
|
||||||
|
|
||||||
private final AtomicLong getObjectCount = new AtomicLong();
|
private final AtomicLong getCount = new AtomicLong();
|
||||||
private final AtomicLong listCount = new AtomicLong();
|
private final AtomicLong listCount = new AtomicLong();
|
||||||
|
private final AtomicLong putCount = new AtomicLong();
|
||||||
|
private final AtomicLong postCount = new AtomicLong();
|
||||||
|
|
||||||
private final String bucketName;
|
private final String bucketName;
|
||||||
|
|
||||||
|
@ -34,11 +36,19 @@ final class GoogleCloudStorageOperationsStats {
|
||||||
this.bucketName = bucketName;
|
this.bucketName = bucketName;
|
||||||
}
|
}
|
||||||
|
|
||||||
void trackGetObjectOperation() {
|
void trackGetOperation() {
|
||||||
getObjectCount.incrementAndGet();
|
getCount.incrementAndGet();
|
||||||
}
|
}
|
||||||
|
|
||||||
void trackListObjectsOperation() {
|
void trackPutOperation() {
|
||||||
|
putCount.incrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
void trackPostOperation() {
|
||||||
|
postCount.incrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
void trackListOperation() {
|
||||||
listCount.incrementAndGet();
|
listCount.incrementAndGet();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -48,8 +58,10 @@ final class GoogleCloudStorageOperationsStats {
|
||||||
|
|
||||||
Map<String, Long> toMap() {
|
Map<String, Long> toMap() {
|
||||||
final Map<String, Long> results = new HashMap<>();
|
final Map<String, Long> results = new HashMap<>();
|
||||||
results.put("GET", getObjectCount.get());
|
results.put("GET", getCount.get());
|
||||||
results.put("LIST", listCount.get());
|
results.put("LIST", listCount.get());
|
||||||
|
results.put("PUT", putCount.get());
|
||||||
|
results.put("POST", postCount.get());
|
||||||
return results;
|
return results;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.repositories.gcs;
|
||||||
import com.google.api.gax.retrying.RetrySettings;
|
import com.google.api.gax.retrying.RetrySettings;
|
||||||
import com.google.cloud.http.HttpTransportOptions;
|
import com.google.cloud.http.HttpTransportOptions;
|
||||||
import com.google.cloud.storage.StorageOptions;
|
import com.google.cloud.storage.StorageOptions;
|
||||||
|
import com.sun.net.httpserver.Headers;
|
||||||
import com.sun.net.httpserver.HttpExchange;
|
import com.sun.net.httpserver.HttpExchange;
|
||||||
import com.sun.net.httpserver.HttpHandler;
|
import com.sun.net.httpserver.HttpHandler;
|
||||||
import fixture.gcs.FakeOAuth2HttpHandler;
|
import fixture.gcs.FakeOAuth2HttpHandler;
|
||||||
|
@ -61,6 +62,8 @@ import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.regex.Matcher;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING;
|
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING;
|
||||||
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.ENDPOINT_SETTING;
|
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.ENDPOINT_SETTING;
|
||||||
|
@ -304,17 +307,40 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
|
||||||
@SuppressForbidden(reason = "this tests uses a HttpServer to emulate an GCS endpoint")
|
@SuppressForbidden(reason = "this tests uses a HttpServer to emulate an GCS endpoint")
|
||||||
private static class GoogleCloudStorageStatsCollectorHttpHandler extends HttpStatsCollectorHandler {
|
private static class GoogleCloudStorageStatsCollectorHttpHandler extends HttpStatsCollectorHandler {
|
||||||
|
|
||||||
|
public static final Pattern contentRangeMatcher = Pattern.compile("bytes \\d+-(\\d+)/(\\d+)");
|
||||||
|
|
||||||
GoogleCloudStorageStatsCollectorHttpHandler(final HttpHandler delegate) {
|
GoogleCloudStorageStatsCollectorHttpHandler(final HttpHandler delegate) {
|
||||||
super(delegate);
|
super(delegate);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void maybeTrack(final String request) {
|
public void maybeTrack(final String request, Headers requestHeaders) {
|
||||||
if (Regex.simpleMatch("GET /storage/v1/b/*/o*", request)) {
|
if (Regex.simpleMatch("GET /storage/v1/b/*/o*", request)) {
|
||||||
trackRequest("LIST");
|
trackRequest("LIST");
|
||||||
} else if (Regex.simpleMatch("GET /download/storage/v1/b/*", request)) {
|
} else if (Regex.simpleMatch("GET /download/storage/v1/b/*", request)) {
|
||||||
trackRequest("GET");
|
trackRequest("GET");
|
||||||
}
|
} else if (Regex.simpleMatch("PUT /upload/storage/v1/b/*", request) && isLastPart(requestHeaders)) {
|
||||||
|
trackRequest("PUT");
|
||||||
|
} else if (Regex.simpleMatch("POST /upload/storage/v1/b/*uploadType=multipart*", request)) {
|
||||||
|
trackRequest("POST");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean isLastPart(Headers requestHeaders) {
|
||||||
|
if (requestHeaders.containsKey("Content-range") == false)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
// https://cloud.google.com/storage/docs/json_api/v1/parameters#contentrange
|
||||||
|
final String contentRange = requestHeaders.getFirst("Content-range");
|
||||||
|
|
||||||
|
final Matcher matcher = contentRangeMatcher.matcher(contentRange);
|
||||||
|
|
||||||
|
if (matcher.matches() == false)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
String upperBound = matcher.group(1);
|
||||||
|
String totalLength = matcher.group(2);
|
||||||
|
return Integer.parseInt(upperBound) == Integer.parseInt(totalLength) - 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
package org.elasticsearch.repositories.s3;
|
package org.elasticsearch.repositories.s3;
|
||||||
|
|
||||||
import com.amazonaws.http.AmazonHttpClient;
|
import com.amazonaws.http.AmazonHttpClient;
|
||||||
|
import com.sun.net.httpserver.Headers;
|
||||||
import com.sun.net.httpserver.HttpExchange;
|
import com.sun.net.httpserver.HttpExchange;
|
||||||
import com.sun.net.httpserver.HttpHandler;
|
import com.sun.net.httpserver.HttpHandler;
|
||||||
import fixture.s3.S3HttpHandler;
|
import fixture.s3.S3HttpHandler;
|
||||||
|
@ -279,7 +280,7 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void maybeTrack(final String request) {
|
public void maybeTrack(final String request, Headers requestHeaders) {
|
||||||
if (Regex.simpleMatch("GET /*/?prefix=*", request)) {
|
if (Regex.simpleMatch("GET /*/?prefix=*", request)) {
|
||||||
trackRequest("LIST");
|
trackRequest("LIST");
|
||||||
} else if (Regex.simpleMatch("GET /*/*", request)) {
|
} else if (Regex.simpleMatch("GET /*/*", request)) {
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.repositories.blobstore;
|
package org.elasticsearch.repositories.blobstore;
|
||||||
|
|
||||||
|
import com.sun.net.httpserver.Headers;
|
||||||
import com.sun.net.httpserver.HttpExchange;
|
import com.sun.net.httpserver.HttpExchange;
|
||||||
import com.sun.net.httpserver.HttpHandler;
|
import com.sun.net.httpserver.HttpHandler;
|
||||||
import com.sun.net.httpserver.HttpServer;
|
import com.sun.net.httpserver.HttpServer;
|
||||||
|
@ -173,7 +174,7 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR
|
||||||
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
|
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||||
.build());
|
.build());
|
||||||
|
|
||||||
final long nbDocs = randomLongBetween(100, 1000);
|
final long nbDocs = randomLongBetween(10_000L, 20_000L);
|
||||||
try (BackgroundIndexer indexer = new BackgroundIndexer(index, "_doc", client(), (int) nbDocs)) {
|
try (BackgroundIndexer indexer = new BackgroundIndexer(index, "_doc", client(), (int) nbDocs)) {
|
||||||
waitForDocs(nbDocs, indexer);
|
waitForDocs(nbDocs, indexer);
|
||||||
}
|
}
|
||||||
|
@ -213,6 +214,8 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR
|
||||||
|
|
||||||
assertSDKCallsMatchMockCalls(sdkRequestCounts, "GET");
|
assertSDKCallsMatchMockCalls(sdkRequestCounts, "GET");
|
||||||
assertSDKCallsMatchMockCalls(sdkRequestCounts, "LIST");
|
assertSDKCallsMatchMockCalls(sdkRequestCounts, "LIST");
|
||||||
|
assertSDKCallsMatchMockCalls(sdkRequestCounts, "POST");
|
||||||
|
assertSDKCallsMatchMockCalls(sdkRequestCounts, "PUT");
|
||||||
}
|
}
|
||||||
|
|
||||||
private void assertSDKCallsMatchMockCalls(Map<String, Long> sdkRequestCount, String requestTye) {
|
private void assertSDKCallsMatchMockCalls(Map<String, Long> sdkRequestCount, String requestTye) {
|
||||||
|
@ -327,7 +330,7 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR
|
||||||
/**
|
/**
|
||||||
* HTTP handler that allows collect request stats per request type.
|
* HTTP handler that allows collect request stats per request type.
|
||||||
*
|
*
|
||||||
* Implementors should keep track of the desired requests on {@link #maybeTrack(String)}.
|
* Implementors should keep track of the desired requests on {@link #maybeTrack(String, Headers)}.
|
||||||
*/
|
*/
|
||||||
@SuppressForbidden(reason = "this test uses a HttpServer to emulate a cloud-based storage service")
|
@SuppressForbidden(reason = "this test uses a HttpServer to emulate a cloud-based storage service")
|
||||||
public abstract static class HttpStatsCollectorHandler implements DelegatingHttpHandler {
|
public abstract static class HttpStatsCollectorHandler implements DelegatingHttpHandler {
|
||||||
|
@ -357,7 +360,7 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR
|
||||||
public void handle(HttpExchange exchange) throws IOException {
|
public void handle(HttpExchange exchange) throws IOException {
|
||||||
final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString();
|
final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString();
|
||||||
|
|
||||||
maybeTrack(request);
|
maybeTrack(request, exchange.getRequestHeaders());
|
||||||
|
|
||||||
delegate.handle(exchange);
|
delegate.handle(exchange);
|
||||||
}
|
}
|
||||||
|
@ -369,8 +372,9 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR
|
||||||
* Request = Method SP Request-URI
|
* Request = Method SP Request-URI
|
||||||
*
|
*
|
||||||
* @param request the request to be tracked if it matches the criteria
|
* @param request the request to be tracked if it matches the criteria
|
||||||
|
* @param requestHeaders the http request headers
|
||||||
*/
|
*/
|
||||||
protected abstract void maybeTrack(String request);
|
protected abstract void maybeTrack(String request, Headers requestHeaders);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue