Provide repository-level stats for searchable snapshots ()

Provides basic repository-level stats that will allow us to get some insight into how many
requests are actually being made by the underlying SDK. Currently only tracks GET and LIST
calls for S3 repositories. Most of the code is unfortunately boiler plate to add a new endpoint
that will help us better understand some of the low-level dynamics of searchable snapshots.
This commit is contained in:
Yannick Welsch 2020-04-14 14:12:48 +02:00
parent d5bb574e1e
commit a610513ec7
24 changed files with 837 additions and 19 deletions
docs/reference/searchable-snapshots/apis
plugins/repository-s3/src
main/java/org/elasticsearch/repositories/s3
test/java/org/elasticsearch/repositories/s3
server/src/main/java/org/elasticsearch
test/framework/src/main/java/org/elasticsearch/repositories/blobstore
x-pack/plugin

View File

@ -1,5 +1,5 @@
[role="xpack"]
[testenv="basic"]
[testenv="platinum"]
[[searchable-snapshots-api-clear-cache]]
=== Clear cache API
++++

View File

@ -1,5 +1,5 @@
[role="xpack"]
[testenv="basic"]
[testenv="platinum"]
[[searchable-snapshots-api-stats]]
=== Searchable snapshot statistics API
++++

View File

@ -1,5 +1,5 @@
[role="xpack"]
[testenv="basic"]
[testenv="platinum"]
[[searchable-snapshots-api-mount-snapshot]]
=== Mount snapshot API
++++

View File

@ -0,0 +1,73 @@
[role="xpack"]
[testenv="platinum"]
[[searchable-snapshots-repository-stats]]
=== Searchable snapshot repository statistics API
++++
<titleabbrev>Searchable snapshot repository statistics</titleabbrev>
++++
experimental[]
Retrieve usage statistics about a snapshot repository.
[[searchable-snapshots-repository-stats-request]]
==== {api-request-title}
`GET /_snapshot/<repository>/_stats`
[[searchable-snapshots-repository-stats-prereqs]]
==== {api-prereq-title}
If the {es} {security-features} are enabled, you must have the
`manage` cluster privilege and the `manage` index privilege
for any included indices to use this API.
For more information, see <<security-privileges>>.
[[searchable-snapshots-repository-stats-desc]]
==== {api-description-title}
[[searchable-snapshots-repository-stats-path-params]]
==== {api-path-parms-title}
`<repository>`::
(Required, string)
The repository for which to retrieve stats.
[[searchable-snapshots-repository-stats-example]]
==== {api-examples-title}
////
[source,console]
-----------------------------------
PUT /docs
{
"settings" : {
"index.number_of_shards" : 1,
"index.number_of_replicas" : 0
}
}
PUT /_snapshot/my_repository/my_snapshot?wait_for_completion=true
{
"include_global_state": false,
"indices": "docs"
}
DELETE /docs
POST /_snapshot/my_repository/my_snapshot/_mount?wait_for_completion=true
{
"index": "docs"
}
-----------------------------------
// TEST[setup:setup-repository]
////
Retrieves the statistics of the repository `my_repository`:
[source,console]
--------------------------------------------------
GET /_snapshot/my_repository/_stats
--------------------------------------------------
// TEST[continued]

View File

@ -1,5 +1,5 @@
[role="xpack"]
[testenv="basic"]
[testenv="platinum"]
[[searchable-snapshots-apis]]
== Searchable snapshots APIs
@ -10,7 +10,9 @@ You can use the following APIs to perform searchable snapshots operations.
* <<searchable-snapshots-api-mount-snapshot,Mount snapshot>>
* <<searchable-snapshots-api-clear-cache,Clear cache>>
* <<searchable-snapshots-api-stats,Get stats>>
* <<searchable-snapshots-repository-stats,Repository stats>>
include::mount-snapshot.asciidoc[]
include::clear-cache.asciidoc[]
include::get-stats.asciidoc[]
include::repository-stats.asciidoc[]

View File

@ -151,6 +151,7 @@ class S3BlobContainer extends AbstractBlobContainer {
final ListObjectsRequest listObjectsRequest = new ListObjectsRequest();
listObjectsRequest.setBucketName(blobStore.bucket());
listObjectsRequest.setPrefix(keyPath);
listObjectsRequest.setRequestMetricCollector(blobStore.listMetricCollector);
list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(listObjectsRequest));
}
final List<String> blobsToDelete = new ArrayList<>();
@ -307,7 +308,8 @@ class S3BlobContainer extends AbstractBlobContainer {
}
private ListObjectsRequest listObjectsRequest(String keyPath) {
return new ListObjectsRequest().withBucketName(blobStore.bucket()).withPrefix(keyPath).withDelimiter("/");
return new ListObjectsRequest().withBucketName(blobStore.bucket()).withPrefix(keyPath).withDelimiter("/")
.withRequestMetricCollector(blobStore.listMetricCollector);
}
private String buildKey(String blobName) {

View File

@ -19,8 +19,12 @@
package org.elasticsearch.repositories.s3;
import com.amazonaws.Request;
import com.amazonaws.Response;
import com.amazonaws.metrics.RequestMetricCollector;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.StorageClass;
import com.amazonaws.util.AWSRequestMetrics;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
@ -29,7 +33,10 @@ import org.elasticsearch.common.blobstore.BlobStoreException;
import org.elasticsearch.common.unit.ByteSizeValue;
import java.io.IOException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
class S3BlobStore implements BlobStore {
@ -47,6 +54,12 @@ class S3BlobStore implements BlobStore {
private final RepositoryMetadata repositoryMetadata;
private final Stats stats = new Stats();
final RequestMetricCollector getMetricCollector;
final RequestMetricCollector listMetricCollector;
S3BlobStore(S3Service service, String bucket, boolean serverSideEncryption,
ByteSizeValue bufferSize, String cannedACL, String storageClass,
RepositoryMetadata repositoryMetadata) {
@ -57,6 +70,26 @@ class S3BlobStore implements BlobStore {
this.cannedACL = initCannedACL(cannedACL);
this.storageClass = initStorageClass(storageClass);
this.repositoryMetadata = repositoryMetadata;
this.getMetricCollector = new RequestMetricCollector() {
@Override
public void collectMetrics(Request<?> request, Response<?> response) {
assert request.getHttpMethod().name().equals("GET");
final Number requestCount = request.getAWSRequestMetrics().getTimingInfo()
.getCounter(AWSRequestMetrics.Field.RequestCount.name());
assert requestCount != null;
stats.getCount.addAndGet(requestCount.longValue());
}
};
this.listMetricCollector = new RequestMetricCollector() {
@Override
public void collectMetrics(Request<?> request, Response<?> response) {
assert request.getHttpMethod().name().equals("GET");
final Number requestCount = request.getAWSRequestMetrics().getTimingInfo()
.getCounter(AWSRequestMetrics.Field.RequestCount.name());
assert requestCount != null;
stats.listCount.addAndGet(requestCount.longValue());
}
};
}
@Override
@ -94,6 +127,11 @@ class S3BlobStore implements BlobStore {
this.service.close();
}
@Override
public Map<String, Long> stats() {
return stats.toMap();
}
public CannedAccessControlList getCannedACL() {
return cannedACL;
}
@ -135,4 +173,18 @@ class S3BlobStore implements BlobStore {
throw new BlobStoreException("cannedACL is not valid: [" + cannedACL + "]");
}
static class Stats {
final AtomicLong listCount = new AtomicLong();
final AtomicLong getCount = new AtomicLong();
Map<String, Long> toMap() {
final Map<String, Long> results = new HashMap<>();
results.put("GET", getCount.get());
results.put("LIST", listCount.get());
return results;
}
}
}

View File

@ -25,9 +25,9 @@ import com.amazonaws.services.s3.model.S3Object;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.Version;
import java.io.IOException;
import java.io.InputStream;
@ -83,6 +83,7 @@ class S3RetryingInputStream extends InputStream {
private InputStream openStream() throws IOException {
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
final GetObjectRequest getObjectRequest = new GetObjectRequest(blobStore.bucket(), blobKey);
getObjectRequest.setRequestMetricCollector(blobStore.getMetricCollector);
if (currentOffset > 0 || start > 0 || end < Long.MAX_VALUE - 1) {
assert start + currentOffset <= end :
"requesting beyond end, start = " + start + " offset=" + currentOffset + " end=" + end;

View File

@ -23,7 +23,9 @@ import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import fixture.s3.S3HttpHandler;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.SuppressForbidden;
@ -31,6 +33,7 @@ import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
@ -41,11 +44,14 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.repositories.RepositoryStats;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.snapshots.mockstore.BlobStoreWrapper;
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
@ -56,8 +62,13 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.StreamSupport;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.startsWith;
@ -108,12 +119,12 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
@Override
protected Map<String, HttpHandler> createHttpHandlers() {
return Collections.singletonMap("/bucket", new S3BlobStoreHttpHandler("bucket"));
return Collections.singletonMap("/bucket", new S3StatsHttpHandler(new S3BlobStoreHttpHandler("bucket")));
}
@Override
protected HttpHandler createErroneousHttpHandler(final HttpHandler delegate) {
return new S3ErroneousHttpHandler(delegate, randomIntBetween(2, 3));
return new S3StatsHttpHandler(new S3ErroneousHttpHandler(delegate, randomIntBetween(2, 3)));
}
@Override
@ -176,6 +187,81 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
assertThat(repository.threadPool().relativeTimeInNanos() - beforeFastDelete, lessThan(TEST_COOLDOWN_PERIOD.getNanos()));
}
public void testRequestStats() throws Exception {
final String repository = createRepository(randomName());
final String index = "index-no-merges";
createIndex(index, Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build());
final long nbDocs = randomLongBetween(100, 1000);
try (BackgroundIndexer indexer = new BackgroundIndexer(index, "_doc", client(), (int) nbDocs)) {
waitForDocs(nbDocs, indexer);
}
flushAndRefresh(index);
ForceMergeResponse forceMerge = client().admin().indices().prepareForceMerge(index).setFlush(true).setMaxNumSegments(1).get();
assertThat(forceMerge.getSuccessfulShards(), equalTo(1));
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
final String snapshot = "snapshot";
assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repository, snapshot)
.setWaitForCompletion(true).setIndices(index));
assertAcked(client().admin().indices().prepareDelete(index));
assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, snapshot).setWaitForCompletion(true));
ensureGreen(index);
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
assertAcked(client().admin().cluster().prepareDeleteSnapshot(repository, snapshot).get());
final RepositoryStats repositoryStats = StreamSupport.stream(
internalCluster().getInstances(RepositoriesService.class).spliterator(), false)
.map(repositoriesService -> {
try {
return repositoriesService.repository(repository);
} catch (RepositoryMissingException e) {
return null;
}
})
.filter(r -> r != null)
.map(r -> r.stats())
.reduce((s1, s2) -> s1.merge(s2))
.get();
final long sdkGetCalls = repositoryStats.requestCounts.get("GET");
final long sdkListCalls = repositoryStats.requestCounts.get("LIST");
final long getCalls = handlers.values().stream()
.mapToLong(h -> {
while (h instanceof DelegatingHttpHandler) {
if (h instanceof S3StatsHttpHandler) {
return ((S3StatsHttpHandler) h).getCalls.get();
}
h = ((DelegatingHttpHandler) h).getDelegate();
}
assert false;
return 0L;
})
.sum();
final long listCalls = handlers.values().stream()
.mapToLong(h -> {
while (h instanceof DelegatingHttpHandler) {
if (h instanceof S3StatsHttpHandler) {
return ((S3StatsHttpHandler) h).listCalls.get();
}
h = ((DelegatingHttpHandler) h).getDelegate();
}
assert false;
return 0L;
})
.sum();
assertEquals("SDK sent " + sdkGetCalls + " GET calls and handler measured " + getCalls + " GET calls", getCalls, sdkGetCalls);
assertEquals("SDK sent " + sdkListCalls + " LIST calls and handler measured " + listCalls + " LIST calls", listCalls, sdkListCalls);
}
/**
* S3RepositoryPlugin that allows to disable chunked encoding and to set a low threshold between single upload and multipart upload.
*/
@ -266,4 +352,33 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
return exchange.getRequestHeaders().getFirst(AmazonHttpClient.HEADER_SDK_TRANSACTION_ID);
}
}
@SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
private static class S3StatsHttpHandler implements DelegatingHttpHandler {
private final HttpHandler delegate;
public final AtomicLong getCalls = new AtomicLong();
public final AtomicLong listCalls = new AtomicLong();
S3StatsHttpHandler(final HttpHandler delegate) {
this.delegate = delegate;
}
@Override
public HttpHandler getDelegate() {
return delegate;
}
@Override
public void handle(HttpExchange exchange) throws IOException {
final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString();
if (Regex.simpleMatch("GET /*/?prefix=*", request)) {
listCalls.incrementAndGet();
} else if (Regex.simpleMatch("GET /*/*", request)) {
getCalls.incrementAndGet();
}
delegate.handle(exchange);
}
}
}

View File

@ -19,6 +19,8 @@
package org.elasticsearch.common.blobstore;
import java.io.Closeable;
import java.util.Collections;
import java.util.Map;
/**
* An interface for storing blobs.
@ -29,4 +31,11 @@ public interface BlobStore extends Closeable {
* Get a blob container instance for storing blobs at the given {@link BlobPath}.
*/
BlobContainer blobContainer(BlobPath path);
/**
* Returns statistics on the count of operations that have been performed on this blob store
*/
default Map<String, Long> stats() {
return Collections.emptyMap();
}
}

View File

@ -167,6 +167,12 @@ public interface Repository extends LifecycleComponent {
*/
long getRestoreThrottleTimeInNanos();
/**
* Returns stats on the repository usage
*/
default RepositoryStats stats() {
return RepositoryStats.EMPTY_STATS;
}
/**
* Verifies repository on the master node and returns the verification token.

View File

@ -0,0 +1,58 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class RepositoryStats implements Writeable {
public static final RepositoryStats EMPTY_STATS = new RepositoryStats(Collections.emptyMap());
public final Map<String, Long> requestCounts;
public RepositoryStats(Map<String, Long> requestCounts) {
this.requestCounts = Collections.unmodifiableMap(requestCounts);
}
public RepositoryStats(StreamInput in) throws IOException {
this.requestCounts = in.readMap(StreamInput::readString, StreamInput::readLong);
}
public RepositoryStats merge(RepositoryStats otherStats) {
final Map<String, Long> result = new HashMap<>();
result.putAll(requestCounts);
for (Map.Entry<String, Long> entry : otherStats.requestCounts.entrySet()) {
result.merge(entry.getKey(), entry.getValue(), Math::addExact);
}
return new RepositoryStats(result);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(requestCounts, StreamOutput::writeString, StreamOutput::writeLong);
}
}

View File

@ -104,6 +104,7 @@ import org.elasticsearch.repositories.RepositoryCleanupResult;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryOperation;
import org.elasticsearch.repositories.RepositoryStats;
import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.snapshots.SnapshotCreationException;
import org.elasticsearch.repositories.ShardGenerations;
@ -486,6 +487,14 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
return metadata;
}
public RepositoryStats stats() {
final BlobStore store = blobStore.get();
if (store == null) {
return RepositoryStats.EMPTY_STATS;
}
return new RepositoryStats(store.stats());
}
@Override
public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, Metadata clusterMetadata) {
try {
@ -501,6 +510,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
}
@Override
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion,
ActionListener<Void> listener) {
if (isReadOnly()) {

View File

@ -43,6 +43,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -71,7 +72,7 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR
private static final byte[] BUFFER = new byte[1024];
private static HttpServer httpServer;
private Map<String, HttpHandler> handlers;
protected Map<String, HttpHandler> handlers;
private static final Logger log = LogManager.getLogger();
@ -91,8 +92,9 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR
@Before
public void setUpHttpServer() {
handlers = createHttpHandlers();
handlers.forEach((c, h) -> httpServer.createContext(c, wrap(randomBoolean() ? createErroneousHttpHandler(h) : h, logger)));
handlers = new HashMap<>(createHttpHandlers());
handlers.replaceAll((k, h) -> wrap(randomBoolean() ? createErroneousHttpHandler(h) : h, logger));
handlers.forEach(httpServer::createContext);
}
@AfterClass
@ -106,8 +108,12 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR
if (handlers != null) {
for(Map.Entry<String, HttpHandler> handler : handlers.entrySet()) {
httpServer.removeContext(handler.getKey());
if (handler.getValue() instanceof BlobStoreHttpHandler) {
List<String> blobs = ((BlobStoreHttpHandler) handler.getValue()).blobs().keySet().stream()
HttpHandler h = handler.getValue();
while (h instanceof DelegatingHttpHandler) {
h = ((DelegatingHttpHandler) h).getDelegate();
}
if (h instanceof BlobStoreHttpHandler) {
List<String> blobs = ((BlobStoreHttpHandler) h).blobs().keySet().stream()
.filter(blob -> blob.contains("index") == false).collect(Collectors.toList());
assertThat("Only index blobs should remain in repository but found " + blobs, blobs, hasSize(0));
}
@ -172,11 +178,12 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR
* slow down the test suite.
*/
@SuppressForbidden(reason = "this test uses a HttpServer to emulate a cloud-based storage service")
protected abstract static class ErroneousHttpHandler implements HttpHandler {
protected abstract static class ErroneousHttpHandler implements DelegatingHttpHandler {
// first key is a unique identifier for the incoming HTTP request,
// value is the number of times the request has been seen
private final Map<String, AtomicInteger> requests;
private final HttpHandler delegate;
private final int maxErrorsPerRequest;
@ -227,13 +234,37 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR
protected boolean canFailRequest(final HttpExchange exchange) {
return true;
}
public HttpHandler getDelegate() {
return delegate;
}
}
@SuppressForbidden(reason = "this test uses a HttpServer to emulate a cloud-based storage service")
public interface DelegatingHttpHandler extends HttpHandler {
HttpHandler getDelegate();
}
/**
* Wrap a {@link HttpHandler} to log any thrown exception using the given {@link Logger}.
*/
public static HttpHandler wrap(final HttpHandler handler, final Logger logger) {
return exchange -> {
public static DelegatingHttpHandler wrap(final HttpHandler handler, final Logger logger) {
return new ExceptionCatchingHttpHandler(handler, logger);
}
@SuppressForbidden(reason = "this test uses a HttpServer to emulate a cloud-based storage service")
private static class ExceptionCatchingHttpHandler implements DelegatingHttpHandler {
private final HttpHandler handler;
private final Logger logger;
ExceptionCatchingHttpHandler(HttpHandler handler, Logger logger) {
this.handler = handler;
this.logger = logger;
}
@Override
public void handle(HttpExchange exchange) throws IOException {
try {
handler.handle(exchange);
} catch (Throwable t) {
@ -241,6 +272,11 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR
exchange.getRemoteAddress(), exchange.getRequestMethod(), exchange.getRequestURI()), t);
throw t;
}
};
}
@Override
public HttpHandler getDelegate() {
return handler;
}
}
}

View File

@ -0,0 +1,83 @@
---
setup:
- do:
indices.create:
index: docs
body:
settings:
number_of_shards: 1
number_of_replicas: 0
- do:
bulk:
body:
- index:
_index: docs
_id: 1
- field: foo
- index:
_index: docs
_id: 2
- field: bar
- index:
_index: docs
_id: 3
- field: baz
- do:
snapshot.create_repository:
repository: repository-fs
body:
type: fs
settings:
location: "repository-fs"
# Remove the snapshot if a previous test failed to delete it.
# Useful for third party tests that runs the test against a real external service.
- do:
snapshot.delete:
repository: repository-fs
snapshot: snapshot
ignore: 404
- do:
snapshot.create:
repository: repository-fs
snapshot: snapshot
wait_for_completion: true
- do:
indices.delete:
index: docs
---
teardown:
- do:
snapshot.delete:
repository: repository-fs
snapshot: snapshot
ignore: 404
- do:
snapshot.delete_repository:
repository: repository-fs
---
"Tests repository stats":
- skip:
version: " - 7.99.99"
reason: searchable snapshots introduced in 7.8.0 (8.0.0 currently, but adapt after backport to 7.x)
- do:
snapshot.restore:
repository: repository-fs
snapshot: snapshot
wait_for_completion: true
- do:
searchable_snapshots.repository_stats:
repository: repository-fs
- is_true: _all
- is_true: nodes

View File

@ -49,13 +49,16 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction;
import org.elasticsearch.xpack.searchablesnapshots.action.ClearSearchableSnapshotsCacheAction;
import org.elasticsearch.xpack.searchablesnapshots.action.RepositoryStatsAction;
import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsAction;
import org.elasticsearch.xpack.searchablesnapshots.action.TransportClearSearchableSnapshotsCacheAction;
import org.elasticsearch.xpack.searchablesnapshots.action.TransportMountSearchableSnapshotAction;
import org.elasticsearch.xpack.searchablesnapshots.action.TransportRepositoryStatsAction;
import org.elasticsearch.xpack.searchablesnapshots.action.TransportSearchableSnapshotsStatsAction;
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
import org.elasticsearch.xpack.searchablesnapshots.rest.RestClearSearchableSnapshotsCacheAction;
import org.elasticsearch.xpack.searchablesnapshots.rest.RestMountSearchableSnapshotAction;
import org.elasticsearch.xpack.searchablesnapshots.rest.RestRepositoryStatsAction;
import org.elasticsearch.xpack.searchablesnapshots.rest.RestSearchableSnapshotsStatsAction;
import java.util.Collection;
@ -234,7 +237,8 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Rep
return org.elasticsearch.common.collect.List.of(
new ActionHandler<>(SearchableSnapshotsStatsAction.INSTANCE, TransportSearchableSnapshotsStatsAction.class),
new ActionHandler<>(ClearSearchableSnapshotsCacheAction.INSTANCE, TransportClearSearchableSnapshotsCacheAction.class),
new ActionHandler<>(MountSearchableSnapshotAction.INSTANCE, TransportMountSearchableSnapshotAction.class)
new ActionHandler<>(MountSearchableSnapshotAction.INSTANCE, TransportMountSearchableSnapshotAction.class),
new ActionHandler<>(RepositoryStatsAction.INSTANCE, TransportRepositoryStatsAction.class)
);
} else {
return org.elasticsearch.common.collect.List.of();
@ -254,7 +258,8 @@ public class SearchableSnapshots extends Plugin implements IndexStorePlugin, Rep
return org.elasticsearch.common.collect.List.of(
new RestSearchableSnapshotsStatsAction(),
new RestClearSearchableSnapshotsCacheAction(),
new RestMountSearchableSnapshotAction()
new RestMountSearchableSnapshotAction(),
new RestRepositoryStatsAction()
);
} else {
return org.elasticsearch.common.collect.List.of();

View File

@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.searchablesnapshots.action;
import org.elasticsearch.action.ActionType;
public class RepositoryStatsAction extends ActionType<RepositoryStatsResponse> {
public static final RepositoryStatsAction INSTANCE = new RepositoryStatsAction();
public static final String NAME = "cluster:admin/repository/stats";
private RepositoryStatsAction() {
super(NAME, RepositoryStatsResponse::new);
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.searchablesnapshots.action;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
public class RepositoryStatsNodeRequest extends BaseNodeRequest {
private final String repository;
public RepositoryStatsNodeRequest(String repository) {
this.repository = repository;
}
public RepositoryStatsNodeRequest(StreamInput in) throws IOException {
super(in);
this.repository = in.readString();
}
public String getRepository() {
return repository;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(repository);
}
}

View File

@ -0,0 +1,53 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.searchablesnapshots.action;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.repositories.RepositoryStats;
import java.io.IOException;
public class RepositoryStatsNodeResponse extends BaseNodeResponse implements ToXContentObject {
private final RepositoryStats repositoryStats;
public RepositoryStatsNodeResponse(StreamInput in) throws IOException {
super(in);
repositoryStats = new RepositoryStats(in);
}
public RepositoryStatsNodeResponse(DiscoveryNode node, RepositoryStats repositoryStats) {
super(node);
this.repositoryStats = repositoryStats;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
repositoryStats.writeTo(out);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (repositoryStats.requestCounts.isEmpty() == false) {
builder.field("stats", repositoryStats.requestCounts);
}
builder.endObject();
return builder;
}
public RepositoryStats getRepositoryStats() {
return repositoryStats;
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.searchablesnapshots.action;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
public class RepositoryStatsRequest extends BaseNodesRequest<RepositoryStatsRequest> {
private final String repository;
public RepositoryStatsRequest(StreamInput in) throws IOException {
super(in);
repository = in.readString();
}
public RepositoryStatsRequest(String repository, String... nodesIds) {
super(nodesIds);
this.repository = repository;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(repository);
}
public String getRepository() {
return repository;
}
}

View File

@ -0,0 +1,66 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.searchablesnapshots.action;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.repositories.RepositoryStats;
import java.io.IOException;
import java.util.List;
public class RepositoryStatsResponse extends BaseNodesResponse<RepositoryStatsNodeResponse> implements ToXContentObject {
private final RepositoryStats globalStats;
public RepositoryStatsResponse(ClusterName clusterName, List<RepositoryStatsNodeResponse> nodes, List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
globalStats = computeGlobalStats(getNodes());
}
public RepositoryStatsResponse(StreamInput in) throws IOException {
super(in);
globalStats = computeGlobalStats(getNodes());
}
private static RepositoryStats computeGlobalStats(List<RepositoryStatsNodeResponse> nodes) {
if (nodes.isEmpty()) {
return RepositoryStats.EMPTY_STATS;
} else {
return nodes.stream().map(RepositoryStatsNodeResponse::getRepositoryStats).reduce(RepositoryStats::merge).get();
}
}
@Override
protected List<RepositoryStatsNodeResponse> readNodesFrom(StreamInput in) throws IOException {
return in.readList(RepositoryStatsNodeResponse::new);
}
@Override
protected void writeNodesTo(StreamOutput out, List<RepositoryStatsNodeResponse> nodes) throws IOException {
out.writeList(nodes);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("_all", globalStats.requestCounts);
builder.startArray("nodes");
for (RepositoryStatsNodeResponse node : getNodes()) {
node.toXContent(builder, params);
}
builder.endArray();
builder.endObject();
return builder;
}
}

View File

@ -0,0 +1,88 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.searchablesnapshots.action;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryStats;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
public class TransportRepositoryStatsAction extends TransportNodesAction<
RepositoryStatsRequest,
RepositoryStatsResponse,
RepositoryStatsNodeRequest,
RepositoryStatsNodeResponse> {
private final RepositoriesService repositoriesService;
private final XPackLicenseState licenseState;
@Inject
public TransportRepositoryStatsAction(
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
RepositoriesService repositoriesService,
XPackLicenseState licenseState
) {
super(
RepositoryStatsAction.NAME,
threadPool,
clusterService,
transportService,
actionFilters,
RepositoryStatsRequest::new,
RepositoryStatsNodeRequest::new,
ThreadPool.Names.SAME,
RepositoryStatsNodeResponse.class
);
this.repositoriesService = repositoriesService;
this.licenseState = Objects.requireNonNull(licenseState);
}
@Override
protected RepositoryStatsResponse newResponse(
RepositoryStatsRequest request,
List<RepositoryStatsNodeResponse> nodes,
List<FailedNodeException> failures
) {
return new RepositoryStatsResponse(clusterService.getClusterName(), nodes, failures);
}
@Override
protected RepositoryStatsNodeRequest newNodeRequest(RepositoryStatsRequest request) {
return new RepositoryStatsNodeRequest(request.getRepository());
}
@Override
protected RepositoryStatsNodeResponse newNodeResponse(StreamInput in) throws IOException {
return new RepositoryStatsNodeResponse(in);
}
@Override
protected RepositoryStatsNodeResponse nodeOperation(RepositoryStatsNodeRequest request) {
SearchableSnapshots.ensureValidLicense(licenseState);
if (clusterService.localNode().isMasterNode() == false && clusterService.localNode().isDataNode() == false) {
return new RepositoryStatsNodeResponse(clusterService.localNode(), RepositoryStats.EMPTY_STATS);
}
final Repository repository = repositoriesService.repository(request.getRepository());
return new RepositoryStatsNodeResponse(clusterService.localNode(), repository.stats());
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.searchablesnapshots.rest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.searchablesnapshots.action.RepositoryStatsAction;
import org.elasticsearch.xpack.searchablesnapshots.action.RepositoryStatsRequest;
import java.util.Collections;
import java.util.List;
import static org.elasticsearch.rest.RestRequest.Method.GET;
public class RestRepositoryStatsAction extends BaseRestHandler {
@Override
public String getName() {
return "repository_stats_action";
}
@Override
public List<RestHandler.Route> routes() {
return Collections.singletonList(new RestHandler.Route(GET, "/_snapshot/{repository}/_stats"));
}
@Override
protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
final RepositoryStatsRequest repositoryStatsRequest = new RepositoryStatsRequest(request.param("repository"));
return channel -> client.execute(RepositoryStatsAction.INSTANCE, repositoryStatsRequest, new RestToXContentListener<>(channel));
}
}

View File

@ -0,0 +1,24 @@
{
"searchable_snapshots.repository_stats": {
"documentation": {
"url": "https://www.elastic.co/guide/en/elasticsearch/reference/master/searchable-snapshots-repository-stats.html"
},
"stability": "experimental",
"url": {
"paths": [
{
"path": "/_snapshot/{repository}/_stats",
"methods": [
"GET"
],
"parts": {
"repository": {
"type": "string",
"description": "The repository for which to get the stats for"
}
}
}
]
}
}
}