Track GET/LIST GoogleCloudStorage API calls (#56758)

Backporting #56585 to 7.x branch.

Adds tracking for the API calls performed by the GoogleCloudStorage
underlying SDK. It hooks an HttpResponseInterceptor to the SDK
transport layer and does http request filtering based on the URI
paths that we are interested to track. Unfortunately we cannot hook
a wrapper into the ServiceRPC interface since we're using different
levels of abstraction to implement retries during reads
(GoogleCloudStorageRetryingInputStream).
This commit is contained in:
Francisco Fernández Castaño 2020-05-14 14:03:21 +02:00 committed by GitHub
parent f0c2c25527
commit 97bf47f5b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 466 additions and 152 deletions

View File

@ -98,19 +98,26 @@ class GoogleCloudStorageBlobStore implements BlobStore {
private final String bucketName;
private final String clientName;
private final String repositoryName;
private final GoogleCloudStorageService storageService;
private final GoogleCloudStorageOperationsStats stats;
GoogleCloudStorageBlobStore(String bucketName, String clientName, GoogleCloudStorageService storageService) {
GoogleCloudStorageBlobStore(String bucketName,
String clientName,
String repositoryName,
GoogleCloudStorageService storageService) {
this.bucketName = bucketName;
this.clientName = clientName;
this.repositoryName = repositoryName;
this.storageService = storageService;
this.stats = new GoogleCloudStorageOperationsStats(bucketName);
if (doesBucketExist(bucketName) == false) {
throw new BlobStoreException("Bucket [" + bucketName + "] does not exist");
}
}
private Storage client() throws IOException {
return storageService.client(clientName);
return storageService.client(clientName, repositoryName, stats);
}
@Override
@ -119,7 +126,8 @@ class GoogleCloudStorageBlobStore implements BlobStore {
}
@Override
public void close() {
public void close() throws IOException {
storageService.closeRepositoryClient(repositoryName);
}
/**
@ -411,4 +419,9 @@ class GoogleCloudStorageBlobStore implements BlobStore {
assert s != null;
return keyPath + s;
}
@Override
public Map<String, Long> stats() {
return stats.toMap();
}
}

View File

@ -0,0 +1,130 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.gcs;
import com.google.api.client.http.GenericUrl;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpResponseInterceptor;
import org.elasticsearch.common.collect.List;
import java.util.Locale;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static java.lang.String.format;
final class GoogleCloudStorageHttpStatsCollector implements HttpResponseInterceptor {
// The specification for the current API (v1) endpoints can be found at:
// https://cloud.google.com/storage/docs/json_api/v1
private static final java.util.List<Function<String, HttpRequestTracker>> trackerFactories =
List.of(
(bucket) ->
HttpRequestTracker.get(format(Locale.ROOT, "/download/storage/v1/b/%s/o/.+", bucket),
GoogleCloudStorageOperationsStats::trackGetObjectOperation),
(bucket) ->
HttpRequestTracker.get(format(Locale.ROOT, "/storage/v1/b/%s/o/.+", bucket),
GoogleCloudStorageOperationsStats::trackGetObjectOperation),
(bucket) ->
HttpRequestTracker.get(format(Locale.ROOT, "/storage/v1/b/%s/o", bucket),
GoogleCloudStorageOperationsStats::trackListObjectsOperation)
);
private final GoogleCloudStorageOperationsStats gcsOperationStats;
private final java.util.List<HttpRequestTracker> trackers;
GoogleCloudStorageHttpStatsCollector(final GoogleCloudStorageOperationsStats gcsOperationStats) {
this.gcsOperationStats = gcsOperationStats;
this.trackers = trackerFactories.stream()
.map(trackerFactory -> trackerFactory.apply(gcsOperationStats.getTrackedBucket()))
.collect(Collectors.toList());
}
@Override
public void interceptResponse(final HttpResponse response) {
// TODO keep track of unsuccessful requests in different entries
if (!response.isSuccessStatusCode())
return;
final HttpRequest request = response.getRequest();
for (HttpRequestTracker tracker : trackers) {
if (tracker.track(request, gcsOperationStats)) {
return;
}
}
}
/**
* Http request tracker that allows to track certain HTTP requests based on the following criteria:
* <ul>
* <li>The HTTP request method</li>
* <li>An URI path regex expression</li>
* </ul>
*
* The requests that match the previous criteria are tracked using the {@code statsTracker} function.
*/
private static final class HttpRequestTracker {
private final String method;
private final Pattern pathPattern;
private final Consumer<GoogleCloudStorageOperationsStats> statsTracker;
private HttpRequestTracker(final String method,
final String pathPattern,
final Consumer<GoogleCloudStorageOperationsStats> statsTracker) {
this.method = method;
this.pathPattern = Pattern.compile(pathPattern);
this.statsTracker = statsTracker;
}
private static HttpRequestTracker get(final String pathPattern,
final Consumer<GoogleCloudStorageOperationsStats> statsConsumer) {
return new HttpRequestTracker("GET", pathPattern, statsConsumer);
}
/**
* Tracks the provided http request if it matches the criteria defined by this tracker.
*
* @param httpRequest the http request to be tracked
* @param stats the operation tracker
*
* @return {@code true} if the http request was tracked, {@code false} otherwise.
*/
private boolean track(final HttpRequest httpRequest, final GoogleCloudStorageOperationsStats stats) {
if (matchesCriteria(httpRequest) == false)
return false;
statsTracker.accept(stats);
return true;
}
private boolean matchesCriteria(final HttpRequest httpRequest) {
return method.equalsIgnoreCase(httpRequest.getRequestMethod()) &&
pathMatches(httpRequest.getUrl());
}
private boolean pathMatches(final GenericUrl url) {
return pathPattern.matcher(url.getRawPath()).matches();
}
}
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.gcs;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
final class GoogleCloudStorageOperationsStats {
private final AtomicLong getObjectCount = new AtomicLong();
private final AtomicLong listCount = new AtomicLong();
private final String bucketName;
GoogleCloudStorageOperationsStats(String bucketName) {
this.bucketName = bucketName;
}
void trackGetObjectOperation() {
getObjectCount.incrementAndGet();
}
void trackListObjectsOperation() {
listCount.incrementAndGet();
}
String getTrackedBucket() {
return bucketName;
}
Map<String, Long> toMap() {
final Map<String, Long> results = new HashMap<>();
results.put("GET", getObjectCount.get());
results.put("LIST", listCount.get());
return results;
}
}

View File

@ -91,7 +91,7 @@ class GoogleCloudStorageRepository extends BlobStoreRepository {
@Override
protected GoogleCloudStorageBlobStore createBlobStore() {
return new GoogleCloudStorageBlobStore(bucket, clientName, storageService);
return new GoogleCloudStorageBlobStore(bucket, clientName, metadata.name(), storageService);
}
@Override

View File

@ -20,9 +20,11 @@
package org.elasticsearch.repositories.gcs;
import com.google.api.client.googleapis.GoogleUtils;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.ServiceOptions;
import com.google.cloud.http.HttpTransportOptions;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
@ -33,12 +35,10 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.LazyInitializable;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.emptyMap;
@ -46,11 +46,14 @@ public class GoogleCloudStorageService {
private static final Logger logger = LogManager.getLogger(GoogleCloudStorageService.class);
private volatile Map<String, GoogleCloudStorageClientSettings> clientSettings = emptyMap();
/**
* Dictionary of client instances. Client instances are built lazily from the
* latest settings.
* latest settings. Each repository has its own client instance identified by
* the repository name.
*/
private final AtomicReference<Map<String, LazyInitializable<Storage, IOException>>> clientsCache = new AtomicReference<>(emptyMap());
private volatile Map<String, Storage> clientCache = emptyMap();
/**
* Refreshes the client settings and clears the client cache. Subsequent calls to
@ -60,16 +63,8 @@ public class GoogleCloudStorageService {
* @param clientsSettings the new settings used for building clients for subsequent requests
*/
public synchronized void refreshAndClearCache(Map<String, GoogleCloudStorageClientSettings> clientsSettings) {
// build the new lazy clients
final MapBuilder<String, LazyInitializable<Storage, IOException>> newClientsCache = MapBuilder.newMapBuilder();
for (final Map.Entry<String, GoogleCloudStorageClientSettings> entry : clientsSettings.entrySet()) {
newClientsCache.put(entry.getKey(),
new LazyInitializable<Storage, IOException>(() -> createClient(entry.getKey(), entry.getValue())));
}
// make the new clients available
final Map<String, LazyInitializable<Storage, IOException>> oldClientCache = clientsCache.getAndSet(newClientsCache.immutableMap());
// release old clients
oldClientCache.values().forEach(LazyInitializable::reset);
this.clientCache = emptyMap();
this.clientSettings = MapBuilder.newMapBuilder(clientsSettings).immutableMap();
}
/**
@ -80,29 +75,56 @@ public class GoogleCloudStorageService {
* method.
*
* @param clientName name of the client settings used to create the client
* @param repositoryName name of the repository that would use the client
* @param stats the stats collector used to gather information about the underlying SKD API calls.
* @return a cached client storage instance that can be used to manage objects
* (blobs)
*/
public Storage client(final String clientName) throws IOException {
final LazyInitializable<Storage, IOException> lazyClient = clientsCache.get().get(clientName);
if (lazyClient == null) {
throw new IllegalArgumentException("Unknown client name [" + clientName + "]. Existing client configs: "
+ Strings.collectionToDelimitedString(clientsCache.get().keySet(), ","));
public Storage client(final String clientName,
final String repositoryName,
final GoogleCloudStorageOperationsStats stats) throws IOException {
{
final Storage storage = clientCache.get(repositoryName);
if (storage != null) {
return storage;
}
return lazyClient.getOrCompute();
}
synchronized (this) {
final Storage existing = clientCache.get(repositoryName);
if (existing != null) {
return existing;
}
final GoogleCloudStorageClientSettings settings = clientSettings.get(clientName);
if (settings == null) {
throw new IllegalArgumentException("Unknown client name [" + clientName + "]. Existing client configs: "
+ Strings.collectionToDelimitedString(clientSettings.keySet(), ","));
}
logger.debug(() -> new ParameterizedMessage("creating GCS client with client_name [{}], endpoint [{}]", clientName,
settings.getHost()));
final Storage storage = createClient(settings, stats);
clientCache = MapBuilder.newMapBuilder(clientCache).put(repositoryName, storage).immutableMap();
return storage;
}
}
synchronized void closeRepositoryClient(String repositoryName) {
clientCache = MapBuilder.newMapBuilder(clientCache).remove(repositoryName).immutableMap();
}
/**
* Creates a client that can be used to manage Google Cloud Storage objects. The client is thread-safe.
*
* @param clientName name of client settings to use, including secure settings
* @param clientSettings name of client settings to use, including secure settings
* @param clientSettings client settings to use, including secure settings
* @param stats the stats collector to use by the underlying SDK
* @return a new client storage instance that can be used to manage objects
* (blobs)
*/
private Storage createClient(String clientName, GoogleCloudStorageClientSettings clientSettings) throws IOException {
logger.debug(() -> new ParameterizedMessage("creating GCS client with client_name [{}], endpoint [{}]", clientName,
clientSettings.getHost()));
private Storage createClient(GoogleCloudStorageClientSettings clientSettings,
GoogleCloudStorageOperationsStats stats) throws IOException {
final HttpTransport httpTransport = SocketAccess.doPrivilegedIOException(() -> {
final NetHttpTransport.Builder builder = new NetHttpTransport.Builder();
// requires java.lang.RuntimePermission "setFactory"
@ -110,11 +132,27 @@ public class GoogleCloudStorageService {
builder.trustCertificates(GoogleUtils.getCertificateTrustStore());
return builder.build();
});
final HttpTransportOptions httpTransportOptions = HttpTransportOptions.newBuilder()
final GoogleCloudStorageHttpStatsCollector httpStatsCollector = new GoogleCloudStorageHttpStatsCollector(stats);
final HttpTransportOptions httpTransportOptions = new HttpTransportOptions(HttpTransportOptions.newBuilder()
.setConnectTimeout(toTimeout(clientSettings.getConnectTimeout()))
.setReadTimeout(toTimeout(clientSettings.getReadTimeout()))
.setHttpTransportFactory(() -> httpTransport)
.build();
.setHttpTransportFactory(() -> httpTransport)) {
@Override
public HttpRequestInitializer getHttpRequestInitializer(ServiceOptions<?, ?> serviceOptions) {
HttpRequestInitializer requestInitializer = super.getHttpRequestInitializer(serviceOptions);
return (httpRequest) -> {
if (requestInitializer != null)
requestInitializer.initialize(httpRequest);
httpRequest.setResponseInterceptor(httpStatsCollector);
};
}
};
final StorageOptions storageOptions = createStorageOptions(clientSettings, httpTransportOptions);
return storageOptions.getService();
}
@ -170,5 +208,4 @@ public class GoogleCloudStorageService {
}
return Math.toIntExact(timeout.getMillis());
}
}

View File

@ -169,7 +169,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
}))
);
final GoogleCloudStorageBlobStore blobStore = new GoogleCloudStorageBlobStore("bucket", client, service);
final GoogleCloudStorageBlobStore blobStore = new GoogleCloudStorageBlobStore("bucket", client, "repo", service);
httpContexts.forEach(httpContext -> httpServer.removeContext(httpContext));
return new GoogleCloudStorageBlobContainer(BlobPath.cleanPath(), blobStore);

View File

@ -85,9 +85,9 @@ public class GoogleCloudStorageBlobStoreContainerTests extends ESTestCase {
when(storage.batch()).thenReturn(batch);
final GoogleCloudStorageService storageService = mock(GoogleCloudStorageService.class);
when(storageService.client(any(String.class))).thenReturn(storage);
when(storageService.client(any(String.class), any(String.class), any(GoogleCloudStorageOperationsStats.class))).thenReturn(storage);
try (BlobStore store = new GoogleCloudStorageBlobStore("bucket", "test", storageService)) {
try (BlobStore store = new GoogleCloudStorageBlobStore("bucket", "test", "repo", storageService)) {
final BlobContainer container = store.blobContainer(new BlobPath());
IOException e = expectThrows(IOException.class, () -> container.deleteBlobsIgnoringIfNotExists(blobs));

View File

@ -39,6 +39,7 @@ import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
@ -104,7 +105,7 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
@Override
protected Map<String, HttpHandler> createHttpHandlers() {
final Map<String, HttpHandler> handlers = new HashMap<>(2);
handlers.put("/", new GoogleCloudStorageBlobStoreHttpHandler("bucket"));
handlers.put("/", new GoogleCloudStorageStatsCollectorHttpHandler(new GoogleCloudStorageBlobStoreHttpHandler("bucket")));
handlers.put("/token", new FakeOAuth2HttpHandler());
return Collections.unmodifiableMap(handlers);
}
@ -239,7 +240,7 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
metadata -> new GoogleCloudStorageRepository(metadata, registry, this.storageService, clusterService) {
@Override
protected GoogleCloudStorageBlobStore createBlobStore() {
return new GoogleCloudStorageBlobStore("bucket", "test", storageService) {
return new GoogleCloudStorageBlobStore("bucket", "test", metadata.name(), storageService) {
@Override
long getLargeBlobThresholdInBytes() {
return ByteSizeUnit.MB.toBytes(1);
@ -296,4 +297,24 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
return exchange.getRequestURI().toString().startsWith("/batch/") == false;
}
}
/**
* HTTP handler that keeps track of requests performed against GCP.
*/
@SuppressForbidden(reason = "this tests uses a HttpServer to emulate an GCS endpoint")
private static class GoogleCloudStorageStatsCollectorHttpHandler extends HttpStatsCollectorHandler {
GoogleCloudStorageStatsCollectorHttpHandler(final HttpHandler delegate) {
super(delegate);
}
@Override
public void maybeTrack(final String request) {
if (Regex.simpleMatch("GET /storage/v1/b/*/o*", request)) {
trackRequest("LIST");
} else if (Regex.simpleMatch("GET /download/storage/v1/b/*", request)) {
trackRequest("GET");
}
}
}
}

View File

@ -65,11 +65,13 @@ public class GoogleCloudStorageServiceTests extends ESTestCase {
.build();
final GoogleCloudStorageService service = new GoogleCloudStorageService();
service.refreshAndClearCache(GoogleCloudStorageClientSettings.load(settings));
final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> service.client("another_client"));
GoogleCloudStorageOperationsStats statsCollector = new GoogleCloudStorageOperationsStats("bucket");
final IllegalArgumentException e =
expectThrows(IllegalArgumentException.class, () -> service.client("another_client", "repo", statsCollector));
assertThat(e.getMessage(), Matchers.startsWith("Unknown client name"));
assertSettingDeprecationsAndWarnings(
new Setting<?>[] { GoogleCloudStorageClientSettings.APPLICATION_NAME_SETTING.getConcreteSettingForNamespace(clientName) });
final Storage storage = service.client(clientName);
final Storage storage = service.client(clientName, "repo", statsCollector);
assertThat(storage.getOptions().getApplicationName(), Matchers.containsString(applicationName));
assertThat(storage.getOptions().getHost(), Matchers.is(endpoint));
assertThat(storage.getOptions().getProjectId(), Matchers.is(projectIdName));
@ -92,31 +94,53 @@ public class GoogleCloudStorageServiceTests extends ESTestCase {
final Settings settings2 = Settings.builder().setSecureSettings(secureSettings2).build();
try (GoogleCloudStoragePlugin plugin = new GoogleCloudStoragePlugin(settings1)) {
final GoogleCloudStorageService storageService = plugin.storageService;
final Storage client11 = storageService.client("gcs1");
GoogleCloudStorageOperationsStats statsCollector = new GoogleCloudStorageOperationsStats("bucket");
final Storage client11 = storageService.client("gcs1", "repo1", statsCollector);
assertThat(client11.getOptions().getProjectId(), equalTo("project_gcs11"));
final Storage client12 = storageService.client("gcs2");
final Storage client12 = storageService.client("gcs2", "repo2", statsCollector);
assertThat(client12.getOptions().getProjectId(), equalTo("project_gcs12"));
// client 3 is missing
final IllegalArgumentException e1 = expectThrows(IllegalArgumentException.class, () -> storageService.client("gcs3"));
final IllegalArgumentException e1 =
expectThrows(IllegalArgumentException.class, () -> storageService.client("gcs3", "repo3", statsCollector));
assertThat(e1.getMessage(), containsString("Unknown client name [gcs3]."));
// update client settings
plugin.reload(settings2);
// old client 1 not changed
assertThat(client11.getOptions().getProjectId(), equalTo("project_gcs11"));
// new client 1 is changed
final Storage client21 = storageService.client("gcs1");
final Storage client21 = storageService.client("gcs1", "repo1", statsCollector);
assertThat(client21.getOptions().getProjectId(), equalTo("project_gcs21"));
// old client 2 not changed
assertThat(client12.getOptions().getProjectId(), equalTo("project_gcs12"));
// new client2 is gone
final IllegalArgumentException e2 = expectThrows(IllegalArgumentException.class, () -> storageService.client("gcs2"));
final IllegalArgumentException e2 =
expectThrows(IllegalArgumentException.class, () -> storageService.client("gcs2", "repo2", statsCollector));
assertThat(e2.getMessage(), containsString("Unknown client name [gcs2]."));
// client 3 emerged
final Storage client23 = storageService.client("gcs3");
final Storage client23 = storageService.client("gcs3", "repo3", statsCollector);
assertThat(client23.getOptions().getProjectId(), equalTo("project_gcs23"));
}
}
public void testClientsAreNotSharedAcrossRepositories() throws Exception {
final MockSecureSettings secureSettings1 = new MockSecureSettings();
secureSettings1.setFile("gcs.client.gcs1.credentials_file", serviceAccountFileContent("test_project"));
final Settings settings = Settings.builder().setSecureSettings(secureSettings1).build();
try (GoogleCloudStoragePlugin plugin = new GoogleCloudStoragePlugin(settings)) {
final GoogleCloudStorageService storageService = plugin.storageService;
final Storage repo1Client =
storageService.client("gcs1", "repo1", new GoogleCloudStorageOperationsStats("bucket"));
final Storage repo2Client =
storageService.client("gcs1", "repo2", new GoogleCloudStorageOperationsStats("bucket"));
final Storage repo1ClientSecondInstance =
storageService.client("gcs1", "repo1", new GoogleCloudStorageOperationsStats("bucket"));
assertNotSame(repo1Client, repo2Client);
assertSame(repo1Client, repo1ClientSecondInstance);
}
}
private byte[] serviceAccountFileContent(String projectId) throws Exception {
final KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance("RSA");
keyPairGenerator.initialize(1024);

View File

@ -23,9 +23,7 @@ import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import fixture.s3.S3HttpHandler;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.SuppressForbidden;
@ -44,14 +42,11 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.repositories.RepositoryStats;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.snapshots.mockstore.BlobStoreWrapper;
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
@ -62,13 +57,8 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.StreamSupport;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.startsWith;
@ -119,12 +109,12 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
@Override
protected Map<String, HttpHandler> createHttpHandlers() {
return Collections.singletonMap("/bucket", new S3StatsHttpHandler(new S3BlobStoreHttpHandler("bucket")));
return Collections.singletonMap("/bucket", new S3StatsCollectorHttpHandler(new S3BlobStoreHttpHandler("bucket")));
}
@Override
protected HttpHandler createErroneousHttpHandler(final HttpHandler delegate) {
return new S3StatsHttpHandler(new S3ErroneousHttpHandler(delegate, randomIntBetween(2, 3)));
return new S3StatsCollectorHttpHandler(new S3ErroneousHttpHandler(delegate, randomIntBetween(2, 3)));
}
@Override
@ -187,81 +177,6 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
assertThat(repository.threadPool().relativeTimeInNanos() - beforeFastDelete, lessThan(TEST_COOLDOWN_PERIOD.getNanos()));
}
public void testRequestStats() throws Exception {
final String repository = createRepository(randomName());
final String index = "index-no-merges";
createIndex(index, Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build());
final long nbDocs = randomLongBetween(100, 1000);
try (BackgroundIndexer indexer = new BackgroundIndexer(index, "_doc", client(), (int) nbDocs)) {
waitForDocs(nbDocs, indexer);
}
flushAndRefresh(index);
ForceMergeResponse forceMerge = client().admin().indices().prepareForceMerge(index).setFlush(true).setMaxNumSegments(1).get();
assertThat(forceMerge.getSuccessfulShards(), equalTo(1));
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
final String snapshot = "snapshot";
assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repository, snapshot)
.setWaitForCompletion(true).setIndices(index));
assertAcked(client().admin().indices().prepareDelete(index));
assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, snapshot).setWaitForCompletion(true));
ensureGreen(index);
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
assertAcked(client().admin().cluster().prepareDeleteSnapshot(repository, snapshot).get());
final RepositoryStats repositoryStats = StreamSupport.stream(
internalCluster().getInstances(RepositoriesService.class).spliterator(), false)
.map(repositoriesService -> {
try {
return repositoriesService.repository(repository);
} catch (RepositoryMissingException e) {
return null;
}
})
.filter(r -> r != null)
.map(r -> r.stats())
.reduce((s1, s2) -> s1.merge(s2))
.get();
final long sdkGetCalls = repositoryStats.requestCounts.get("GET");
final long sdkListCalls = repositoryStats.requestCounts.get("LIST");
final long getCalls = handlers.values().stream()
.mapToLong(h -> {
while (h instanceof DelegatingHttpHandler) {
if (h instanceof S3StatsHttpHandler) {
return ((S3StatsHttpHandler) h).getCalls.get();
}
h = ((DelegatingHttpHandler) h).getDelegate();
}
assert false;
return 0L;
})
.sum();
final long listCalls = handlers.values().stream()
.mapToLong(h -> {
while (h instanceof DelegatingHttpHandler) {
if (h instanceof S3StatsHttpHandler) {
return ((S3StatsHttpHandler) h).listCalls.get();
}
h = ((DelegatingHttpHandler) h).getDelegate();
}
assert false;
return 0L;
})
.sum();
assertEquals("SDK sent " + sdkGetCalls + " GET calls and handler measured " + getCalls + " GET calls", getCalls, sdkGetCalls);
assertEquals("SDK sent " + sdkListCalls + " LIST calls and handler measured " + listCalls + " LIST calls", listCalls, sdkListCalls);
}
/**
* S3RepositoryPlugin that allows to disable chunked encoding and to set a low threshold between single upload and multipart upload.
*/
@ -353,32 +268,23 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
}
}
/**
* HTTP handler that tracks the number of requests performed against S3.
*/
@SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
private static class S3StatsHttpHandler implements DelegatingHttpHandler {
private static class S3StatsCollectorHttpHandler extends HttpStatsCollectorHandler {
private final HttpHandler delegate;
public final AtomicLong getCalls = new AtomicLong();
public final AtomicLong listCalls = new AtomicLong();
S3StatsHttpHandler(final HttpHandler delegate) {
this.delegate = delegate;
S3StatsCollectorHttpHandler(final HttpHandler delegate) {
super(delegate);
}
@Override
public HttpHandler getDelegate() {
return delegate;
}
@Override
public void handle(HttpExchange exchange) throws IOException {
final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString();
public void maybeTrack(final String request) {
if (Regex.simpleMatch("GET /*/?prefix=*", request)) {
listCalls.incrementAndGet();
trackRequest("LIST");
} else if (Regex.simpleMatch("GET /*/*", request)) {
getCalls.incrementAndGet();
}
delegate.handle(exchange);
trackRequest("GET");
}
}
}
}

View File

@ -33,6 +33,10 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.mocksocket.MockHttpServer;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.repositories.RepositoryStats;
import org.elasticsearch.test.BackgroundIndexer;
import org.junit.After;
import org.junit.AfterClass;
@ -46,9 +50,11 @@ import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
@ -159,6 +165,79 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR
assertAcked(client().admin().cluster().prepareDeleteSnapshot(repository, snapshot).get());
}
public void testRequestStats() throws Exception {
final String repository = createRepository(randomName());
final String index = "index-no-merges";
createIndex(index, Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build());
final long nbDocs = randomLongBetween(100, 1000);
try (BackgroundIndexer indexer = new BackgroundIndexer(index, "_doc", client(), (int) nbDocs)) {
waitForDocs(nbDocs, indexer);
}
flushAndRefresh(index);
ForceMergeResponse forceMerge = client().admin().indices().prepareForceMerge(index).setFlush(true).setMaxNumSegments(1).get();
assertThat(forceMerge.getSuccessfulShards(), equalTo(1));
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
final String snapshot = "snapshot";
assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repository, snapshot)
.setWaitForCompletion(true).setIndices(index));
assertAcked(client().admin().indices().prepareDelete(index));
assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, snapshot).setWaitForCompletion(true));
ensureGreen(index);
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
assertAcked(client().admin().cluster().prepareDeleteSnapshot(repository, snapshot).get());
final RepositoryStats repositoryStats = StreamSupport.stream(
internalCluster().getInstances(RepositoriesService.class).spliterator(), false)
.map(repositoriesService -> {
try {
return repositoriesService.repository(repository);
} catch (RepositoryMissingException e) {
return null;
}
})
.filter(Objects::nonNull)
.map(Repository::stats)
.reduce(RepositoryStats::merge)
.get();
Map<String, Long> sdkRequestCounts = repositoryStats.requestCounts;
assertSDKCallsMatchMockCalls(sdkRequestCounts, "GET");
assertSDKCallsMatchMockCalls(sdkRequestCounts, "LIST");
}
private void assertSDKCallsMatchMockCalls(Map<String, Long> sdkRequestCount, String requestTye) {
final long sdkCalls = sdkRequestCount.getOrDefault(requestTye, 0L);
final long mockCalls = handlers.values().stream()
.mapToLong(h -> {
while (h instanceof DelegatingHttpHandler) {
if (h instanceof HttpStatsCollectorHandler) {
return ((HttpStatsCollectorHandler) h).getCount(requestTye);
}
h = ((DelegatingHttpHandler) h).getDelegate();
}
return 0L;
}).sum();
String assertionErrorMsg = String.format("SDK sent %d [%s] calls and handler measured %d [%s] calls",
sdkCalls,
requestTye,
mockCalls,
requestTye);
assertEquals(assertionErrorMsg, mockCalls, sdkCalls);
}
protected static String httpServerUrl() {
InetSocketAddress address = httpServer.getAddress();
return "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort();
@ -245,6 +324,55 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR
HttpHandler getDelegate();
}
/**
* HTTP handler that allows collect request stats per request type.
*
* Implementors should keep track of the desired requests on {@link #maybeTrack(String)}.
*/
@SuppressForbidden(reason = "this test uses a HttpServer to emulate a cloud-based storage service")
public abstract static class HttpStatsCollectorHandler implements DelegatingHttpHandler {
private final HttpHandler delegate;
private final Map<String, Long> operationCount = new HashMap<>();
public HttpStatsCollectorHandler(HttpHandler delegate) {
this.delegate = delegate;
}
@Override
public HttpHandler getDelegate() {
return delegate;
}
synchronized long getCount(final String requestType) {
return operationCount.getOrDefault(requestType, 0L);
}
protected synchronized void trackRequest(final String requestType) {
operationCount.put(requestType, operationCount.getOrDefault(requestType, 0L) + 1);
}
@Override
public void handle(HttpExchange exchange) throws IOException {
final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString();
maybeTrack(request);
delegate.handle(exchange);
}
/**
* Tracks the given request if it matches the criteria.
*
* The request is represented as:
* Request = Method SP Request-URI
*
* @param request the request to be tracked if it matches the criteria
*/
protected abstract void maybeTrack(String request);
}
/**
* Wrap a {@link HttpHandler} to log any thrown exception using the given {@link Logger}.
*/