From 8ab9fc10c1a8f1c28596c633b4636441c5dd8a55 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Mon, 18 May 2020 13:39:26 +0200 Subject: [PATCH] Track multipart/resumable uploads GCS API calls (#56892) Add tracking for multipart and resumable uploads for GoogleCloudStorage. For resumable uploads only the last request is taken into account for billing, so that's the only request that's tracked. Backport of #56821 --- .../gcs/GoogleCloudStorageBlobStore.java | 10 +++++++ .../GoogleCloudStorageHttpStatsCollector.java | 6 ++-- .../GoogleCloudStorageOperationsStats.java | 22 +++++++++++---- ...eCloudStorageBlobStoreRepositoryTests.java | 28 ++++++++++++++++++- .../s3/S3BlobStoreRepositoryTests.java | 3 +- ...ESMockAPIBasedRepositoryIntegTestCase.java | 12 +++++--- 6 files changed, 67 insertions(+), 14 deletions(-) diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java index 8160422e70a..37e3deb46ad 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java @@ -293,6 +293,11 @@ class GoogleCloudStorageBlobStore implements BlobStore { } })); + // We don't track this operation on the http layer as + // we do with the GET/LIST operations since this operations + // can trigger multiple underlying http requests but only one + // operation is billed. + stats.trackPutOperation(); return; } catch (final StorageException se) { final int errorCode = se.getCode(); @@ -335,6 +340,11 @@ class GoogleCloudStorageBlobStore implements BlobStore { new Storage.BlobTargetOption[0]; SocketAccess.doPrivilegedVoidIOException( () -> client().create(blobInfo, buffer, targetOptions)); + // We don't track this operation on the http layer as + // we do with the GET/LIST operations since this operations + // can trigger multiple underlying http requests but only one + // operation is billed. + stats.trackPostOperation(); } catch (final StorageException se) { if (failIfAlreadyExists && se.getCode() == HTTP_PRECON_FAILED) { throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage()); diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageHttpStatsCollector.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageHttpStatsCollector.java index 4e5bf85ce28..8844e18c0c7 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageHttpStatsCollector.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageHttpStatsCollector.java @@ -40,15 +40,15 @@ final class GoogleCloudStorageHttpStatsCollector implements HttpResponseIntercep List.of( (bucket) -> HttpRequestTracker.get(format(Locale.ROOT, "/download/storage/v1/b/%s/o/.+", bucket), - GoogleCloudStorageOperationsStats::trackGetObjectOperation), + GoogleCloudStorageOperationsStats::trackGetOperation), (bucket) -> HttpRequestTracker.get(format(Locale.ROOT, "/storage/v1/b/%s/o/.+", bucket), - GoogleCloudStorageOperationsStats::trackGetObjectOperation), + GoogleCloudStorageOperationsStats::trackGetOperation), (bucket) -> HttpRequestTracker.get(format(Locale.ROOT, "/storage/v1/b/%s/o", bucket), - GoogleCloudStorageOperationsStats::trackListObjectsOperation) + GoogleCloudStorageOperationsStats::trackListOperation) ); private final GoogleCloudStorageOperationsStats gcsOperationStats; diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageOperationsStats.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageOperationsStats.java index dc41b6bf03f..4a0de249a3e 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageOperationsStats.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageOperationsStats.java @@ -25,8 +25,10 @@ import java.util.concurrent.atomic.AtomicLong; final class GoogleCloudStorageOperationsStats { - private final AtomicLong getObjectCount = new AtomicLong(); + private final AtomicLong getCount = new AtomicLong(); private final AtomicLong listCount = new AtomicLong(); + private final AtomicLong putCount = new AtomicLong(); + private final AtomicLong postCount = new AtomicLong(); private final String bucketName; @@ -34,11 +36,19 @@ final class GoogleCloudStorageOperationsStats { this.bucketName = bucketName; } - void trackGetObjectOperation() { - getObjectCount.incrementAndGet(); + void trackGetOperation() { + getCount.incrementAndGet(); } - void trackListObjectsOperation() { + void trackPutOperation() { + putCount.incrementAndGet(); + } + + void trackPostOperation() { + postCount.incrementAndGet(); + } + + void trackListOperation() { listCount.incrementAndGet(); } @@ -48,8 +58,10 @@ final class GoogleCloudStorageOperationsStats { Map toMap() { final Map results = new HashMap<>(); - results.put("GET", getObjectCount.get()); + results.put("GET", getCount.get()); results.put("LIST", listCount.get()); + results.put("PUT", putCount.get()); + results.put("POST", postCount.get()); return results; } } diff --git a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java index 617e8d5dfec..57c7422664d 100644 --- a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java +++ b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java @@ -22,6 +22,7 @@ package org.elasticsearch.repositories.gcs; import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.http.HttpTransportOptions; import com.google.cloud.storage.StorageOptions; +import com.sun.net.httpserver.Headers; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; import fixture.gcs.FakeOAuth2HttpHandler; @@ -61,6 +62,8 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CREDENTIALS_FILE_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.ENDPOINT_SETTING; @@ -304,17 +307,40 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe @SuppressForbidden(reason = "this tests uses a HttpServer to emulate an GCS endpoint") private static class GoogleCloudStorageStatsCollectorHttpHandler extends HttpStatsCollectorHandler { + public static final Pattern contentRangeMatcher = Pattern.compile("bytes \\d+-(\\d+)/(\\d+)"); + GoogleCloudStorageStatsCollectorHttpHandler(final HttpHandler delegate) { super(delegate); } @Override - public void maybeTrack(final String request) { + public void maybeTrack(final String request, Headers requestHeaders) { if (Regex.simpleMatch("GET /storage/v1/b/*/o*", request)) { trackRequest("LIST"); } else if (Regex.simpleMatch("GET /download/storage/v1/b/*", request)) { trackRequest("GET"); + } else if (Regex.simpleMatch("PUT /upload/storage/v1/b/*", request) && isLastPart(requestHeaders)) { + trackRequest("PUT"); + } else if (Regex.simpleMatch("POST /upload/storage/v1/b/*uploadType=multipart*", request)) { + trackRequest("POST"); } } + + boolean isLastPart(Headers requestHeaders) { + if (requestHeaders.containsKey("Content-range") == false) + return false; + + // https://cloud.google.com/storage/docs/json_api/v1/parameters#contentrange + final String contentRange = requestHeaders.getFirst("Content-range"); + + final Matcher matcher = contentRangeMatcher.matcher(contentRange); + + if (matcher.matches() == false) + return false; + + String upperBound = matcher.group(1); + String totalLength = matcher.group(2); + return Integer.parseInt(upperBound) == Integer.parseInt(totalLength) - 1; + } } } diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index 68b6c794c74..b1a704272a7 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.repositories.s3; import com.amazonaws.http.AmazonHttpClient; +import com.sun.net.httpserver.Headers; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; import fixture.s3.S3HttpHandler; @@ -279,7 +280,7 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes } @Override - public void maybeTrack(final String request) { + public void maybeTrack(final String request, Headers requestHeaders) { if (Regex.simpleMatch("GET /*/?prefix=*", request)) { trackRequest("LIST"); } else if (Regex.simpleMatch("GET /*/*", request)) { diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java index 98e006c5d92..bc216987d27 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.repositories.blobstore; +import com.sun.net.httpserver.Headers; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; import com.sun.net.httpserver.HttpServer; @@ -173,7 +174,7 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .build()); - final long nbDocs = randomLongBetween(100, 1000); + final long nbDocs = randomLongBetween(10_000L, 20_000L); try (BackgroundIndexer indexer = new BackgroundIndexer(index, "_doc", client(), (int) nbDocs)) { waitForDocs(nbDocs, indexer); } @@ -213,6 +214,8 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR assertSDKCallsMatchMockCalls(sdkRequestCounts, "GET"); assertSDKCallsMatchMockCalls(sdkRequestCounts, "LIST"); + assertSDKCallsMatchMockCalls(sdkRequestCounts, "POST"); + assertSDKCallsMatchMockCalls(sdkRequestCounts, "PUT"); } private void assertSDKCallsMatchMockCalls(Map sdkRequestCount, String requestTye) { @@ -327,7 +330,7 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR /** * HTTP handler that allows collect request stats per request type. * - * Implementors should keep track of the desired requests on {@link #maybeTrack(String)}. + * Implementors should keep track of the desired requests on {@link #maybeTrack(String, Headers)}. */ @SuppressForbidden(reason = "this test uses a HttpServer to emulate a cloud-based storage service") public abstract static class HttpStatsCollectorHandler implements DelegatingHttpHandler { @@ -357,7 +360,7 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR public void handle(HttpExchange exchange) throws IOException { final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString(); - maybeTrack(request); + maybeTrack(request, exchange.getRequestHeaders()); delegate.handle(exchange); } @@ -369,8 +372,9 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR * Request = Method SP Request-URI * * @param request the request to be tracked if it matches the criteria + * @param requestHeaders the http request headers */ - protected abstract void maybeTrack(String request); + protected abstract void maybeTrack(String request, Headers requestHeaders); } /**