Add repositories metering API (#62088)

This pull request adds a new set of APIs that allows tracking the number of requests performed
by each registered repository.

To avoid losing data, repository statistics are archived after a repository is closed and retained for
a configurable period, controlled by the `repositories.stats.archive.retention_period` setting. The API exposes
statistics for active repositories as well as for repositories that have been modified or closed.

Backport of #60371
This commit is contained in:
Francisco Fernández Castaño 2020-09-08 14:01:04 +02:00 committed by GitHub
parent fb6ee5b36d
commit 2bb5716b3d
49 changed files with 2712 additions and 141 deletions


@ -0,0 +1,35 @@
[role="xpack"]
[testenv="basic"]
[[clear-repositories-metering-archive-api]]
=== Clear repositories metering archive
++++
<titleabbrev>Clear repositories metering archive</titleabbrev>
++++
Removes the archived repositories metering information present in the cluster.
[[clear-repositories-metering-archive-api-request]]
==== {api-request-title}
`DELETE /_nodes/<node_id>/_repositories_metering/<max_version_to_clear>`
[[clear-repositories-metering-archive-api-desc]]
==== {api-description-title}
You can use this API to clear the archived repositories metering information in the cluster.
[[clear-repositories-metering-archive-api-path-params]]
==== {api-path-parms-title}
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=node-id]
`<max_version_to_clear>`::
(long) Specifies the maximum <<get-repositories-metering-api-response-body, archive_version>> to be cleared from the archive.
All node selection options are explained <<cluster-nodes,here>>.
[role="child_attributes"]
[[clear-repositories-metering-archive-api-response-body]]
==== {api-response-body-title}
Returns the deleted archived repositories metering information.
include::{es-repo-dir}/repositories-metering-api/apis/repositories-meterings-body.asciidoc[tag=repositories-metering-body]
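The `<max_version_to_clear>` semantics can be illustrated with a small sketch (hypothetical Python, not part of {es}): every archived entry whose `archive_version` is less than or equal to the given value is removed and returned.

```python
def clear_archive(entries, max_version_to_clear):
    """Split archived metering entries into (cleared, remaining),
    mirroring DELETE /_nodes/<node_id>/_repositories_metering/<max_version_to_clear>."""
    cleared = [e for e in entries if e["archive_version"] <= max_version_to_clear]
    remaining = [e for e in entries if e["archive_version"] > max_version_to_clear]
    return cleared, remaining

# Invented sample data in the documented response shape:
archive = [
    {"repository_name": "repo-a", "archive_version": 10},
    {"repository_name": "repo-b", "archive_version": 42},
]
cleared, remaining = clear_archive(archive, 10)  # clears repo-a only
```

Using the highest `archive_version` a client has already observed guarantees that no unobserved archived entry is deleted.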


@ -0,0 +1,35 @@
[role="xpack"]
[testenv="basic"]
[[get-repositories-metering-api]]
=== Get repositories metering information
++++
<titleabbrev>Get repositories metering information</titleabbrev>
++++
Returns cluster repositories metering information.
[[get-repositories-metering-api-request]]
==== {api-request-title}
`GET /_nodes/<node_id>/_repositories_metering`
[[get-repositories-metering-api-desc]]
==== {api-description-title}
You can use the cluster repositories metering API to retrieve repositories metering information in a cluster.
This API exposes monotonically non-decreasing counters, and clients are expected to durably store
the information needed to compute aggregations over a period of time. Additionally, the information
exposed by this API is volatile, meaning that it is not preserved across node restarts.
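For example, a client could poll the API periodically, persist the counters, and derive per-interval usage by subtracting consecutive snapshots. A minimal sketch in Python (the counter names follow the S3 repository type; the helper is hypothetical):

```python
def usage_between(previous, current):
    """Per-interval request counts from two snapshots of monotonically
    non-decreasing counters (older snapshot first). Counters missing from
    the previous snapshot are treated as zero."""
    return {name: count - previous.get(name, 0) for name, count in current.items()}

# Two successive polls of the same repository's request_counts:
first = {"GetObject": 10, "PutObject": 4}
second = {"GetObject": 25, "PutObject": 9, "ListObjects": 3}
delta = usage_between(first, second)
```

Because the counters are volatile, a node restart resets them; a robust client should treat a counter that decreased as a reset rather than computing a negative delta.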
[[get-repositories-metering-api-path-params]]
==== {api-path-parms-title}
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=node-id]
All node selection options are explained <<cluster-nodes,here>>.
[role="child_attributes"]
[[get-repositories-metering-api-response-body]]
==== {api-response-body-title}
include::{es-repo-dir}/repositories-metering-api/apis/repositories-meterings-body.asciidoc[tag=repositories-metering-body]


@ -0,0 +1,178 @@
tag::repositories-metering-body[]
`_nodes`::
(object)
Contains statistics about the number of nodes selected by the request.
+
.Properties of `_nodes`
[%collapsible%open]
====
`total`::
(integer)
Total number of nodes selected by the request.
`successful`::
(integer)
Number of nodes that responded successfully to the request.
`failed`::
(integer)
Number of nodes that rejected the request or failed to respond. If this value
is not `0`, a reason for the rejection or failure is included in the response.
====
`cluster_name`::
(string)
Name of the cluster. Based on the <<cluster.name>> setting.
`nodes`::
(object)
Contains repositories metering information for the nodes selected by the request.
+
.Properties of `nodes`
[%collapsible%open]
====
`<node_id>`::
(array)
An array of repository metering information for the node.
+
.Properties of objects in `<node_id>`
[%collapsible%open]
=====
`repository_name`::
(string)
Repository name.
`repository_type`::
(string)
Repository type.
`repository_location`::
(object)
Represents a unique location within the repository.
+
.Properties of `repository_location` for repository type `Azure`
[%collapsible%open]
======
`base_path`::
(string)
The path within the container where the repository stores data.
`container`::
(string)
Container name.
======
+
.Properties of `repository_location` for repository type `GCP`
[%collapsible%open]
======
`base_path`::
(string)
The path within the bucket where the repository stores data.
`bucket`::
(string)
Bucket name.
======
+
.Properties of `repository_location` for repository type `S3`
[%collapsible%open]
======
`base_path`::
(string)
The path within the bucket where the repository stores data.
`bucket`::
(string)
Bucket name.
======
`repository_ephemeral_id`::
(string)
An identifier that changes every time the repository is updated.
`repository_started_at`::
(long)
Time the repository was created or updated. Recorded in milliseconds
since the https://en.wikipedia.org/wiki/Unix_time[Unix Epoch].
`repository_stopped_at`::
(Optional, long)
Time the repository was deleted or updated. Recorded in milliseconds
since the https://en.wikipedia.org/wiki/Unix_time[Unix Epoch].
`archived`::
(boolean)
A flag indicating whether this object has been archived.
When a repository is closed or updated, its metering information
is archived and kept for a certain period of time. This allows retrieving
the repository metering information of previous repository instantiations.
`archive_version`::
(Optional, long)
The cluster state version when this object was archived. This field
can be used as a logical timestamp to delete all the archived metrics up
to an observed version. It is only present for archived
repository metering information objects. The main purpose of this
field is to avoid race conditions during repository metering
information deletions, i.e. deleting archived repositories metering
information that has not been observed yet.
`request_counts`::
(object)
An object with the number of requests performed against the repository,
grouped by request type.
+
.Properties of `request_counts` for repository type `Azure`
[%collapsible%open]
======
`GetBlobProperties`::
(long) Number of https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob-properties[Get Blob Properties] requests.
`GetBlob`::
(long) Number of https://docs.microsoft.com/en-us/rest/api/storageservices/get-blob[Get Blob] requests.
`ListBlobs`::
(long) Number of https://docs.microsoft.com/en-us/rest/api/storageservices/list-blobs[List Blobs] requests.
`PutBlob`::
(long) Number of https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob[Put Blob] requests.
`PutBlock`::
(long) Number of https://docs.microsoft.com/en-us/rest/api/storageservices/put-block[Put Block] requests.
`PutBlockList`::
(long) Number of https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list[Put Block List] requests.
Azure storage https://azure.microsoft.com/en-us/pricing/details/storage/blobs/[pricing].
======
+
.Properties of `request_counts` for repository type `GCP`
[%collapsible%open]
======
`GetObject`::
(long) Number of https://cloud.google.com/storage/docs/json_api/v1/objects/get[get object] requests.
`ListObjects`::
(long) Number of https://cloud.google.com/storage/docs/json_api/v1/objects/list[list objects] requests.
`InsertObject`::
(long) Number of https://cloud.google.com/storage/docs/json_api/v1/objects/insert[insert object] requests,
including https://cloud.google.com/storage/docs/uploading-objects[simple], https://cloud.google.com/storage/docs/json_api/v1/how-tos/multipart-upload[multipart] and
https://cloud.google.com/storage/docs/resumable-uploads[resumable] uploads. A resumable upload can perform multiple HTTP requests to
insert a single object, but it is counted as a single request because it is https://cloud.google.com/storage/docs/resumable-uploads#introduction[billed] as a single operation.
Google Cloud storage https://cloud.google.com/storage/pricing[pricing].
======
+
.Properties of `request_counts` for repository type `S3`
[%collapsible%open]
======
`GetObject`::
(long) Number of https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html[GetObject] requests.
`ListObjects`::
(long) Number of https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjects.html[ListObjects] requests.
`PutObject`::
(long) Number of https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html[PutObject] requests.
`PutMultipartObject`::
(long) Number of https://docs.aws.amazon.com/AmazonS3/latest/dev/mpuoverview.html[Multipart] requests,
including https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateMultipartUpload.html[CreateMultipartUpload],
https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPart.html[UploadPart] and https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html[CompleteMultipartUpload]
requests.
Amazon Web Services Simple Storage Service https://aws.amazon.com/s3/pricing/[pricing].
======
=====
====
end::repositories-metering-body[]
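To give a feel for the shape of the response body described above, here is a hypothetical sketch that sums `request_counts` across every node and repository (field names as documented; the sample values are invented):

```python
from collections import Counter

def total_request_counts(response_body):
    """Sum request_counts over all repositories on all nodes."""
    totals = Counter()
    for metering_infos in response_body["nodes"].values():
        for info in metering_infos:
            totals.update(info["request_counts"])
    return dict(totals)

response_body = {
    "cluster_name": "example-cluster",
    "nodes": {
        "node-1": [{"repository_name": "repo", "request_counts": {"GetObject": 5, "PutObject": 2}}],
        "node-2": [{"repository_name": "repo", "request_counts": {"GetObject": 3}}],
    },
}
totals = total_request_counts(response_body)
```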


@ -0,0 +1,16 @@
[role="xpack"]
[testenv="basic"]
[[repositories-metering-apis]]
== Repositories metering APIs
experimental[]
You can use the following APIs to retrieve repositories metering information.
These APIs are used by Elastic's commercial offerings.
* <<get-repositories-metering-api,Get repositories metering information>>
* <<clear-repositories-metering-archive-api,Clear repositories metering archive>>
include::apis/get-repositories-metering.asciidoc[]
include::apis/clear-repositories-metering-archive.asciidoc[]


@ -30,6 +30,7 @@ endif::[]
* <<ml-df-analytics-apis,{ml-cap} {dfanalytics} APIs>>
* <<migration-api,Migration APIs>>
* <<indices-reload-analyzers,Reload Search Analyzers API>>
* <<repositories-metering-apis,Repositories Metering APIs>>
* <<rollup-apis,Rollup APIs>>
* <<search, Search APIs>>
ifdef::permanently-unreleased-branch[]
@ -63,6 +64,7 @@ include::{es-repo-dir}/ml/anomaly-detection/apis/index.asciidoc[]
include::{es-repo-dir}/ml/df-analytics/apis/index.asciidoc[]
include::{es-repo-dir}/migration/migration.asciidoc[]
include::{es-repo-dir}/indices/apis/reload-analyzers.asciidoc[]
include::{es-repo-dir}/repositories-metering-api/repositories-metering-apis.asciidoc[]
include::{es-repo-dir}/rollup/rollup-api.asciidoc[]
include::{es-repo-dir}/search.asciidoc[]
ifdef::permanently-unreleased-branch[]


@ -40,7 +40,6 @@ import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
@ -76,11 +75,6 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
return new AzureErroneousHttpHandler(delegate, randomIntBetween(2, 3));
}
@Override
protected List<String> requestTypesTracked() {
return org.elasticsearch.common.collect.List.of("GET", "LIST", "HEAD", "PUT", "PUT_BLOCK");
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
final String key = Base64.getEncoder().encodeToString(randomAlphaOfLength(10).getBytes(StandardCharsets.UTF_8));
@ -180,23 +174,28 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
@Override
protected void maybeTrack(String request, Headers headers) {
if (Regex.simpleMatch("GET /*/*", request)) {
trackRequest("GET");
trackRequest("GetBlob");
} else if (Regex.simpleMatch("HEAD /*/*", request)) {
trackRequest("HEAD");
trackRequest("GetBlobProperties");
} else if (listPattern.matcher(request).matches()) {
trackRequest("LIST");
} else if (isBlockUpload(request)) {
trackRequest("PUT_BLOCK");
trackRequest("ListBlobs");
} else if (isPutBlock(request)) {
trackRequest("PutBlock");
} else if (isPutBlockList(request)) {
trackRequest("PutBlockList");
} else if (Regex.simpleMatch("PUT /*/*", request)) {
trackRequest("PUT");
trackRequest("PutBlob");
}
}
// https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list
// https://docs.microsoft.com/en-us/rest/api/storageservices/put-block
private boolean isBlockUpload(String request) {
return Regex.simpleMatch("PUT /*/*?*comp=blocklist*", request)
|| (Regex.simpleMatch("PUT /*/*?*comp=block*", request) && request.contains("blockid="));
private boolean isPutBlock(String request) {
return Regex.simpleMatch("PUT /*/*?*comp=block*", request) && request.contains("blockid=");
}
// https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list
private boolean isPutBlockList(String request) {
return Regex.simpleMatch("PUT /*/*?*comp=blocklist*", request);
}
}
}
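The PutBlob / PutBlock / PutBlockList distinction made by the test handler above hinges on the query string of the PUT request. A rough Python equivalent (the helper name is invented):

```python
def classify_azure_put(query):
    """Classify an Azure blob-service PUT by its query string.
    comp=block + blockid=  -> Put Block (upload one block)
    comp=blocklist         -> Put Block List (commit uploaded blocks)
    anything else          -> Put Blob (single-shot upload)"""
    query = query or ""
    if "comp=block" in query and "blockid=" in query:
        return "PutBlock"
    if "comp=blocklist" in query:
        return "PutBlockList"
    return "PutBlob"
```

Note that `comp=blocklist` also contains the substring `comp=block`, so the Put Block check must additionally require `blockid=`, exactly as the Java code does.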


@ -120,21 +120,21 @@ public class AzureBlobStore implements BlobStore {
this.uploadMetricsCollector = (httpURLConnection -> {
assert httpURLConnection.getRequestMethod().equals("PUT");
String queryParams = httpURLConnection.getURL().getQuery();
if (queryParams != null && isBlockUpload(queryParams)) {
stats.putBlockOperations.incrementAndGet();
} else {
if (queryParams == null) {
stats.putOperations.incrementAndGet();
return;
}
// https://docs.microsoft.com/en-us/rest/api/storageservices/put-block
// https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list
if (queryParams.contains("comp=block") && queryParams.contains("blockid=")) {
stats.putBlockOperations.incrementAndGet();
} else if (queryParams.contains("comp=blocklist")) {
stats.putBlockListOperations.incrementAndGet();
}
});
}
private boolean isBlockUpload(String queryParams) {
// https://docs.microsoft.com/en-us/rest/api/storageservices/put-block
// https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list
return (queryParams.contains("comp=block") && queryParams.contains("blockid="))
|| queryParams.contains("comp=blocklist");
}
@Override
public String toString() {
return container;
@ -385,14 +385,15 @@ public class AzureBlobStore implements BlobStore {
private final AtomicLong putBlockOperations = new AtomicLong();
private final AtomicLong putBlockListOperations = new AtomicLong();
private Map<String, Long> toMap() {
return org.elasticsearch.common.collect.Map.of(
"GET", getOperations.get(),
"LIST", listOperations.get(),
"HEAD", headOperations.get(),
"PUT", putOperations.get(),
"PUT_BLOCK", putBlockOperations.get()
);
return org.elasticsearch.common.collect.Map.of("GetBlob", getOperations.get(),
"ListBlobs", listOperations.get(),
"GetBlobProperties", headOperations.get(),
"PutBlob", putOperations.get(),
"PutBlock", putBlockOperations.get(),
"PutBlockList", putBlockListOperations.get());
}
}
}


@ -33,9 +33,10 @@ import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import static org.elasticsearch.repositories.azure.AzureStorageService.MAX_CHUNK_SIZE;
@ -52,7 +53,7 @@ import static org.elasticsearch.repositories.azure.AzureStorageService.MIN_CHUNK
* <dt>{@code compress}</dt><dd>If set to true metadata files will be stored compressed. Defaults to false.</dd>
* </dl>
*/
public class AzureRepository extends BlobStoreRepository {
public class AzureRepository extends MeteredBlobStoreRepository {
private static final Logger logger = LogManager.getLogger(AzureRepository.class);
public static final String TYPE = "azure";
@ -85,7 +86,7 @@ public class AzureRepository extends BlobStoreRepository {
final ClusterService clusterService,
final RecoverySettings recoverySettings) {
super(metadata, Repository.COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, clusterService,
recoverySettings);
recoverySettings, buildLocation(metadata));
this.chunkSize = Repository.CHUNK_SIZE_SETTING.get(metadata.settings());
this.storageService = storageService;
@ -111,6 +112,11 @@ public class AzureRepository extends BlobStoreRepository {
}
}
private static Map<String, String> buildLocation(RepositoryMetadata metadata) {
return org.elasticsearch.common.collect.Map.of("base_path", Repository.BASE_PATH_SETTING.get(metadata.settings()),
"container", Repository.CONTAINER_SETTING.get(metadata.settings()));
}
@Override
protected BlobStore getBlobStore() {
return super.getBlobStore();


@ -62,7 +62,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -120,11 +119,6 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
return new GoogleErroneousHttpHandler(delegate, randomIntBetween(2, 3));
}
@Override
protected List<String> requestTypesTracked() {
return org.elasticsearch.common.collect.List.of("GET", "LIST", "POST", "PUT");
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
final Settings.Builder settings = Settings.builder();
@ -325,15 +319,18 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
@Override
public void maybeTrack(final String request, Headers requestHeaders) {
if (Regex.simpleMatch("GET /storage/v1/b/*/o/*", request)) {
trackRequest("GET");
trackRequest("GetObject");
} else if (Regex.simpleMatch("GET /storage/v1/b/*/o*", request)) {
trackRequest("LIST");
trackRequest("ListObjects");
} else if (Regex.simpleMatch("GET /download/storage/v1/b/*", request)) {
trackRequest("GET");
} else if (Regex.simpleMatch("PUT /upload/storage/v1/b/*", request) && isLastPart(requestHeaders)) {
trackRequest("PUT");
trackRequest("GetObject");
} else if (Regex.simpleMatch("PUT /upload/storage/v1/b/*uploadType=resumable*", request) && isLastPart(requestHeaders)) {
// Resumable uploads are billed as a single operation, that's the reason we're tracking
// the request only when it's the last part.
// See https://cloud.google.com/storage/docs/resumable-uploads#introduction
trackRequest("InsertObject");
} else if (Regex.simpleMatch("POST /upload/storage/v1/b/*uploadType=multipart*", request)) {
trackRequest("POST");
trackRequest("InsertObject");
}
}


@ -58,10 +58,9 @@ final class GoogleCloudStorageOperationsStats {
Map<String, Long> toMap() {
final Map<String, Long> results = new HashMap<>();
results.put("GET", getCount.get());
results.put("LIST", listCount.get());
results.put("PUT", putCount.get());
results.put("POST", postCount.get());
results.put("GetObject", getCount.get());
results.put("ListObjects", listCount.get());
results.put("InsertObject", postCount.get() + putCount.get());
return results;
}
}
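The mapping above folds the raw HTTP-verb counters into billed GCS operations: both resumable (PUT) and multipart (POST) uploads count as object inserts. A compact sketch of the same idea (hypothetical function name):

```python
def gcs_operation_counts(get_count, list_count, put_count, post_count):
    """Translate HTTP-verb counters into billed GCS operations; resumable
    (PUT) and multipart (POST) uploads are both object inserts."""
    return {
        "GetObject": get_count,
        "ListObjects": list_count,
        "InsertObject": put_count + post_count,  # merged: billed alike
    }
```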


@ -31,8 +31,9 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository;
import java.util.Map;
import java.util.function.Function;
import static org.elasticsearch.common.settings.Setting.Property;
@ -40,7 +41,7 @@ import static org.elasticsearch.common.settings.Setting.boolSetting;
import static org.elasticsearch.common.settings.Setting.byteSizeSetting;
import static org.elasticsearch.common.settings.Setting.simpleString;
class GoogleCloudStorageRepository extends BlobStoreRepository {
class GoogleCloudStorageRepository extends MeteredBlobStoreRepository {
private static final Logger logger = LogManager.getLogger(GoogleCloudStorageRepository.class);
// package private for testing
@ -76,7 +77,7 @@ class GoogleCloudStorageRepository extends BlobStoreRepository {
final GoogleCloudStorageService storageService,
final ClusterService clusterService,
final RecoverySettings recoverySettings) {
super(metadata, getSetting(COMPRESS, metadata), namedXContentRegistry, clusterService, recoverySettings);
super(metadata, getSetting(COMPRESS, metadata), namedXContentRegistry, clusterService, recoverySettings, buildLocation(metadata));
this.storageService = storageService;
String basePath = BASE_PATH.get(metadata.settings());
@ -96,6 +97,11 @@ class GoogleCloudStorageRepository extends BlobStoreRepository {
logger.debug("using bucket [{}], base_path [{}], chunk_size [{}], compress [{}]", bucket, basePath, chunkSize, isCompress());
}
private static Map<String, String> buildLocation(RepositoryMetadata metadata) {
return org.elasticsearch.common.collect.Map.of("base_path", BASE_PATH.get(metadata.settings()),
"bucket", getSetting(BUCKET, metadata));
}
@Override
protected GoogleCloudStorageBlobStore createBlobStore() {
return new GoogleCloudStorageBlobStore(bucket, clientName, metadata.name(), storageService, bufferSize);


@ -119,11 +119,6 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
return new S3StatsCollectorHttpHandler(new S3ErroneousHttpHandler(delegate, randomIntBetween(2, 3)));
}
@Override
protected List<String> requestTypesTracked() {
return org.elasticsearch.common.collect.List.of("GET", "LIST", "POST", "PUT");
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
final MockSecureSettings secureSettings = new MockSecureSettings();
@ -287,13 +282,13 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
@Override
public void maybeTrack(final String request, Headers requestHeaders) {
if (Regex.simpleMatch("GET /*/?prefix=*", request)) {
trackRequest("LIST");
trackRequest("ListObjects");
} else if (Regex.simpleMatch("GET /*/*", request)) {
trackRequest("GET");
trackRequest("GetObject");
} else if (isMultiPartUpload(request)) {
trackRequest("POST");
trackRequest("PutMultipartObject");
} else if (Regex.simpleMatch("PUT /*/*", request)) {
trackRequest("PUT");
trackRequest("PutObject");
}
}


@ -210,10 +210,10 @@ class S3BlobStore implements BlobStore {
Map<String, Long> toMap() {
final Map<String, Long> results = new HashMap<>();
results.put("GET", getCount.get());
results.put("LIST", listCount.get());
results.put("PUT", putCount.get());
results.put("POST", postCount.get());
results.put("GetObject", getCount.get());
results.put("ListObjects", listCount.get());
results.put("PutObject", putCount.get());
results.put("PutMultipartObject", postCount.get());
return results;
}
}


@ -44,7 +44,7 @@ import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotsService;
@ -52,6 +52,7 @@ import org.elasticsearch.threadpool.Scheduler;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@ -69,7 +70,7 @@ import java.util.function.Function;
* <dt>{@code compress}</dt><dd>If set to true metadata files will be stored compressed. Defaults to false.</dd>
* </dl>
*/
class S3Repository extends BlobStoreRepository {
class S3Repository extends MeteredBlobStoreRepository {
private static final Logger logger = LogManager.getLogger(S3Repository.class);
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(logger.getName());
@ -214,7 +215,12 @@ class S3Repository extends BlobStoreRepository {
final S3Service service,
final ClusterService clusterService,
final RecoverySettings recoverySettings) {
super(metadata, COMPRESS_SETTING.get(metadata.settings()), namedXContentRegistry, clusterService, recoverySettings);
super(metadata,
COMPRESS_SETTING.get(metadata.settings()),
namedXContentRegistry,
clusterService,
recoverySettings,
buildLocation(metadata));
this.service = service;
this.repositoryMetadata = metadata;
@ -265,6 +271,11 @@ class S3Repository extends BlobStoreRepository {
storageClass);
}
private static Map<String, String> buildLocation(RepositoryMetadata metadata) {
return org.elasticsearch.common.collect.Map.of("base_path", BASE_PATH_SETTING.get(metadata.settings()),
"bucket", BUCKET_SETTING.get(metadata.settings()));
}
/**
* Holds a reference to delayed repository operation {@link Scheduler.Cancellable} so it can be cancelled should the repository be
* closed concurrently.


@ -44,9 +44,12 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -57,6 +60,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.Set;
/**
@ -66,6 +71,12 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
private static final Logger logger = LogManager.getLogger(RepositoriesService.class);
public static final Setting<TimeValue> REPOSITORIES_STATS_ARCHIVE_RETENTION_PERIOD =
Setting.positiveTimeSetting("repositories.stats.archive.retention_period", TimeValue.timeValueHours(2), Setting.Property.NodeScope);
public static final Setting<Integer> REPOSITORIES_STATS_ARCHIVE_MAX_ARCHIVED_STATS =
Setting.intSetting("repositories.stats.archive.max_archived_stats", 100, 0, Setting.Property.NodeScope);
private final Map<String, Repository.Factory> typesRegistry;
private final Map<String, Repository.Factory> internalTypesRegistry;
@ -77,6 +88,8 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
private final Map<String, Repository> internalRepositories = ConcurrentCollections.newConcurrentMap();
private volatile Map<String, Repository> repositories = Collections.emptyMap();
private final RepositoriesStatsArchive repositoriesStatsArchive;
public RepositoriesService(Settings settings, ClusterService clusterService, TransportService transportService,
Map<String, Repository.Factory> typesRegistry, Map<String, Repository.Factory> internalTypesRegistry,
@ -93,6 +106,9 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
}
}
this.verifyAction = new VerifyNodeRepositoryAction(transportService, clusterService, this);
this.repositoriesStatsArchive = new RepositoriesStatsArchive(REPOSITORIES_STATS_ARCHIVE_RETENTION_PERIOD.get(settings),
REPOSITORIES_STATS_ARCHIVE_MAX_ARCHIVED_STATS.get(settings),
threadPool::relativeTimeInMillis);
}
/**
@ -316,7 +332,9 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
for (Map.Entry<String, Repository> entry : repositories.entrySet()) {
if (newMetadata == null || newMetadata.repository(entry.getKey()) == null) {
logger.debug("unregistering repository [{}]", entry.getKey());
closeRepository(entry.getValue());
Repository repository = entry.getValue();
closeRepository(repository);
archiveRepositoryStats(repository, state.version());
} else {
survivors.put(entry.getKey(), entry.getValue());
}
@ -335,6 +353,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
// Previous version is different from the version in settings
logger.debug("updating repository [{}]", repositoryMetadata.name());
closeRepository(repository);
archiveRepositoryStats(repository, state.version());
repository = null;
try {
repository = createRepository(repositoryMetadata, typesRegistry);
@ -405,6 +424,27 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
throw new RepositoryMissingException(repositoryName);
}
public List<RepositoryStatsSnapshot> repositoriesStats() {
List<RepositoryStatsSnapshot> archivedRepoStats = repositoriesStatsArchive.getArchivedStats();
List<RepositoryStatsSnapshot> activeRepoStats = getRepositoryStatsForActiveRepositories();
List<RepositoryStatsSnapshot> repositoriesStats = new ArrayList<>(archivedRepoStats);
repositoriesStats.addAll(activeRepoStats);
return repositoriesStats;
}
private List<RepositoryStatsSnapshot> getRepositoryStatsForActiveRepositories() {
return Stream.concat(repositories.values().stream(), internalRepositories.values().stream())
.filter(r -> r instanceof MeteredBlobStoreRepository)
.map(r -> (MeteredBlobStoreRepository) r)
.map(MeteredBlobStoreRepository::statsSnapshot)
.collect(Collectors.toList());
}
public List<RepositoryStatsSnapshot> clearRepositoriesStatsArchive(long maxVersionToClear) {
return repositoriesStatsArchive.clear(maxVersionToClear);
}
public void registerInternalRepository(String name, String type) {
RepositoryMetadata metadata = new RepositoryMetadata(name, type, Settings.EMPTY);
Repository repository = internalRepositories.computeIfAbsent(name, (n) -> {
@ -435,6 +475,15 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
repository.close();
}
private void archiveRepositoryStats(Repository repository, long clusterStateVersion) {
if (repository instanceof MeteredBlobStoreRepository) {
RepositoryStatsSnapshot stats = ((MeteredBlobStoreRepository) repository).statsSnapshotForArchival(clusterStateVersion);
if (repositoriesStatsArchive.archive(stats) == false) {
logger.warn("Unable to archive the repository stats [{}] as the archive is full.", stats);
}
}
}
/**
* Creates repository holder. This method starts the repository
*/
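The archive feeding `repositoriesStats()` above is bounded both by age (`repositories.stats.archive.retention_period`) and by entry count (`repositories.stats.archive.max_archived_stats`). A compact Python sketch of that behavior, assuming a caller-supplied relative clock (this mirrors the role of `RepositoriesStatsArchive`, not its actual implementation):

```python
from collections import deque

class StatsArchive:
    """Bounded, time-limited stats archive sketch."""

    def __init__(self, retention_millis, max_capacity, clock_millis):
        self.retention_millis = retention_millis
        self.max_capacity = max_capacity
        self.clock_millis = clock_millis   # relative-time supplier
        self._entries = deque()            # (archived_at_millis, stats)

    def archive(self, stats):
        """Return False when the archive is still full after evicting stale entries."""
        self._evict()
        if len(self._entries) >= self.max_capacity:
            return False
        self._entries.append((self.clock_millis(), stats))
        return True

    def get_archived(self):
        self._evict()
        return [stats for _, stats in self._entries]

    def _evict(self):
        now = self.clock_millis()
        while self._entries and now - self._entries[0][0] >= self.retention_millis:
            self._entries.popleft()
```

A full archive causes `archive()` to return `False`, which corresponds to the "Unable to archive the repository stats" warning logged by `archiveRepositoryStats` above.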


@ -0,0 +1,121 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.unit.TimeValue;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
public final class RepositoriesStatsArchive {
private static final Logger logger = LogManager.getLogger(RepositoriesStatsArchive.class);
private final TimeValue retentionPeriod;
private final int maxCapacity;
private final LongSupplier relativeTimeSupplier;
private final Deque<ArchiveEntry> archive = new ArrayDeque<>();
public RepositoriesStatsArchive(TimeValue retentionPeriod,
int maxCapacity,
LongSupplier relativeTimeSupplier) {
this.retentionPeriod = retentionPeriod;
this.maxCapacity = maxCapacity;
this.relativeTimeSupplier = relativeTimeSupplier;
}
/**
* Archives the given repository stats snapshot, provided that doing so
* does not violate the retention and capacity constraints.
*
* @return {@code true} if the repository stats were archived, {@code false} otherwise.
*/
synchronized boolean archive(final RepositoryStatsSnapshot repositoryStats) {
assert containsRepositoryStats(repositoryStats) == false
: "A repository with ephemeral id " + repositoryStats.getRepositoryInfo().ephemeralId + " is already archived";
assert repositoryStats.isArchived();
evict();
if (archive.size() >= maxCapacity) {
return false;
}
return archive.add(new ArchiveEntry(repositoryStats, relativeTimeSupplier.getAsLong()));
}
synchronized List<RepositoryStatsSnapshot> getArchivedStats() {
evict();
return archive.stream().map(e -> e.repositoryStatsSnapshot).collect(Collectors.toList());
}
/**
* Removes from the archive every entry whose cluster state version is less
* than or equal to {@code maxVersionToClear}.
*
* @return the repository stats that were removed from the archive.
*/
synchronized List<RepositoryStatsSnapshot> clear(long maxVersionToClear) {
List<RepositoryStatsSnapshot> clearedStats = new ArrayList<>();
Iterator<ArchiveEntry> iterator = archive.iterator();
while (iterator.hasNext()) {
RepositoryStatsSnapshot statsSnapshot = iterator.next().repositoryStatsSnapshot;
if (statsSnapshot.getClusterVersion() <= maxVersionToClear) {
clearedStats.add(statsSnapshot);
iterator.remove();
}
}
logger.debug("RepositoriesStatsArchive has been cleared. Removed stats: [{}]", clearedStats);
return clearedStats;
}
private void evict() {
ArchiveEntry entry;
while ((entry = archive.peek()) != null && entry.ageInMillis(relativeTimeSupplier) >= retentionPeriod.getMillis()) {
ArchiveEntry removedEntry = archive.poll();
logger.debug("Evicting repository stats [{}]", removedEntry.repositoryStatsSnapshot);
}
}
private boolean containsRepositoryStats(RepositoryStatsSnapshot repositoryStats) {
return archive.stream()
.anyMatch(entry ->
entry.repositoryStatsSnapshot.getRepositoryInfo().ephemeralId.equals(repositoryStats.getRepositoryInfo().ephemeralId));
}
private static class ArchiveEntry {
private final RepositoryStatsSnapshot repositoryStatsSnapshot;
private final long createdAtMillis;
private ArchiveEntry(RepositoryStatsSnapshot repositoryStatsSnapshot, long createdAtMillis) {
this.repositoryStatsSnapshot = repositoryStatsSnapshot;
this.createdAtMillis = createdAtMillis;
}
private long ageInMillis(LongSupplier relativeTimeInMillis) {
return Math.max(0, relativeTimeInMillis.getAsLong() - createdAtMillis);
}
}
}
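The behavior of the archive above can be condensed into a small standalone sketch (hypothetical `RetentionArchive` class, with plain `long` cluster versions standing in for `RepositoryStatsSnapshot`): retention-based eviction runs before every insert, inserts are rejected once the bounded capacity is reached, and `clear` drops entries up to a given cluster state version.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.function.LongSupplier;

// Minimal sketch (hypothetical names, not the ES classes) of a bounded,
// retention-evicting archive keyed by cluster state version.
final class RetentionArchive {
    private final long retentionMillis;
    private final int maxCapacity;
    private final LongSupplier clock; // relative-time source, injectable for tests

    // Each entry: { createdAtMillis, clusterVersion }
    private final Deque<long[]> entries = new ArrayDeque<>();

    RetentionArchive(long retentionMillis, int maxCapacity, LongSupplier clock) {
        this.retentionMillis = retentionMillis;
        this.maxCapacity = maxCapacity;
        this.clock = clock;
    }

    synchronized boolean archive(long clusterVersion) {
        evict();
        if (entries.size() >= maxCapacity) {
            return false; // full: the caller logs a warning and drops the stats
        }
        return entries.add(new long[] { clock.getAsLong(), clusterVersion });
    }

    synchronized List<Long> clear(long maxVersionToClear) {
        List<Long> removed = new ArrayList<>();
        for (Iterator<long[]> it = entries.iterator(); it.hasNext(); ) {
            long[] e = it.next();
            if (e[1] <= maxVersionToClear) {
                removed.add(e[1]);
                it.remove();
            }
        }
        return removed;
    }

    synchronized int size() {
        evict();
        return entries.size();
    }

    private void evict() {
        long now = clock.getAsLong();
        long[] head;
        // Entries are insertion-ordered, so expired ones cluster at the head.
        while ((head = entries.peek()) != null && now - head[0] >= retentionMillis) {
            entries.poll();
        }
    }
}
```

Because eviction happens on insert, a full archive can accept new stats again as soon as the oldest entries age past the retention period, which is exactly the case the tests below exercise.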


@ -0,0 +1,132 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
public final class RepositoryInfo implements Writeable, ToXContentFragment {
public final String ephemeralId;
public final String name;
public final String type;
public final Map<String, String> location;
public final long startedAt;
@Nullable
public final Long stoppedAt;
public RepositoryInfo(String ephemeralId,
String name,
String type,
Map<String, String> location,
long startedAt) {
this(ephemeralId, name, type, location, startedAt, null);
}
public RepositoryInfo(String ephemeralId,
String name,
String type,
Map<String, String> location,
long startedAt,
@Nullable Long stoppedAt) {
this.ephemeralId = ephemeralId;
this.name = name;
this.type = type;
this.location = location;
this.startedAt = startedAt;
if (stoppedAt != null && startedAt > stoppedAt) {
throw new IllegalArgumentException("startedAt must be before or equal to stoppedAt");
}
this.stoppedAt = stoppedAt;
}
public RepositoryInfo(StreamInput in) throws IOException {
this.ephemeralId = in.readString();
this.name = in.readString();
this.type = in.readString();
this.location = in.readMap(StreamInput::readString, StreamInput::readString);
this.startedAt = in.readLong();
this.stoppedAt = in.readOptionalLong();
}
public RepositoryInfo stopped(long stoppedAt) {
assert isStopped() == false : "The repository is already stopped";
return new RepositoryInfo(ephemeralId, name, type, location, startedAt, stoppedAt);
}
public boolean isStopped() {
return stoppedAt != null;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(ephemeralId);
out.writeString(name);
out.writeString(type);
out.writeMap(location, StreamOutput::writeString, StreamOutput::writeString);
out.writeLong(startedAt);
out.writeOptionalLong(stoppedAt);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("repository_name", name);
builder.field("repository_type", type);
builder.field("repository_location", location);
builder.field("repository_ephemeral_id", ephemeralId);
builder.field("repository_started_at", startedAt);
if (stoppedAt != null) {
builder.field("repository_stopped_at", stoppedAt);
}
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RepositoryInfo that = (RepositoryInfo) o;
return ephemeralId.equals(that.ephemeralId) &&
name.equals(that.name) &&
type.equals(that.type) &&
location.equals(that.location) &&
startedAt == that.startedAt &&
Objects.equals(stoppedAt, that.stoppedAt);
}
@Override
public int hashCode() {
return Objects.hash(ephemeralId, name, type, location, startedAt, stoppedAt);
}
@Override
public String toString() {
return Strings.toString(this);
}
}
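The `startedAt`/`stoppedAt` validation and the one-shot `stopped()` transition above can be isolated in a minimal sketch (hypothetical `RepoLifecycle` class): an info object is created "running" (`stoppedAt == null`), may stop exactly once, and `stoppedAt` can never precede `startedAt`.

```java
// Hypothetical mini-version of RepositoryInfo's lifecycle handling.
final class RepoLifecycle {
    final long startedAt;
    final Long stoppedAt; // null while the repository is still registered

    RepoLifecycle(long startedAt, Long stoppedAt) {
        if (stoppedAt != null && startedAt > stoppedAt) {
            throw new IllegalArgumentException("startedAt must be before or equal to stoppedAt");
        }
        this.startedAt = startedAt;
        this.stoppedAt = stoppedAt;
    }

    // Returns a stopped copy; repository info objects stay immutable.
    RepoLifecycle stopped(long stoppedAt) {
        if (isStopped()) {
            throw new IllegalStateException("already stopped");
        }
        return new RepoLifecycle(startedAt, stoppedAt);
    }

    boolean isStopped() {
        return stoppedAt != null;
    }
}
```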


@ -27,6 +27,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
public class RepositoryStats implements Writeable {
@ -55,4 +56,24 @@ public class RepositoryStats implements Writeable {
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(requestCounts, StreamOutput::writeString, StreamOutput::writeLong);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RepositoryStats that = (RepositoryStats) o;
return requestCounts.equals(that.requestCounts);
}
@Override
public int hashCode() {
return Objects.hash(requestCounts);
}
@Override
public String toString() {
return "RepositoryStats{" +
"requestCounts=" + requestCounts +
'}';
}
}


@ -0,0 +1,114 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Objects;
public final class RepositoryStatsSnapshot implements Writeable, ToXContentObject {
public static final long UNKNOWN_CLUSTER_VERSION = -1;
private final RepositoryInfo repositoryInfo;
private final RepositoryStats repositoryStats;
private final long clusterVersion;
private final boolean archived;
public RepositoryStatsSnapshot(RepositoryInfo repositoryInfo,
RepositoryStats repositoryStats,
long clusterVersion,
boolean archived) {
assert archived != (clusterVersion == UNKNOWN_CLUSTER_VERSION);
this.repositoryInfo = repositoryInfo;
this.repositoryStats = repositoryStats;
this.clusterVersion = clusterVersion;
this.archived = archived;
}
public RepositoryStatsSnapshot(StreamInput in) throws IOException {
this.repositoryInfo = new RepositoryInfo(in);
this.repositoryStats = new RepositoryStats(in);
this.clusterVersion = in.readLong();
this.archived = in.readBoolean();
}
public RepositoryInfo getRepositoryInfo() {
return repositoryInfo;
}
public RepositoryStats getRepositoryStats() {
return repositoryStats;
}
public boolean isArchived() {
return archived;
}
public long getClusterVersion() {
return clusterVersion;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
repositoryInfo.writeTo(out);
repositoryStats.writeTo(out);
out.writeLong(clusterVersion);
out.writeBoolean(archived);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
repositoryInfo.toXContent(builder, params);
builder.field("request_counts", repositoryStats.requestCounts);
builder.field("archived", archived);
if (archived) {
builder.field("cluster_version", clusterVersion);
}
builder.endObject();
return builder;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RepositoryStatsSnapshot that = (RepositoryStatsSnapshot) o;
return repositoryInfo.equals(that.repositoryInfo) &&
repositoryStats.equals(that.repositoryStats) &&
clusterVersion == that.clusterVersion &&
archived == that.archived;
}
@Override
public int hashCode() {
return Objects.hash(repositoryInfo, repositoryStats, clusterVersion, archived);
}
@Override
public String toString() {
return Strings.toString(this);
}
}
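The constructor assertion `archived != (clusterVersion == UNKNOWN_CLUSTER_VERSION)` encodes that a live snapshot never carries a cluster state version while an archived one always does (the version the clear-archive API later matches against `max_version_to_clear`). A tiny sketch (hypothetical `SnapshotVersionInvariant` helper) makes the four cases explicit:

```java
// Sketch of the archived/cluster-version invariant used above.
final class SnapshotVersionInvariant {
    static final long UNKNOWN_CLUSTER_VERSION = -1;

    // Valid iff exactly one of "archived" and "version is unknown" holds.
    static boolean isValid(boolean archived, long clusterVersion) {
        return archived != (clusterVersion == UNKNOWN_CLUSTER_VERSION);
    }
}
```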


@ -0,0 +1,59 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.blobstore;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.repositories.RepositoryInfo;
import org.elasticsearch.repositories.RepositoryStatsSnapshot;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Map;
public abstract class MeteredBlobStoreRepository extends BlobStoreRepository {
private final RepositoryInfo repositoryInfo;
public MeteredBlobStoreRepository(RepositoryMetadata metadata,
boolean compress,
NamedXContentRegistry namedXContentRegistry,
ClusterService clusterService,
RecoverySettings recoverySettings,
Map<String, String> location) {
super(metadata, compress, namedXContentRegistry, clusterService, recoverySettings);
ThreadPool threadPool = clusterService.getClusterApplierService().threadPool();
this.repositoryInfo = new RepositoryInfo(UUIDs.randomBase64UUID(),
metadata.name(),
metadata.type(),
location,
threadPool.absoluteTimeInMillis());
}
public RepositoryStatsSnapshot statsSnapshot() {
return new RepositoryStatsSnapshot(repositoryInfo, stats(), RepositoryStatsSnapshot.UNKNOWN_CLUSTER_VERSION, false);
}
public RepositoryStatsSnapshot statsSnapshotForArchival(long clusterVersion) {
RepositoryInfo stoppedRepoInfo = repositoryInfo.stopped(threadPool.absoluteTimeInMillis());
return new RepositoryStatsSnapshot(stoppedRepoInfo, stats(), clusterVersion, true);
}
}


@ -23,23 +23,32 @@ import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.test.ESTestCase;
@ -54,7 +63,9 @@ import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class RepositoriesServiceTests extends ESTestCase {
@ -68,8 +79,16 @@ public class RepositoriesServiceTests extends ESTestCase {
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
boundAddress -> DiscoveryNode.createLocal(Settings.EMPTY, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null,
Collections.emptySet());
final ClusterApplierService clusterApplierService = mock(ClusterApplierService.class);
when(clusterApplierService.threadPool()).thenReturn(threadPool);
final ClusterService clusterService = mock(ClusterService.class);
when(clusterService.getClusterApplierService()).thenReturn(clusterApplierService);
Map<String, Repository.Factory> typesRegistry =
org.elasticsearch.common.collect.Map.of(TestRepository.TYPE, TestRepository::new,
MeteredRepositoryTypeA.TYPE, metadata -> new MeteredRepositoryTypeA(metadata, clusterService),
MeteredRepositoryTypeB.TYPE, metadata -> new MeteredRepositoryTypeB(metadata, clusterService));
repositoriesService = new RepositoriesService(Settings.EMPTY, mock(ClusterService.class),
transportService, typesRegistry, typesRegistry, threadPool);
repositoriesService.start();
}
@ -115,6 +134,46 @@ public class RepositoriesServiceTests extends ESTestCase {
}
}
public void testRepositoriesStatsCanHaveTheSameNameAndDifferentTypeOverTime() {
String repoName = "name";
expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName));
ClusterState clusterStateWithRepoTypeA = createClusterStateWithRepo(repoName, MeteredRepositoryTypeA.TYPE);
repositoriesService.applyClusterState(new ClusterChangedEvent("new repo", clusterStateWithRepoTypeA, emptyState()));
assertThat(repositoriesService.repositoriesStats().size(), equalTo(1));
repositoriesService.applyClusterState(new ClusterChangedEvent("new repo", emptyState(), clusterStateWithRepoTypeA));
assertThat(repositoriesService.repositoriesStats().size(), equalTo(1));
ClusterState clusterStateWithRepoTypeB = createClusterStateWithRepo(repoName, MeteredRepositoryTypeB.TYPE);
repositoriesService.applyClusterState(new ClusterChangedEvent("new repo", clusterStateWithRepoTypeB, emptyState()));
List<RepositoryStatsSnapshot> repositoriesStats = repositoriesService.repositoriesStats();
assertThat(repositoriesStats.size(), equalTo(2));
RepositoryStatsSnapshot repositoryStatsTypeA = repositoriesStats.get(0);
assertThat(repositoryStatsTypeA.getRepositoryInfo().type, equalTo(MeteredRepositoryTypeA.TYPE));
assertThat(repositoryStatsTypeA.getRepositoryStats(), equalTo(MeteredRepositoryTypeA.STATS));
RepositoryStatsSnapshot repositoryStatsTypeB = repositoriesStats.get(1);
assertThat(repositoryStatsTypeB.getRepositoryInfo().type, equalTo(MeteredRepositoryTypeB.TYPE));
assertThat(repositoryStatsTypeB.getRepositoryStats(), equalTo(MeteredRepositoryTypeB.STATS));
}
private ClusterState createClusterStateWithRepo(String repoName, String repoType) {
ClusterState.Builder state = ClusterState.builder(new ClusterName("test"));
Metadata.Builder mdBuilder = Metadata.builder();
mdBuilder.putCustom(RepositoriesMetadata.TYPE,
new RepositoriesMetadata(Collections.singletonList(new RepositoryMetadata(repoName, repoType, Settings.EMPTY))));
state.metadata(mdBuilder);
return state.build();
}
private ClusterState emptyState() {
return ClusterState.builder(new ClusterName("test")).build();
}
private void assertThrowsOnRegister(String repoName) {
PutRepositoryRequest request = new PutRepositoryRequest(repoName);
expectThrows(RepositoryException.class, () -> repositoriesService.registerRepository(request, null));
@ -263,4 +322,62 @@ public class RepositoriesServiceTests extends ESTestCase {
isClosed = true;
}
}
private static class MeteredRepositoryTypeA extends MeteredBlobStoreRepository {
private static final String TYPE = "type-a";
private static final RepositoryStats STATS = new RepositoryStats(org.elasticsearch.common.collect.Map.of("GET", 10L));
private MeteredRepositoryTypeA(RepositoryMetadata metadata, ClusterService clusterService) {
super(metadata,
false,
mock(NamedXContentRegistry.class),
clusterService,
mock(RecoverySettings.class),
org.elasticsearch.common.collect.Map.of("bucket", "bucket-a"));
}
@Override
protected BlobStore createBlobStore() {
return mock(BlobStore.class);
}
@Override
public RepositoryStats stats() {
return STATS;
}
@Override
public BlobPath basePath() {
return BlobPath.cleanPath();
}
}
private static class MeteredRepositoryTypeB extends MeteredBlobStoreRepository {
private static final String TYPE = "type-b";
private static final RepositoryStats STATS = new RepositoryStats(org.elasticsearch.common.collect.Map.of("LIST", 20L));
private MeteredRepositoryTypeB(RepositoryMetadata metadata, ClusterService clusterService) {
super(metadata,
false,
mock(NamedXContentRegistry.class),
clusterService,
mock(RecoverySettings.class),
org.elasticsearch.common.collect.Map.of("bucket", "bucket-b"));
}
@Override
protected BlobStore createBlobStore() {
return mock(BlobStore.class);
}
@Override
public RepositoryStats stats() {
return STATS;
}
@Override
public BlobPath basePath() {
return BlobPath.cleanPath();
}
}
}


@ -0,0 +1,118 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import static org.hamcrest.Matchers.equalTo;
public class RepositoriesStatsArchiveTests extends ESTestCase {
public void testStatsAreEvictedOnceTheyAreOlderThanRetentionPeriod() {
int retentionTimeInMillis = randomIntBetween(100, 1000);
AtomicLong fakeRelativeClock = new AtomicLong();
RepositoriesStatsArchive repositoriesStatsArchive =
new RepositoriesStatsArchive(TimeValue.timeValueMillis(retentionTimeInMillis),
100,
fakeRelativeClock::get);
for (int i = 0; i < randomInt(10); i++) {
RepositoryStatsSnapshot repoStats = createRepositoryStats(RepositoryStats.EMPTY_STATS);
repositoriesStatsArchive.archive(repoStats);
}
fakeRelativeClock.set(retentionTimeInMillis * 2);
int statsToBeRetainedCount = randomInt(10);
for (int i = 0; i < statsToBeRetainedCount; i++) {
RepositoryStatsSnapshot repoStats =
createRepositoryStats(new RepositoryStats(org.elasticsearch.common.collect.Map.of("GET", 10L)));
repositoriesStatsArchive.archive(repoStats);
}
List<RepositoryStatsSnapshot> archivedStats = repositoriesStatsArchive.getArchivedStats();
assertThat(archivedStats.size(), equalTo(statsToBeRetainedCount));
for (RepositoryStatsSnapshot repositoryStatsSnapshot : archivedStats) {
assertThat(repositoryStatsSnapshot.getRepositoryStats().requestCounts,
equalTo(org.elasticsearch.common.collect.Map.of("GET", 10L)));
}
}
public void testStatsAreRejectedIfTheArchiveIsFull() {
int retentionTimeInMillis = randomIntBetween(100, 1000);
AtomicLong fakeRelativeClock = new AtomicLong();
RepositoriesStatsArchive repositoriesStatsArchive =
new RepositoriesStatsArchive(TimeValue.timeValueMillis(retentionTimeInMillis),
1,
fakeRelativeClock::get);
assertTrue(repositoriesStatsArchive.archive(createRepositoryStats(RepositoryStats.EMPTY_STATS)));
fakeRelativeClock.set(retentionTimeInMillis * 2);
// Now there's room since the previous stats should be evicted
assertTrue(repositoriesStatsArchive.archive(createRepositoryStats(RepositoryStats.EMPTY_STATS)));
// There's no room for stats with the same creation time
assertFalse(repositoriesStatsArchive.archive(createRepositoryStats(RepositoryStats.EMPTY_STATS)));
}
public void testClearArchive() {
int retentionTimeInMillis = randomIntBetween(100, 1000);
AtomicLong fakeRelativeClock = new AtomicLong();
RepositoriesStatsArchive repositoriesStatsArchive =
new RepositoriesStatsArchive(TimeValue.timeValueMillis(retentionTimeInMillis),
100,
fakeRelativeClock::get);
int archivedStatsWithVersionZero = randomIntBetween(1, 20);
for (int i = 0; i < archivedStatsWithVersionZero; i++) {
repositoriesStatsArchive.archive(createRepositoryStats(RepositoryStats.EMPTY_STATS, 0));
}
int archivedStatsWithNewerVersion = randomIntBetween(1, 20);
for (int i = 0; i < archivedStatsWithNewerVersion; i++) {
repositoriesStatsArchive.archive(createRepositoryStats(RepositoryStats.EMPTY_STATS, 1));
}
List<RepositoryStatsSnapshot> removedStats = repositoriesStatsArchive.clear(0L);
assertThat(removedStats.size(), equalTo(archivedStatsWithVersionZero));
assertThat(repositoriesStatsArchive.getArchivedStats().size(), equalTo(archivedStatsWithNewerVersion));
}
private RepositoryStatsSnapshot createRepositoryStats(RepositoryStats repositoryStats) {
return createRepositoryStats(repositoryStats, 0);
}
private RepositoryStatsSnapshot createRepositoryStats(RepositoryStats repositoryStats, long clusterVersion) {
RepositoryInfo repositoryInfo = new RepositoryInfo(UUIDs.randomBase64UUID(),
randomAlphaOfLength(10),
randomAlphaOfLength(10),
org.elasticsearch.common.collect.Map.of("bucket", randomAlphaOfLength(10)),
System.currentTimeMillis(),
null);
return new RepositoryStatsSnapshot(repositoryInfo, repositoryStats, clusterVersion, true);
}
}


@ -17,3 +17,12 @@ services:
- ./testfixtures_shared/shared:/fixture/shared
ports:
- "8091"
azure-fixture-repositories-metering:
build:
context: .
dockerfile: Dockerfile
volumes:
- ./testfixtures_shared/shared:/fixture/shared
ports:
- "8091"


@ -36,3 +36,15 @@ services:
- ./testfixtures_shared/shared:/fixture/shared
ports:
- "80"
gcs-fixture-repositories-metering:
build:
context: .
args:
port: 80
bucket: "bucket"
token: "o/oauth2/token"
dockerfile: Dockerfile
volumes:
- ./testfixtures_shared/shared:/fixture/shared
ports:
- "80"


@ -30,6 +30,21 @@ services:
ports:
- "80"
s3-fixture-repositories-metering:
build:
context: .
args:
fixtureClass: fixture.s3.S3HttpFixture
port: 80
bucket: "bucket"
basePath: "base_path"
accessKey: "access_key"
dockerfile: Dockerfile
volumes:
- ./testfixtures_shared/shared:/fixture/shared
ports:
- "80"
s3-fixture-with-session-token:
build:
context: .


@ -49,6 +49,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -135,8 +136,6 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR
protected abstract HttpHandler createErroneousHttpHandler(HttpHandler delegate);
protected abstract List<String> requestTypesTracked();
/**
* Test the snapshot and restore of an index which has large segments files.
*/
@ -217,32 +216,25 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR
Map<String, Long> sdkRequestCounts = repositoryStats.requestCounts;
final Map<String, Long> mockCalls = getMockRequestCounts();
String assertionErrorMsg = String.format("SDK sent [%s] calls and handler measured [%s] calls",
sdkRequestCounts,
mockCalls);
assertEquals(assertionErrorMsg, mockCalls, sdkRequestCounts);
}
private Map<String, Long> getMockRequestCounts() {
for (HttpHandler h : handlers.values()) {
while (h instanceof DelegatingHttpHandler) {
if (h instanceof HttpStatsCollectorHandler) {
return ((HttpStatsCollectorHandler) h).getOperationsCount();
}
h = ((DelegatingHttpHandler) h).getDelegate();
}
}
return Collections.emptyMap();
}
protected static String httpServerUrl() {
@ -352,8 +344,8 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR
return delegate;
}
synchronized Map<String, Long> getOperationsCount() {
return org.elasticsearch.common.collect.Map.copyOf(operationCount);
}
protected synchronized void trackRequest(final String requestType) {


@ -23,6 +23,8 @@ import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.message.BasicHeader;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.ssl.SSLContexts;
@ -31,6 +33,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RequestOptions.Builder;
@ -1220,6 +1223,10 @@ public abstract class ESRestTestCase extends ESTestCase {
protected static Map<String, Object> getAsMap(final String endpoint) throws IOException {
Response response = client().performRequest(new Request("GET", endpoint));
return responseAsMap(response);
}
protected static Map<String, Object> responseAsMap(Response response) throws IOException {
XContentType entityContentType = XContentType.fromMediaTypeOrFormat(response.getEntity().getContentType().getValue());
Map<String, Object> responseEntity = XContentHelper.convertToMap(entityContentType.xContent(),
response.getEntity().getContent(), false);
@ -1227,6 +1234,52 @@ public abstract class ESRestTestCase extends ESTestCase {
return responseEntity;
}
protected static void registerRepository(String repository, String type, boolean verify, Settings settings) throws IOException {
final Request request = new Request(HttpPut.METHOD_NAME, "_snapshot/" + repository);
request.addParameter("verify", Boolean.toString(verify));
request.setJsonEntity(Strings.toString(new PutRepositoryRequest(repository).type(type).settings(settings)));
final Response response = client().performRequest(request);
assertAcked("Failed to create repository [" + repository + "] of type [" + type + "]: " + response, response);
}
protected static void createSnapshot(String repository, String snapshot, boolean waitForCompletion) throws IOException {
final Request request = new Request(HttpPut.METHOD_NAME, "_snapshot/" + repository + '/' + snapshot);
request.addParameter("wait_for_completion", Boolean.toString(waitForCompletion));
final Response response = client().performRequest(request);
assertThat(
"Failed to create snapshot [" + snapshot + "] in repository [" + repository + "]: " + response,
response.getStatusLine().getStatusCode(),
equalTo(RestStatus.OK.getStatus())
);
}
protected static void restoreSnapshot(String repository, String snapshot, boolean waitForCompletion) throws IOException {
final Request request = new Request(HttpPost.METHOD_NAME, "_snapshot/" + repository + '/' + snapshot + "/_restore");
request.addParameter("wait_for_completion", Boolean.toString(waitForCompletion));
final Response response = client().performRequest(request);
assertThat(
"Failed to restore snapshot [" + snapshot + "] from repository [" + repository + "]: " + response,
response.getStatusLine().getStatusCode(),
equalTo(RestStatus.OK.getStatus())
);
}
@SuppressWarnings("unchecked")
private static void assertAcked(String message, Response response) throws IOException {
final int responseStatusCode = response.getStatusLine().getStatusCode();
assertThat(
message + ": expecting response code [200] but got [" + responseStatusCode + ']',
responseStatusCode,
equalTo(RestStatus.OK.getStatus())
);
final Map<String, Object> responseAsMap = responseAsMap(response);
Boolean acknowledged = (Boolean) XContentMapValues.extractValue(responseAsMap, "acknowledged");
assertThat(message + ": response is not acknowledged", acknowledged, equalTo(Boolean.TRUE));
}
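The `assertAcked` helper above relies on `XContentMapValues.extractValue` to pull `acknowledged` out of the parsed response map by a dotted path. As a rough illustration of that lookup (a hypothetical stand-in, not the Elasticsearch implementation), the walk over nested maps can be sketched as:

```java
import java.util.Map;

// Minimal sketch of a dotted-path lookup over nested maps, in the spirit of
// XContentMapValues.extractValue: "a.b.c" descends one map level per key.
// Class and method names here are illustrative only.
public class DottedPathExtractor {
    public static Object extractValue(Map<?, ?> source, String path) {
        Object current = source;
        for (String key : path.split("\\.")) {
            if (current instanceof Map == false) {
                return null; // path runs past a leaf value
            }
            current = ((Map<?, ?>) current).get(key);
            if (current == null) {
                return null; // key absent at this level
            }
        }
        return current;
    }

    public static void main(String[] args) {
        Map<String, Object> response = Map.of("acknowledged", true);
        // assertAcked above reduces to a check of this shape:
        if (Boolean.TRUE.equals(extractValue(response, "acknowledged")) == false) {
            throw new AssertionError("response is not acknowledged");
        }
    }
}
```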
/**
* Is this template one that is automatically created by xpack?
*/

@@ -0,0 +1,46 @@
evaluationDependsOn(xpackModule('core'))
apply plugin: 'elasticsearch.esplugin'
esplugin {
name 'repositories-metering-api'
description 'Repositories metering API'
classname 'org.elasticsearch.xpack.repositories.metering.RepositoriesMeteringPlugin'
extendedPlugins = ['x-pack-core']
}
archivesBaseName = 'x-pack-repositories-metering-api'
dependencies {
compileOnly project(path: xpackModule('core'), configuration: 'default')
testImplementation project(path: xpackModule('core'), configuration: 'testArtifacts')
}
// xpack modules are installed in real clusters as the meta plugin, so
// installing them as individual plugins for integ tests doesn't make sense,
// so we disable integ tests
integTest.enabled = false
// add all sub-projects of the qa sub-project
gradle.projectsEvaluated {
project.subprojects
.find { it.path == project.path + ":qa" }
.subprojects
.findAll { it.path.startsWith(project.path + ":qa") }
.each { check.dependsOn it.check }
}
configurations {
testArtifacts.extendsFrom testRuntime
testArtifacts.extendsFrom testImplementation
}
task testJar(type: Jar) {
appendix 'test'
from sourceSets.test.output
}
artifacts {
testArtifacts testJar
}
test {
}

@@ -0,0 +1,99 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import org.elasticsearch.gradle.info.BuildParams
import static org.elasticsearch.gradle.PropertyNormalization.IGNORE_VALUE
apply plugin: 'elasticsearch.standalone-rest-test'
apply plugin: 'elasticsearch.rest-test'
apply plugin: 'elasticsearch.rest-resources'
final Project fixture = project(':test:fixtures:azure-fixture')
final Project repositoryPlugin = project(':plugins:repository-azure')
dependencies {
testImplementation project(path: xpackModule('repositories-metering-api'), configuration: 'testArtifacts')
testImplementation repositoryPlugin
}
restResources {
restApi {
includeCore 'indices', 'bulk', 'snapshot', 'nodes', '_common'
includeXpack 'repositories-metering-api'
}
}
boolean useFixture = false
String azureAccount = System.getenv("azure_storage_account")
String azureKey = System.getenv("azure_storage_key")
String azureContainer = System.getenv("azure_storage_container")
String azureBasePath = System.getenv("azure_storage_base_path")
String azureSasToken = System.getenv("azure_storage_sas_token")
if (!azureAccount && !azureKey && !azureContainer && !azureBasePath && !azureSasToken) {
azureAccount = 'azure_integration_test_account'
azureKey = 'YXp1cmVfaW50ZWdyYXRpb25fdGVzdF9rZXk=' // The key is "azure_integration_test_key" encoded using base64
azureContainer = 'container'
azureBasePath = ''
azureSasToken = ''
useFixture = true
}
if (useFixture) {
apply plugin: 'elasticsearch.test.fixtures'
testFixtures.useFixture(fixture.path, 'azure-fixture-repositories-metering')
}
integTest {
dependsOn repositoryPlugin.bundlePlugin
systemProperty 'test.azure.container', azureContainer
nonInputProperties.systemProperty 'test.azure.base_path', azureBasePath + "_repositories_metering_tests_" + BuildParams.testSeed
}
testClusters.integTest {
testDistribution = 'DEFAULT'
plugin repositoryPlugin.bundlePlugin.archiveFile
keystore 'azure.client.repositories_metering.account', azureAccount
if (azureKey != null && azureKey.isEmpty() == false) {
keystore 'azure.client.repositories_metering.key', azureKey
}
if (azureSasToken != null && azureSasToken.isEmpty() == false) {
keystore 'azure.client.repositories_metering.sas_token', azureSasToken
}
if (useFixture) {
def fixtureAddress = { fixtureName ->
assert useFixture: 'closure should not be used without a fixture'
int ephemeralPort = fixture.postProcessFixture.ext."test.fixtures.${fixtureName}.tcp.8091"
assert ephemeralPort > 0
'127.0.0.1:' + ephemeralPort
}
setting 'azure.client.repositories_metering.endpoint_suffix',
{ "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=http://${-> fixtureAddress('azure-fixture-repositories-metering')}" }, IGNORE_VALUE
} else {
println "Using an external service to test " + project.name
}
}
task azureThirdPartyTest {
dependsOn integTest
}

@@ -0,0 +1,54 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.repositories.metering.azure;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.repositories.metering.AbstractRepositoriesMeteringAPIRestTestCase;
import java.util.List;
import java.util.Map;
public class AzureRepositoriesMeteringIT extends AbstractRepositoriesMeteringAPIRestTestCase {
@Override
protected String repositoryType() {
return "azure";
}
@Override
protected Map<String, String> repositoryLocation() {
return org.elasticsearch.common.collect.Map.of(
"container",
getProperty("test.azure.container"),
"base_path",
getProperty("test.azure.base_path")
);
}
@Override
protected Settings repositorySettings() {
final String container = getProperty("test.azure.container");
final String basePath = getProperty("test.azure.base_path");
return Settings.builder().put("client", "repositories_metering").put("container", container).put("base_path", basePath).build();
}
@Override
protected Settings updatedRepositorySettings() {
return Settings.builder().put(repositorySettings()).put("azure.client.repositories_metering.max_retries", 5).build();
}
@Override
protected List<String> readCounterKeys() {
return org.elasticsearch.common.collect.List.of("GetBlob", "GetBlobProperties", "ListBlobs");
}
@Override
protected List<String> writeCounterKeys() {
return org.elasticsearch.common.collect.List.of("PutBlob");
}
}

@@ -0,0 +1,6 @@
apply plugin: 'elasticsearch.build'
test.enabled = false
dependencies {
api project(':test:framework')
}

@@ -0,0 +1,130 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import org.elasticsearch.gradle.info.BuildParams
import org.elasticsearch.gradle.MavenFilteringHack
import java.nio.file.Files
import java.security.KeyPair
import java.security.KeyPairGenerator
import static org.elasticsearch.gradle.PropertyNormalization.IGNORE_VALUE
apply plugin: 'elasticsearch.standalone-rest-test'
apply plugin: 'elasticsearch.rest-test'
apply plugin: 'elasticsearch.rest-resources'
final Project fixture = project(':test:fixtures:gcs-fixture')
final Project repositoryPlugin = project(':plugins:repository-gcs')
dependencies {
testImplementation project(path: xpackModule('repositories-metering-api'), configuration: 'testArtifacts')
testImplementation repositoryPlugin
}
restResources {
restApi {
includeCore 'indices', 'bulk', 'snapshot', 'nodes', '_common'
includeXpack 'repositories-metering-api'
}
}
boolean useFixture = false
String gcsServiceAccount = System.getenv("google_storage_service_account")
String gcsBucket = System.getenv("google_storage_bucket")
String gcsBasePath = System.getenv("google_storage_base_path")
File serviceAccountFile = null
if (!gcsServiceAccount && !gcsBucket && !gcsBasePath) {
serviceAccountFile = new File(project.buildDir, 'generated-resources/service_account_test.json')
gcsBucket = 'bucket'
gcsBasePath = 'integration_test'
useFixture = true
} else if (!gcsServiceAccount || !gcsBucket || !gcsBasePath) {
throw new IllegalArgumentException("not all options specified to run tests against external GCS service are present")
} else {
serviceAccountFile = new File(gcsServiceAccount)
}
/** A service account file that points to the Google Cloud Storage service emulated by the fixture **/
task createServiceAccountFile() {
doLast {
KeyPairGenerator keyPairGenerator = KeyPairGenerator.getInstance("RSA")
keyPairGenerator.initialize(1024)
KeyPair keyPair = keyPairGenerator.generateKeyPair()
String encodedKey = Base64.getEncoder().encodeToString(keyPair.private.getEncoded())
serviceAccountFile.parentFile.mkdirs()
serviceAccountFile.setText("{\n" +
' "type": "service_account",\n' +
' "project_id": "integration_test",\n' +
' "private_key_id": "' + UUID.randomUUID().toString() + '",\n' +
' "private_key": "-----BEGIN PRIVATE KEY-----\\n' + encodedKey + '\\n-----END PRIVATE KEY-----\\n",\n' +
' "client_email": "integration_test@appspot.gserviceaccount.com",\n' +
' "client_id": "123456789101112130594"\n' +
'}', 'UTF-8')
}
}
def fixtureAddress = { f ->
assert useFixture: 'closure should not be used without a fixture'
int ephemeralPort = project(':test:fixtures:gcs-fixture').postProcessFixture.ext."test.fixtures.${f}.tcp.80"
assert ephemeralPort > 0
'http://127.0.0.1:' + ephemeralPort
}
Map<String, Object> expansions = [
'bucket' : gcsBucket,
'base_path': gcsBasePath + "_integration_tests"
]
processTestResources {
inputs.properties(expansions)
MavenFilteringHack.filter(it, expansions)
}
if (useFixture) {
apply plugin: 'elasticsearch.test.fixtures'
testFixtures.useFixture(fixture.path, 'gcs-fixture-repositories-metering')
}
integTest {
dependsOn repositoryPlugin.bundlePlugin
systemProperty 'test.gcs.bucket', gcsBucket
nonInputProperties.systemProperty 'test.gcs.base_path', gcsBasePath + "_repositories_metering" + BuildParams.testSeed
}
testClusters.integTest {
testDistribution = 'DEFAULT'
plugin repositoryPlugin.bundlePlugin.archiveFile
keystore 'gcs.client.repositories_metering.credentials_file', serviceAccountFile, IGNORE_VALUE
if (useFixture) {
tasks.integTest.dependsOn createServiceAccountFile
/* Use a closure on the string to delay evaluation until tests are executed */
setting 'gcs.client.repositories_metering.endpoint', { "${-> fixtureAddress('gcs-fixture-repositories-metering')}" }, IGNORE_VALUE
setting 'gcs.client.repositories_metering.token_uri', { "${-> fixtureAddress('gcs-fixture-repositories-metering')}/o/oauth2/token" }, IGNORE_VALUE
} else {
println "Using an external service to test " + project.name
}
}
task gcsThirdPartyTest {
dependsOn integTest
}

@@ -0,0 +1,53 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.repositories.metering.gcs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.repositories.metering.AbstractRepositoriesMeteringAPIRestTestCase;
import java.util.List;
import java.util.Map;
public class GCSRepositoriesMeteringIT extends AbstractRepositoriesMeteringAPIRestTestCase {
@Override
protected String repositoryType() {
return "gcs";
}
@Override
protected Map<String, String> repositoryLocation() {
return org.elasticsearch.common.collect.Map.of(
"bucket",
getProperty("test.gcs.bucket"),
"base_path",
getProperty("test.gcs.base_path")
);
}
@Override
protected Settings repositorySettings() {
final String bucket = getProperty("test.gcs.bucket");
final String basePath = getProperty("test.gcs.base_path");
return Settings.builder().put("client", "repositories_metering").put("bucket", bucket).put("base_path", basePath).build();
}
@Override
protected Settings updatedRepositorySettings() {
return Settings.builder().put(repositorySettings()).put("gcs.client.repositories_metering.application_name", "updated").build();
}
@Override
protected List<String> readCounterKeys() {
return org.elasticsearch.common.collect.List.of("GetObject", "ListObjects");
}
@Override
protected List<String> writeCounterKeys() {
return org.elasticsearch.common.collect.List.of("InsertObject");
}
}

@@ -0,0 +1,75 @@
import static org.elasticsearch.gradle.PropertyNormalization.IGNORE_VALUE
import org.elasticsearch.gradle.info.BuildParams
apply plugin: 'elasticsearch.standalone-rest-test'
apply plugin: 'elasticsearch.rest-test'
apply plugin: 'elasticsearch.rest-resources'
final Project fixture = project(':test:fixtures:s3-fixture')
final Project repositoryPlugin = project(':plugins:repository-s3')
dependencies {
testImplementation project(path: xpackModule('repositories-metering-api'), configuration: 'testArtifacts')
testImplementation repositoryPlugin
}
restResources {
restApi {
includeCore 'indices', 'bulk', 'snapshot', 'nodes', '_common'
includeXpack 'repositories-metering-api'
}
}
boolean useFixture = false
String s3AccessKey = System.getenv("amazon_s3_access_key")
String s3SecretKey = System.getenv("amazon_s3_secret_key")
String s3Bucket = System.getenv("amazon_s3_bucket")
String s3BasePath = System.getenv("amazon_s3_base_path")
if (!s3AccessKey && !s3SecretKey && !s3Bucket && !s3BasePath) {
s3AccessKey = 'access_key'
s3SecretKey = 'secret_key'
s3Bucket = 'bucket'
s3BasePath = null
useFixture = true
} else if (!s3AccessKey || !s3SecretKey || !s3Bucket || !s3BasePath) {
throw new IllegalArgumentException("not all options specified to run against external S3 service are present")
}
if (useFixture) {
apply plugin: 'elasticsearch.test.fixtures'
testFixtures.useFixture(fixture.path, 's3-fixture-repositories-metering')
}
integTest {
dependsOn repositoryPlugin.bundlePlugin
systemProperty 'test.s3.bucket', s3Bucket
nonInputProperties.systemProperty 'test.s3.base_path', s3BasePath ? s3BasePath + "_repositories_metering" + BuildParams.testSeed : 'base_path'
}
testClusters.integTest {
testDistribution = 'DEFAULT'
plugin repositoryPlugin.bundlePlugin.archiveFile
keystore 's3.client.repositories_metering.access_key', s3AccessKey
keystore 's3.client.repositories_metering.secret_key', s3SecretKey
if (useFixture) {
def fixtureAddress = { fixtureName ->
assert useFixture: 'closure should not be used without a fixture'
int ephemeralPort = fixture.postProcessFixture.ext."test.fixtures.${fixtureName}.tcp.80"
assert ephemeralPort > 0
'127.0.0.1:' + ephemeralPort
}
setting 's3.client.repositories_metering.protocol', 'http'
setting 's3.client.repositories_metering.endpoint', { "${-> fixtureAddress('s3-fixture-repositories-metering')}" }, IGNORE_VALUE
} else {
println "Using an external service to test " + project.name
}
}
task s3ThirdPartyTest {
dependsOn integTest
}

@@ -0,0 +1,54 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.repositories.metering.s3;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.repositories.metering.AbstractRepositoriesMeteringAPIRestTestCase;
import java.util.List;
import java.util.Map;
public class S3RepositoriesMeteringIT extends AbstractRepositoriesMeteringAPIRestTestCase {
@Override
protected String repositoryType() {
return "s3";
}
@Override
protected Map<String, String> repositoryLocation() {
return org.elasticsearch.common.collect.Map.of(
"bucket",
getProperty("test.s3.bucket"),
"base_path",
getProperty("test.s3.base_path")
);
}
@Override
protected Settings repositorySettings() {
final String bucket = getProperty("test.s3.bucket");
final String basePath = getProperty("test.s3.base_path");
return Settings.builder().put("client", "repositories_metering").put("bucket", bucket).put("base_path", basePath).build();
}
@Override
protected Settings updatedRepositorySettings() {
Settings settings = repositorySettings();
return Settings.builder().put(settings).put("s3.client.max_retries", 4).build();
}
@Override
protected List<String> readCounterKeys() {
return org.elasticsearch.common.collect.List.of("GetObject", "ListObjects");
}
@Override
protected List<String> writeCounterKeys() {
return org.elasticsearch.common.collect.List.of("PutObject");
}
}

@@ -0,0 +1,56 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.repositories.metering;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.xpack.repositories.metering.action.ClearRepositoriesMeteringArchiveAction;
import org.elasticsearch.xpack.repositories.metering.action.RepositoriesMeteringAction;
import org.elasticsearch.xpack.repositories.metering.action.TransportClearRepositoriesStatsArchiveAction;
import org.elasticsearch.xpack.repositories.metering.action.TransportRepositoriesStatsAction;
import org.elasticsearch.xpack.repositories.metering.rest.RestClearRepositoriesMeteringArchiveAction;
import org.elasticsearch.xpack.repositories.metering.rest.RestGetRepositoriesMeteringAction;
import java.util.List;
import java.util.function.Supplier;
public final class RepositoriesMeteringPlugin extends Plugin implements ActionPlugin {
@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return org.elasticsearch.common.collect.List.of(
new ActionHandler<>(RepositoriesMeteringAction.INSTANCE, TransportRepositoriesStatsAction.class),
new ActionHandler<>(ClearRepositoriesMeteringArchiveAction.INSTANCE, TransportClearRepositoriesStatsArchiveAction.class)
);
}
@Override
public List<RestHandler> getRestHandlers(
Settings settings,
RestController restController,
ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings,
SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
return org.elasticsearch.common.collect.List.of(
new RestGetRepositoriesMeteringAction(),
new RestClearRepositoriesMeteringArchiveAction()
);
}
}

@@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.repositories.metering.action;
import org.elasticsearch.action.ActionType;
public final class ClearRepositoriesMeteringArchiveAction extends ActionType<RepositoriesMeteringResponse> {
public static final ClearRepositoriesMeteringArchiveAction INSTANCE = new ClearRepositoriesMeteringArchiveAction();
static final String NAME = "cluster:monitor/xpack/repositories_metering/clear_metering_archive";
ClearRepositoriesMeteringArchiveAction() {
super(NAME, RepositoriesMeteringResponse::new);
}
}

@@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.repositories.metering.action;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
public final class ClearRepositoriesMeteringArchiveRequest extends BaseNodesRequest<ClearRepositoriesMeteringArchiveRequest> {
private final long maxVersionToClear;
public ClearRepositoriesMeteringArchiveRequest(StreamInput in) throws IOException {
super(in);
this.maxVersionToClear = in.readLong();
}
public ClearRepositoriesMeteringArchiveRequest(long maxVersionToClear, String... nodesIds) {
super(nodesIds);
this.maxVersionToClear = maxVersionToClear;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(maxVersionToClear);
}
public long getMaxVersionToClear() {
return maxVersionToClear;
}
}
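The `maxVersionToClear` field in the request above drives the clearing of archived repository stats on each node: entries whose archive version is at or below the threshold are removed and returned. A self-contained sketch of that semantics (the entry class and field names here are illustrative, not the real `RepositoriesService` API):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical in-memory model of the repositories stats archive. clear()
// removes and returns every entry with archiveVersion <= maxVersionToClear,
// mirroring the documented behaviour of max_version_to_clear.
public class StatsArchive {
    public static final class Entry {
        final long archiveVersion;
        final String repositoryName;

        Entry(long archiveVersion, String repositoryName) {
            this.archiveVersion = archiveVersion;
            this.repositoryName = repositoryName;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    public void archive(long version, String repositoryName) {
        entries.add(new Entry(version, repositoryName));
    }

    public List<Entry> clear(long maxVersionToClear) {
        List<Entry> cleared = new ArrayList<>();
        Iterator<Entry> it = entries.iterator();
        while (it.hasNext()) {
            Entry e = it.next();
            if (e.archiveVersion <= maxVersionToClear) {
                cleared.add(e);
                it.remove();
            }
        }
        return cleared;
    }

    public int size() {
        return entries.size();
    }

    public static void main(String[] args) {
        StatsArchive archive = new StatsArchive();
        archive.archive(1, "repo-a");
        archive.archive(2, "repo-b");
        // Clearing with max_version_to_clear = 1 removes only the first entry.
        if (archive.clear(1).size() != 1 || archive.size() != 1) {
            throw new AssertionError();
        }
    }
}
```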

@@ -0,0 +1,19 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.repositories.metering.action;
import org.elasticsearch.action.ActionType;
public final class RepositoriesMeteringAction extends ActionType<RepositoriesMeteringResponse> {
public static final RepositoriesMeteringAction INSTANCE = new RepositoriesMeteringAction();
static final String NAME = "cluster:monitor/xpack/repositories_metering/get_metrics";
RepositoriesMeteringAction() {
super(NAME, RepositoriesMeteringResponse::new);
}
}

@@ -0,0 +1,22 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.repositories.metering.action;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import java.io.IOException;
public final class RepositoriesMeteringRequest extends BaseNodesRequest<RepositoriesMeteringRequest> {
public RepositoriesMeteringRequest(StreamInput in) throws IOException {
super(in);
}
public RepositoriesMeteringRequest(String... nodesIds) {
super(nodesIds);
}
}

@@ -0,0 +1,53 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.repositories.metering.action;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.List;
public final class RepositoriesMeteringResponse extends BaseNodesResponse<RepositoriesNodeMeteringResponse> implements ToXContentFragment {
public RepositoriesMeteringResponse(StreamInput in) throws IOException {
super(in);
}
public RepositoriesMeteringResponse(
ClusterName clusterName,
List<RepositoriesNodeMeteringResponse> nodes,
List<FailedNodeException> failures
) {
super(clusterName, nodes, failures);
}
@Override
protected List<RepositoriesNodeMeteringResponse> readNodesFrom(StreamInput in) throws IOException {
return in.readList(RepositoriesNodeMeteringResponse::new);
}
@Override
protected void writeNodesTo(StreamOutput out, List<RepositoriesNodeMeteringResponse> nodes) throws IOException {
out.writeList(nodes);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("nodes");
for (RepositoriesNodeMeteringResponse nodeStats : getNodes()) {
nodeStats.toXContent(builder, params);
}
builder.endObject();
return builder;
}
}

@@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.repositories.metering.action;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.repositories.RepositoryStatsSnapshot;
import java.io.IOException;
import java.util.List;
public final class RepositoriesNodeMeteringResponse extends BaseNodeResponse implements ToXContentFragment {
final List<RepositoryStatsSnapshot> repositoryStatsSnapshots;
public RepositoriesNodeMeteringResponse(DiscoveryNode node, List<RepositoryStatsSnapshot> repositoryStatsSnapshots) {
super(node);
this.repositoryStatsSnapshots = repositoryStatsSnapshots;
}
public RepositoriesNodeMeteringResponse(StreamInput in) throws IOException {
super(in);
this.repositoryStatsSnapshots = in.readList(RepositoryStatsSnapshot::new);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startArray(getNode().getId());
for (RepositoryStatsSnapshot repositoryStats : repositoryStatsSnapshots) {
repositoryStats.toXContent(builder, params);
}
builder.endArray();
return builder;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeList(repositoryStatsSnapshots);
}
}

@@ -0,0 +1,98 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.repositories.metering.action;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryStatsSnapshot;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.List;
public final class TransportClearRepositoriesStatsArchiveAction extends TransportNodesAction<
ClearRepositoriesMeteringArchiveRequest,
RepositoriesMeteringResponse,
TransportClearRepositoriesStatsArchiveAction.ClearRepositoriesStatsArchiveNodeRequest,
RepositoriesNodeMeteringResponse> {
private final RepositoriesService repositoriesService;
@Inject
public TransportClearRepositoriesStatsArchiveAction(
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
RepositoriesService repositoriesService
) {
super(
ClearRepositoriesMeteringArchiveAction.NAME,
threadPool,
clusterService,
transportService,
actionFilters,
ClearRepositoriesMeteringArchiveRequest::new,
ClearRepositoriesStatsArchiveNodeRequest::new,
ThreadPool.Names.SAME,
RepositoriesNodeMeteringResponse.class
);
this.repositoriesService = repositoriesService;
}
@Override
protected RepositoriesMeteringResponse newResponse(
ClearRepositoriesMeteringArchiveRequest request,
List<RepositoriesNodeMeteringResponse> nodesResponses,
List<FailedNodeException> failures
) {
return new RepositoriesMeteringResponse(clusterService.getClusterName(), nodesResponses, failures);
}
@Override
protected ClearRepositoriesStatsArchiveNodeRequest newNodeRequest(ClearRepositoriesMeteringArchiveRequest request) {
return new ClearRepositoriesStatsArchiveNodeRequest(request.getMaxVersionToClear());
}
@Override
protected RepositoriesNodeMeteringResponse newNodeResponse(StreamInput in) throws IOException {
return new RepositoriesNodeMeteringResponse(in);
}
@Override
protected RepositoriesNodeMeteringResponse nodeOperation(ClearRepositoriesStatsArchiveNodeRequest request) {
List<RepositoryStatsSnapshot> clearedStats = repositoriesService.clearRepositoriesStatsArchive(request.maxVersionToClear);
return new RepositoriesNodeMeteringResponse(clusterService.localNode(), clearedStats);
}
static final class ClearRepositoriesStatsArchiveNodeRequest extends BaseNodeRequest {
private final long maxVersionToClear;
ClearRepositoriesStatsArchiveNodeRequest(long maxVersionToClear) {
this.maxVersionToClear = maxVersionToClear;
}
ClearRepositoriesStatsArchiveNodeRequest(StreamInput in) throws IOException {
super(in);
this.maxVersionToClear = in.readLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(maxVersionToClear);
}
}
}
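`ClearRepositoriesStatsArchiveNodeRequest` above follows Elasticsearch's wire-protocol convention: the `StreamInput` constructor must read fields in exactly the order `writeTo` wrote them. A self-contained round-trip sketch of that symmetry, using plain `java.io` streams as stand-ins for `StreamInput`/`StreamOutput` (class and method names here are illustrative):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical model of the node request's serialization contract: the
// reading constructor mirrors writeTo field-for-field, so a write followed
// by a read reconstructs an equal request.
public class NodeRequestWireFormat {
    private final long maxVersionToClear;

    NodeRequestWireFormat(long maxVersionToClear) {
        this.maxVersionToClear = maxVersionToClear;
    }

    // Mirrors the StreamInput constructor: read in write order.
    NodeRequestWireFormat(DataInputStream in) throws IOException {
        this.maxVersionToClear = in.readLong();
    }

    // Mirrors writeTo(StreamOutput).
    void writeTo(DataOutputStream out) throws IOException {
        out.writeLong(maxVersionToClear);
    }

    long maxVersionToClear() {
        return maxVersionToClear;
    }

    static NodeRequestWireFormat roundTrip(NodeRequestWireFormat original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.writeTo(new DataOutputStream(bytes));
            return new NodeRequestWireFormat(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // in-memory streams should not fail
        }
    }

    public static void main(String[] args) {
        NodeRequestWireFormat copy = roundTrip(new NodeRequestWireFormat(7L));
        if (copy.maxVersionToClear() != 7L) {
            throw new AssertionError();
        }
    }
}
```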

@@ -0,0 +1,84 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.repositories.metering.action;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.List;
public final class TransportRepositoriesStatsAction extends TransportNodesAction<
RepositoriesMeteringRequest,
RepositoriesMeteringResponse,
TransportRepositoriesStatsAction.RepositoriesNodeStatsRequest,
RepositoriesNodeMeteringResponse> {
private final RepositoriesService repositoriesService;
@Inject
public TransportRepositoriesStatsAction(
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
RepositoriesService repositoriesService
) {
super(
RepositoriesMeteringAction.NAME,
threadPool,
clusterService,
transportService,
actionFilters,
RepositoriesMeteringRequest::new,
RepositoriesNodeStatsRequest::new,
ThreadPool.Names.SAME,
RepositoriesNodeMeteringResponse.class
);
this.repositoriesService = repositoriesService;
}
@Override
protected RepositoriesMeteringResponse newResponse(
RepositoriesMeteringRequest request,
List<RepositoriesNodeMeteringResponse> repositoriesNodeStatsResponses,
List<FailedNodeException> failures
) {
return new RepositoriesMeteringResponse(clusterService.getClusterName(), repositoriesNodeStatsResponses, failures);
}
@Override
protected RepositoriesNodeStatsRequest newNodeRequest(RepositoriesMeteringRequest request) {
return new RepositoriesNodeStatsRequest();
}
@Override
protected RepositoriesNodeMeteringResponse newNodeResponse(StreamInput in) throws IOException {
return new RepositoriesNodeMeteringResponse(in);
}
@Override
protected RepositoriesNodeMeteringResponse nodeOperation(RepositoriesNodeStatsRequest request) {
return new RepositoriesNodeMeteringResponse(clusterService.localNode(), repositoriesService.repositoriesStats());
}
static final class RepositoriesNodeStatsRequest extends BaseNodeRequest {
RepositoriesNodeStatsRequest() {}
RepositoriesNodeStatsRequest(StreamInput in) throws IOException {
super(in);
}
}
}


@ -0,0 +1,46 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.repositories.metering.rest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestActions;
import org.elasticsearch.xpack.repositories.metering.action.ClearRepositoriesMeteringArchiveAction;
import org.elasticsearch.xpack.repositories.metering.action.ClearRepositoriesMeteringArchiveRequest;
import java.util.List;
public class RestClearRepositoriesMeteringArchiveAction extends BaseRestHandler {
@Override
public String getName() {
        return "clear_repositories_metering_archive_action";
}
@Override
public List<Route> routes() {
return org.elasticsearch.common.collect.List.of(
new Route(RestRequest.Method.DELETE, "/_nodes/{nodeId}/_repositories_metering/{maxVersionToClear}")
);
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
String[] nodesIds = Strings.splitStringByCommaToArray(request.param("nodeId"));
long maxVersionToClear = request.paramAsLong("maxVersionToClear", -1);
ClearRepositoriesMeteringArchiveRequest clearArchivesRequest = new ClearRepositoriesMeteringArchiveRequest(
maxVersionToClear,
nodesIds
);
return channel -> client.execute(
ClearRepositoriesMeteringArchiveAction.INSTANCE,
clearArchivesRequest,
new RestActions.NodesResponseRestListener<>(channel)
);
}
}


@ -0,0 +1,41 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.repositories.metering.rest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestActions;
import org.elasticsearch.xpack.repositories.metering.action.RepositoriesMeteringRequest;
import org.elasticsearch.xpack.repositories.metering.action.RepositoriesMeteringAction;
import java.util.List;
public final class RestGetRepositoriesMeteringAction extends BaseRestHandler {
@Override
public String getName() {
return "get_repositories_metering_action";
}
@Override
public List<Route> routes() {
return org.elasticsearch.common.collect.List.of(new Route(RestRequest.Method.GET, "/_nodes/{nodeId}/_repositories_metering"));
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
String[] nodesIds = Strings.splitStringByCommaToArray(request.param("nodeId"));
RepositoriesMeteringRequest repositoriesMeteringRequest = new RepositoriesMeteringRequest(nodesIds);
return channel -> client.execute(
RepositoriesMeteringAction.INSTANCE,
repositoriesMeteringRequest,
new RestActions.NodesResponseRestListener<>(channel)
);
}
}
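
Taken together, the two REST handlers above register `GET /_nodes/{nodeId}/_repositories_metering` and `DELETE /_nodes/{nodeId}/_repositories_metering/{maxVersionToClear}`. As a quick sketch of how a client addresses those routes (this helper is illustrative and not part of the change; only the route shapes come from the handlers above):

```java
// Illustrative path builder for the repositories metering API routes.
// The class name and methods are hypothetical; the path formats mirror
// the Route definitions registered by the two REST handlers above.
public final class RepositoriesMeteringPaths {
    private RepositoriesMeteringPaths() {}

    // GET /_nodes/{nodeId}/_repositories_metering
    // nodeIds use the usual node-selection syntax ("_all", or a list that is
    // comma-joined, matching Strings.splitStringByCommaToArray server-side).
    public static String stats(String... nodeIds) {
        return "/_nodes/" + String.join(",", nodeIds) + "/_repositories_metering";
    }

    // DELETE /_nodes/{nodeId}/_repositories_metering/{maxVersionToClear}
    public static String clearArchive(long maxVersionToClear, String... nodeIds) {
        return stats(nodeIds) + "/" + maxVersionToClear;
    }
}
```

For example, `clearArchive(Long.MAX_VALUE, "_all")` yields the path the REST test case below uses to purge the whole archive before each test.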


@ -0,0 +1,374 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.repositories.metering;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpPost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.repositories.RepositoryInfo;
import org.elasticsearch.repositories.RepositoryStats;
import org.elasticsearch.repositories.RepositoryStatsSnapshot;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.Before;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.blankOrNullString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
public abstract class AbstractRepositoriesMeteringAPIRestTestCase extends ESRestTestCase {
protected abstract String repositoryType();
protected abstract Map<String, String> repositoryLocation();
protected abstract Settings repositorySettings();
/**
* New settings to force a new repository creation
*/
protected abstract Settings updatedRepositorySettings();
protected abstract List<String> readCounterKeys();
protected abstract List<String> writeCounterKeys();
@Before
public void clearArchive() throws Exception {
clearRepositoriesStats(Long.MAX_VALUE);
}
public void testStatsAreTracked() throws Exception {
snapshotAndRestoreIndex((repository, index) -> {
List<RepositoryStatsSnapshot> repoStats = getRepositoriesStats();
assertThat(repoStats.size(), equalTo(1));
RepositoryStatsSnapshot repositoryStats = repoStats.get(0);
assertRepositoryStatsBelongToRepository(repositoryStats, repository);
assertRequestCountersAccountedForReads(repositoryStats);
assertRequestCountersAccountedForWrites(repositoryStats);
});
}
public void testStatsAreUpdatedAfterRepositoryOperations() throws Exception {
String snapshot = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
snapshotAndRestoreIndex(snapshot, (repository, index) -> {
List<RepositoryStatsSnapshot> repoStatsBeforeRestore = getRepositoriesStats();
assertThat(repoStatsBeforeRestore.size(), equalTo(1));
RepositoryStatsSnapshot repositoryStatsBeforeRestore = repoStatsBeforeRestore.get(0);
Map<String, Long> requestCountsBeforeRestore = repositoryStatsBeforeRestore.getRepositoryStats().requestCounts;
assertRepositoryStatsBelongToRepository(repositoryStatsBeforeRestore, repository);
assertRequestCountersAccountedForReads(repositoryStatsBeforeRestore);
assertRequestCountersAccountedForWrites(repositoryStatsBeforeRestore);
deleteIndex(index);
restoreSnapshot(repository, snapshot, true);
List<RepositoryStatsSnapshot> updatedRepoStats = getRepositoriesStats();
assertThat(updatedRepoStats.size(), equalTo(1));
RepositoryStatsSnapshot repoStatsAfterRestore = updatedRepoStats.get(0);
Map<String, Long> requestCountsAfterRestore = repoStatsAfterRestore.getRepositoryStats().requestCounts;
for (String readCounterKey : readCounterKeys()) {
assertThat(
requestCountsAfterRestore.get(readCounterKey),
greaterThanOrEqualTo(requestCountsBeforeRestore.get(readCounterKey))
);
}
});
}
public void testClearRepositoriesStats() throws Exception {
snapshotAndRestoreIndex((repository, index) -> {
deleteRepository(repository);
List<RepositoryStatsSnapshot> repositoriesStatsBeforeClearing = getRepositoriesStats();
assertThat(repositoriesStatsBeforeClearing.size(), equalTo(1));
RepositoryStatsSnapshot repositoryStatsSnapshot = repositoriesStatsBeforeClearing.get(0);
assertThat(clearRepositoriesStats(-1).size(), equalTo(0));
List<RepositoryStatsSnapshot> removedRepositoriesStats = clearRepositoriesStats(repositoryStatsSnapshot.getClusterVersion());
assertThat(repositoriesStatsBeforeClearing, equalTo(removedRepositoriesStats));
assertThat(getRepositoriesStats().size(), equalTo(0));
});
}
public void testRegisterMultipleRepositoriesAndGetStats() throws Exception {
List<String> repositoryNames = org.elasticsearch.common.collect.List.of("repo-a", "repo-b", "repo-c");
for (String repositoryName : repositoryNames) {
registerRepository(repositoryName, repositoryType(), false, repositorySettings());
}
List<RepositoryStatsSnapshot> repositoriesStats = getRepositoriesStats();
Map<String, List<RepositoryStatsSnapshot>> repositoryStatsByName = repositoriesStats.stream()
.collect(Collectors.groupingBy(r -> r.getRepositoryInfo().name));
for (String repositoryName : repositoryNames) {
List<RepositoryStatsSnapshot> repositoryStats = repositoryStatsByName.get(repositoryName);
assertThat(repositoryStats, is(notNullValue()));
assertThat(repositoryStats.size(), equalTo(1));
RepositoryStatsSnapshot stats = repositoryStats.get(0);
assertRepositoryStatsBelongToRepository(stats, repositoryName);
assertAllRequestCountsAreZero(stats);
}
}
public void testStatsAreArchivedAfterRepositoryDeletion() throws Exception {
snapshotAndRestoreIndex((repository, index) -> {
List<RepositoryStatsSnapshot> repositoriesStats = getRepositoriesStats();
assertThat(repositoriesStats.size(), equalTo(1));
RepositoryStatsSnapshot statsBeforeRepoDeletion = repositoriesStats.get(0);
assertRepositoryStatsBelongToRepository(statsBeforeRepoDeletion, repository);
deleteRepository(repository);
List<RepositoryStatsSnapshot> repoStatsAfterDeletion = getRepositoriesStats();
assertThat(repoStatsAfterDeletion.size(), equalTo(1));
RepositoryStatsSnapshot statsAfterRepoDeletion = repoStatsAfterDeletion.get(0);
assertStatsAreEqualsIgnoringStoppedAt(statsBeforeRepoDeletion, statsAfterRepoDeletion);
});
}
public void testStatsAreStoredIntoANewCounterInstanceAfterRepoConfigUpdate() throws Exception {
final String snapshot = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
snapshotAndRestoreIndex(snapshot, (repository, index) -> {
List<RepositoryStatsSnapshot> repositoriesStatsBeforeUpdate = getRepositoriesStats();
assertThat(repositoriesStatsBeforeUpdate.size(), equalTo(1));
assertRepositoryStatsBelongToRepository(repositoriesStatsBeforeUpdate.get(0), repository);
assertRequestCountersAccountedForReads(repositoriesStatsBeforeUpdate.get(0));
assertRequestCountersAccountedForWrites(repositoriesStatsBeforeUpdate.get(0));
// Update repository
registerRepository(repository, repositoryType(), false, updatedRepositorySettings());
List<RepositoryStatsSnapshot> repositoriesStatsAfterUpdate = getRepositoriesStats();
assertThat(repositoriesStatsAfterUpdate.size(), equalTo(2));
assertStatsAreEqualsIgnoringStoppedAt(repositoriesStatsBeforeUpdate.get(0), repositoriesStatsAfterUpdate.get(0));
// The counters for the new repository instance are zero
assertAllRequestCountsAreZero(repositoriesStatsAfterUpdate.get(1));
deleteIndex(index);
restoreSnapshot(repository, snapshot, true);
List<RepositoryStatsSnapshot> repoStatsAfterRestore = getRepositoriesStats();
assertThat(repoStatsAfterRestore.size(), equalTo(2));
assertStatsAreEqualsIgnoringStoppedAt(repositoriesStatsAfterUpdate.get(0), repoStatsAfterRestore.get(0));
assertRequestCountersAccountedForReads(repoStatsAfterRestore.get(1));
});
}
public void testDeleteThenAddRepositoryWithTheSameName() throws Exception {
snapshotAndRestoreIndex((repository, index) -> {
List<RepositoryStatsSnapshot> repoStatsBeforeDeletion = getRepositoriesStats();
assertThat(repoStatsBeforeDeletion.size(), equalTo(1));
deleteRepository(repository);
List<RepositoryStatsSnapshot> repoStatsAfterDeletion = getRepositoriesStats();
assertThat(repoStatsAfterDeletion.size(), equalTo(1));
assertStatsAreEqualsIgnoringStoppedAt(repoStatsBeforeDeletion.get(0), repoStatsAfterDeletion.get(0));
registerRepository(repository, repositoryType(), false, repositorySettings());
List<RepositoryStatsSnapshot> repositoriesStatsAfterRegisteringTheSameRepo = getRepositoriesStats();
assertThat(repositoriesStatsAfterRegisteringTheSameRepo.size(), equalTo(2));
assertStatsAreEqualsIgnoringStoppedAt(repoStatsBeforeDeletion.get(0), repositoriesStatsAfterRegisteringTheSameRepo.get(0));
assertAllRequestCountsAreZero(repositoriesStatsAfterRegisteringTheSameRepo.get(1));
});
}
private void snapshotAndRestoreIndex(CheckedBiConsumer<String, String, Exception> biConsumer) throws Exception {
final String snapshot = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
snapshotAndRestoreIndex(snapshot, biConsumer);
}
private void snapshotAndRestoreIndex(String snapshot, CheckedBiConsumer<String, String, Exception> biConsumer) throws Exception {
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
final String repository = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
final int numberOfShards = randomIntBetween(1, 5);
final String repositoryType = repositoryType();
final Settings repositorySettings = repositorySettings();
registerRepository(repository, repositoryType, true, repositorySettings);
createIndex(
indexName,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build()
);
ensureGreen(indexName);
final int numDocs = randomIntBetween(1, 500);
final StringBuilder bulkBody = new StringBuilder();
for (int i = 0; i < numDocs; i++) {
bulkBody.append("{\"index\":{\"_id\":\"").append(i).append("\"}}\n");
bulkBody.append("{\"field\":").append(i).append(",\"text\":\"Document number ").append(i).append("\"}\n");
}
final Request documents = new Request(HttpPost.METHOD_NAME, '/' + indexName + "/_bulk");
documents.addParameter("refresh", Boolean.TRUE.toString());
documents.setJsonEntity(bulkBody.toString());
assertOK(client().performRequest(documents));
createSnapshot(repository, snapshot, true);
deleteIndex(indexName);
restoreSnapshot(repository, snapshot, true);
biConsumer.accept(repository, indexName);
}
private void assertRequestCountersAccountedForReads(RepositoryStatsSnapshot statsSnapshot) {
RepositoryStats repositoryStats = statsSnapshot.getRepositoryStats();
Map<String, Long> requestCounts = repositoryStats.requestCounts;
for (String readCounterKey : readCounterKeys()) {
assertThat(requestCounts.get(readCounterKey), is(notNullValue()));
assertThat(requestCounts.get(readCounterKey), is(greaterThan(0L)));
}
}
private void assertRequestCountersAccountedForWrites(RepositoryStatsSnapshot statsSnapshot) {
RepositoryStats repositoryStats = statsSnapshot.getRepositoryStats();
Map<String, Long> requestCounts = repositoryStats.requestCounts;
for (String writeCounterKey : writeCounterKeys()) {
assertThat(requestCounts.get(writeCounterKey), is(notNullValue()));
assertThat(requestCounts.get(writeCounterKey), is(greaterThan(0L)));
}
}
private void assertStatsAreEqualsIgnoringStoppedAt(RepositoryStatsSnapshot stats, RepositoryStatsSnapshot otherStats) {
assertRepositoryInfoIsEqualIgnoringStoppedAt(stats.getRepositoryInfo(), otherStats.getRepositoryInfo());
assertThat(stats.getRepositoryStats(), equalTo(otherStats.getRepositoryStats()));
}
private void assertRepositoryInfoIsEqualIgnoringStoppedAt(RepositoryInfo repositoryInfo, RepositoryInfo otherRepositoryInfo) {
assertThat(repositoryInfo.ephemeralId, equalTo(otherRepositoryInfo.ephemeralId));
assertThat(repositoryInfo.name, equalTo(otherRepositoryInfo.name));
assertThat(repositoryInfo.type, equalTo(otherRepositoryInfo.type));
assertThat(repositoryInfo.location, equalTo(otherRepositoryInfo.location));
assertThat(repositoryInfo.startedAt, equalTo(otherRepositoryInfo.startedAt));
}
private void assertRepositoryStatsBelongToRepository(RepositoryStatsSnapshot stats, String repositoryName) {
RepositoryInfo repositoryInfo = stats.getRepositoryInfo();
assertThat(repositoryInfo.name, equalTo(repositoryName));
assertThat(repositoryInfo.type, equalTo(repositoryType()));
assertThat(repositoryInfo.location, equalTo(repositoryLocation()));
}
private void assertAllRequestCountsAreZero(RepositoryStatsSnapshot statsSnapshot) {
RepositoryStats stats = statsSnapshot.getRepositoryStats();
for (long requestCount : stats.requestCounts.values()) {
            // requestCount is a long, so the matcher must use a long literal;
            // equalTo(0) would compare against an Integer and fail to compile.
            assertThat(requestCount, equalTo(0L));
}
}
private List<RepositoryStatsSnapshot> getRepositoriesStats() throws IOException {
Map<String, Object> response = getAsMap("/_nodes/_all/_repositories_metering");
return parseRepositoriesStatsResponse(response);
}
private List<RepositoryStatsSnapshot> parseRepositoriesStatsResponse(Map<String, Object> response) throws IOException {
Map<String, List<Map<String, Object>>> nodesRepoStats = extractValue(response, "nodes");
assertThat(response.size(), greaterThan(0));
List<RepositoryStatsSnapshot> repositoriesStats = new ArrayList<>();
for (String nodeId : getNodeIds()) {
List<Map<String, Object>> nodeStats = nodesRepoStats.get(nodeId);
assertThat(nodeStats, is(notNullValue()));
for (Map<String, Object> nodeStatSnapshot : nodeStats) {
RepositoryInfo repositoryInfo = parseRepositoryInfo(nodeStatSnapshot);
Map<String, Integer> intRequestCounters = extractValue(nodeStatSnapshot, "request_counts");
boolean archived = extractValue(nodeStatSnapshot, "archived");
int clusterVersion = (int) RepositoryStatsSnapshot.UNKNOWN_CLUSTER_VERSION;
if (archived) {
clusterVersion = extractValue(nodeStatSnapshot, "cluster_version");
}
Map<String, Long> requestCounters = intRequestCounters.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().longValue()));
RepositoryStats repositoryStats = new RepositoryStats(requestCounters);
RepositoryStatsSnapshot statsSnapshot = new RepositoryStatsSnapshot(
repositoryInfo,
repositoryStats,
clusterVersion,
archived
);
repositoriesStats.add(statsSnapshot);
}
}
return repositoriesStats;
}
private RepositoryInfo parseRepositoryInfo(Map<String, Object> nodeStatSnapshot) {
String id = extractValue(nodeStatSnapshot, "repository_ephemeral_id");
String name = extractValue(nodeStatSnapshot, "repository_name");
String type = extractValue(nodeStatSnapshot, "repository_type");
Map<String, String> location = extractValue(nodeStatSnapshot, "repository_location");
Long startedAt = extractValue(nodeStatSnapshot, "repository_started_at");
Long stoppedAt = extractValue(nodeStatSnapshot, "repository_stopped_at");
return new RepositoryInfo(id, name, type, location, startedAt, stoppedAt);
}
private Set<String> getNodeIds() throws IOException {
Map<String, Object> nodes = extractValue(getAsMap("_nodes/"), "nodes");
return nodes.keySet();
}
private List<RepositoryStatsSnapshot> clearRepositoriesStats(long maxVersionToClear) throws IOException {
final Request request = new Request(HttpDelete.METHOD_NAME, "/_nodes/_all/_repositories_metering/" + maxVersionToClear);
final Response response = client().performRequest(request);
assertThat(
"Failed to clear repositories stats: " + response,
response.getStatusLine().getStatusCode(),
equalTo(RestStatus.OK.getStatus())
);
return parseRepositoriesStatsResponse(responseAsMap(response));
}
@SuppressWarnings("unchecked")
protected static <T> T extractValue(Map<String, Object> map, String path) {
return (T) XContentMapValues.extractValue(path, map);
}
protected String getProperty(String propertyName) {
final String property = System.getProperty(propertyName);
assertThat(property, not(blankOrNullString()));
return property;
}
}


@ -0,0 +1,104 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.repositories.metering.action;
import org.elasticsearch.Version;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.repositories.RepositoryInfo;
import org.elasticsearch.repositories.RepositoryStats;
import org.elasticsearch.repositories.RepositoryStatsSnapshot;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
public class RepositoriesMeteringResponseTests extends ESTestCase {
public void testSerializationRoundtrip() throws Exception {
final RepositoriesMeteringResponse repositoriesMeteringResponse = createResponse();
final RepositoriesMeteringResponse deserializedResponse = copyWriteable(
repositoriesMeteringResponse,
writableRegistry(),
RepositoriesMeteringResponse::new,
Version.CURRENT
);
assertResponsesAreEqual(repositoriesMeteringResponse, deserializedResponse);
}
private void assertResponsesAreEqual(RepositoriesMeteringResponse response, RepositoriesMeteringResponse otherResponse) {
List<RepositoriesNodeMeteringResponse> nodeResponses = response.getNodes();
List<RepositoriesNodeMeteringResponse> otherNodeResponses = otherResponse.getNodes();
assertThat(nodeResponses.size(), equalTo(otherNodeResponses.size()));
for (int i = 0; i < nodeResponses.size(); i++) {
RepositoriesNodeMeteringResponse nodeResponse = nodeResponses.get(i);
RepositoriesNodeMeteringResponse otherNodeResponse = otherNodeResponses.get(i);
assertThat(nodeResponse.repositoryStatsSnapshots, equalTo(otherNodeResponse.repositoryStatsSnapshots));
}
List<FailedNodeException> failures = response.failures();
List<FailedNodeException> otherFailures = otherResponse.failures();
assertThat(failures.size(), equalTo(otherFailures.size()));
for (int i = 0; i < failures.size(); i++) {
FailedNodeException failure = failures.get(i);
FailedNodeException otherFailure = otherFailures.get(i);
assertThat(failure.nodeId(), equalTo(otherFailure.nodeId()));
assertThat(failure.getMessage(), equalTo(otherFailure.getMessage()));
}
}
private RepositoriesMeteringResponse createResponse() {
ClusterName clusterName = new ClusterName("test");
int nodes = randomIntBetween(1, 10);
List<RepositoriesNodeMeteringResponse> nodeResponses = new ArrayList<>(nodes);
for (int nodeId = 0; nodeId < nodes; nodeId++) {
DiscoveryNode node = new DiscoveryNode("nodeId" + nodeId, buildNewFakeTransportAddress(), Version.CURRENT);
int numberOfRepos = randomInt(10);
List<RepositoryStatsSnapshot> nodeRepoStats = new ArrayList<>(numberOfRepos);
for (int clusterVersion = 0; clusterVersion < numberOfRepos; clusterVersion++) {
String repoId = randomAlphaOfLength(10);
String repoName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
String repoType = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
Map<String, String> repoLocation = org.elasticsearch.common.collect.Map.of(
"bucket",
randomAlphaOfLength(10).toLowerCase(Locale.ROOT)
);
long startedAt = System.currentTimeMillis() - 1;
Long stoppedAt = randomBoolean() ? System.currentTimeMillis() : null;
RepositoryInfo repositoryInfo = new RepositoryInfo(repoId, repoName, repoType, repoLocation, startedAt, stoppedAt);
boolean archived = randomBoolean();
RepositoryStatsSnapshot statsSnapshot = new RepositoryStatsSnapshot(
repositoryInfo,
new RepositoryStats(org.elasticsearch.common.collect.Map.of("GET", randomLongBetween(0, 2000))),
archived ? clusterVersion : RepositoryStatsSnapshot.UNKNOWN_CLUSTER_VERSION,
archived
);
nodeRepoStats.add(statsSnapshot);
}
nodeResponses.add(new RepositoriesNodeMeteringResponse(node, nodeRepoStats));
}
int numberOfFailures = randomInt(20);
List<FailedNodeException> failures = new ArrayList<>(numberOfFailures);
for (int i = nodes; i < numberOfFailures + nodes; i++) {
FailedNodeException failedNodeException = new FailedNodeException(
"nodeId" + i,
"error",
randomBoolean() ? new RuntimeException("boom") : null
);
failures.add(failedNodeException);
}
return new RepositoriesMeteringResponse(clusterName, nodeResponses, failures);
}
}


@ -8,20 +8,14 @@ package org.elasticsearch.xpack.searchablesnapshots;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.query.QueryBuilder;
@ -31,7 +25,6 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.rest.ESRestTestCase;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@ -39,7 +32,6 @@ import java.util.function.Function;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.notNullValue;
public abstract class AbstractSearchableSnapshotsRestTestCase extends ESRestTestCase {
@ -250,26 +242,6 @@ public abstract class AbstractSearchableSnapshotsRestTestCase extends ESRestTest
}
}
protected static void registerRepository(String repository, String type, boolean verify, Settings settings) throws IOException {
final Request request = new Request(HttpPut.METHOD_NAME, "_snapshot/" + repository);
request.setJsonEntity(Strings.toString(new PutRepositoryRequest(repository).type(type).verify(verify).settings(settings)));
final Response response = client().performRequest(request);
assertAcked("Failed to create repository [" + repository + "] of type [" + type + "]: " + response, response);
}
protected static void createSnapshot(String repository, String snapshot, boolean waitForCompletion) throws IOException {
final Request request = new Request(HttpPut.METHOD_NAME, "_snapshot/" + repository + '/' + snapshot);
request.addParameter("wait_for_completion", Boolean.toString(waitForCompletion));
final Response response = client().performRequest(request);
assertThat(
"Failed to create snapshot [" + snapshot + "] in repository [" + repository + "]: " + response,
response.getStatusLine().getStatusCode(),
equalTo(RestStatus.OK.getStatus())
);
}
protected static void deleteSnapshot(String repository, String snapshot, boolean ignoreMissing) throws IOException {
final Request request = new Request(HttpDelete.METHOD_NAME, "_snapshot/" + repository + '/' + snapshot);
try {
@ -406,19 +378,6 @@ public abstract class AbstractSearchableSnapshotsRestTestCase extends ESRestTest
return extractValue(responseAsMap(response), index + ".settings");
}
protected static Map<String, Object> responseAsMap(Response response) throws IOException {
final XContentType xContentType = XContentType.fromMediaTypeOrFormat(response.getEntity().getContentType().getValue());
assertThat("Unknown XContentType", xContentType, notNullValue());
BytesReference bytesReference = Streams.readFully(response.getEntity().getContent());
try (InputStream responseBody = bytesReference.streamInput()) {
return XContentHelper.convertToMap(xContentType.xContent(), responseBody, true);
} catch (Exception e) {
throw new IOException(bytesReference.utf8ToString(), e);
}
}
@SuppressWarnings("unchecked")
protected static <T> T extractValue(Map<String, Object> map, String path) {
return (T) XContentMapValues.extractValue(path, map);