[7.x] Data Stream Stats API (#58707) (#59566)

This API reports on statistics important for data streams, including the number of data
streams, the number of backing indices for those streams, the disk usage for each data
stream, and the maximum timestamp for each data stream
This commit is contained in:
James Baiera 2020-07-14 16:57:46 -04:00 committed by GitHub
parent ed2c29f102
commit 5f7e7e9410
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1732 additions and 268 deletions

View File

@ -50,6 +50,8 @@ import org.elasticsearch.client.indices.ComposableIndexTemplateExistRequest;
import org.elasticsearch.client.indices.CreateDataStreamRequest;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.DataStreamsStatsRequest;
import org.elasticsearch.client.indices.DataStreamsStatsResponse;
import org.elasticsearch.client.indices.DeleteAliasRequest;
import org.elasticsearch.client.indices.DeleteComposableIndexTemplateRequest;
import org.elasticsearch.client.indices.DeleteDataStreamRequest;
@ -260,6 +262,40 @@ public final class IndicesClient {
GetDataStreamResponse::fromXContent, listener, emptySet());
}
/**
* Gets statistics about one or more data streams using the Get Data Streams Stats API.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-aliases.html"> Data Streams API on
* elastic.co</a>
*
* @param dataStreamsStatsRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be
* customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public DataStreamsStatsResponse dataStreamsStats(DataStreamsStatsRequest dataStreamsStatsRequest, RequestOptions options)
throws IOException {
return restHighLevelClient.performRequestAndParseEntity(dataStreamsStatsRequest, IndicesRequestConverters::dataStreamsStats,
options, DataStreamsStatsResponse::fromXContent, emptySet());
}
/**
* Asynchronously gets statistics about one or more data streams using the Get Data Streams Stats API.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-aliases.html"> Data Streams API on
* elastic.co</a>
*
* @param dataStreamsStatsRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be
* customized
* @param listener the listener to be notified upon request completion
* @return cancellable that may be used to cancel the request
*/
public Cancellable dataStreamsStatsAsync(DataStreamsStatsRequest dataStreamsStatsRequest, RequestOptions options,
ActionListener<DataStreamsStatsResponse> listener) {
return restHighLevelClient.performRequestAsyncAndParseEntity(dataStreamsStatsRequest, IndicesRequestConverters::dataStreamsStats,
options, DataStreamsStatsResponse::fromXContent, listener, emptySet());
}
/**
* Creates an index using the Create Index API.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html">

View File

@ -42,6 +42,7 @@ import org.elasticsearch.client.indices.AnalyzeRequest;
import org.elasticsearch.client.indices.CloseIndexRequest;
import org.elasticsearch.client.indices.CreateDataStreamRequest;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.DataStreamsStatsRequest;
import org.elasticsearch.client.indices.GetDataStreamRequest;
import org.elasticsearch.client.indices.DeleteAliasRequest;
import org.elasticsearch.client.indices.DeleteComposableIndexTemplateRequest;
@ -96,6 +97,21 @@ final class IndicesRequestConverters {
return new Request(HttpGet.METHOD_NAME, endpoint);
}
static Request dataStreamsStats(DataStreamsStatsRequest dataStreamsStatsRequest) {
String[] expressions = dataStreamsStatsRequest.indices() == null ? Strings.EMPTY_ARRAY : dataStreamsStatsRequest.indices();
final String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_data_stream")
.addCommaSeparatedPathParts(expressions)
.addPathPartAsIs("_stats")
.build();
Request request = new Request(HttpGet.METHOD_NAME, endpoint);
RequestConverters.Params parameters = new RequestConverters.Params();
parameters.withIndicesOptions(dataStreamsStatsRequest.indicesOptions());
request.addParameters(parameters.asMap());
return request;
}
static Request deleteIndex(DeleteIndexRequest deleteIndexRequest) {
String endpoint = RequestConverters.endpoint(deleteIndexRequest.indices());
Request request = new Request(HttpDelete.METHOD_NAME, endpoint);

View File

@ -0,0 +1,46 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.indices;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Validatable;
public class DataStreamsStatsRequest implements Validatable {
private final String[] indices;
private IndicesOptions indicesOptions = IndicesOptions.fromOptions(false, false, true, true);
public DataStreamsStatsRequest(String... indices) {
this.indices = indices;
}
public String[] indices() {
return indices;
}
public IndicesOptions indicesOptions() {
return indicesOptions;
}
public DataStreamsStatsRequest indicesOptions(IndicesOptions indicesOptions) {
this.indicesOptions = indicesOptions;
return this;
}
}

View File

@ -0,0 +1,208 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.indices;
import org.elasticsearch.client.core.BroadcastResponse;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
public class DataStreamsStatsResponse extends BroadcastResponse {
private final int dataStreamCount;
private final int backingIndices;
private final ByteSizeValue totalStoreSize;
private final Map<String, DataStreamStats> dataStreams;
protected DataStreamsStatsResponse(Shards shards, int dataStreamCount, int backingIndices, ByteSizeValue totalStoreSize,
Map<String, DataStreamStats> dataStreams) {
super(shards);
this.dataStreamCount = dataStreamCount;
this.backingIndices = backingIndices;
this.totalStoreSize = totalStoreSize;
this.dataStreams = dataStreams;
}
private static final ParseField DATA_STREAM_COUNT = new ParseField("data_stream_count");
private static final ParseField BACKING_INDICES = new ParseField("backing_indices");
private static final ParseField TOTAL_STORE_SIZE_BYTES = new ParseField("total_store_size_bytes");
private static final ParseField DATA_STREAMS = new ParseField("data_streams");
private static final ParseField DATA_STREAM = new ParseField("data_stream");
private static final ParseField STORE_SIZE_BYTES = new ParseField("store_size_bytes");
private static final ParseField MAXIMUM_TIMESTAMP = new ParseField("maximum_timestamp");
@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<DataStreamsStatsResponse, Void> PARSER = new ConstructingObjectParser<>(
"data_streams_stats", true, arg -> {
Shards shards = (Shards) arg[0];
Integer dataStreamCount = ((Integer) arg[1]);
Integer backingIndices = ((Integer) arg[2]);
ByteSizeValue totalStoreSize = ((ByteSizeValue) arg[3]);
Map<String, DataStreamStats> dataStreams = new HashMap<>();
for (DataStreamStats dataStreamStats : ((List<DataStreamStats>) arg[4])) {
dataStreams.put(dataStreamStats.dataStream, dataStreamStats);
}
return new DataStreamsStatsResponse(shards, dataStreamCount, backingIndices, totalStoreSize, dataStreams);
});
private static final ConstructingObjectParser<DataStreamStats, Void> ENTRY_PARSER = new ConstructingObjectParser<>(
"data_streams_stats.entry", true, arg -> {
String dataStream = ((String) arg[0]);
Integer backingIndices = ((Integer) arg[1]);
ByteSizeValue storeSize = ((ByteSizeValue) arg[2]);
Long maximumTimestamp = ((Long) arg[3]);
return new DataStreamStats(dataStream, backingIndices, storeSize, maximumTimestamp);
});
static {
declareShardsField(PARSER);
PARSER.declareInt(constructorArg(), DATA_STREAM_COUNT);
PARSER.declareInt(constructorArg(), BACKING_INDICES);
PARSER.declareField(constructorArg(), (p, c) -> new ByteSizeValue(p.longValue()), TOTAL_STORE_SIZE_BYTES,
ObjectParser.ValueType.VALUE);
PARSER.declareObjectArray(constructorArg(), ENTRY_PARSER, DATA_STREAMS);
ENTRY_PARSER.declareString(constructorArg(), DATA_STREAM);
ENTRY_PARSER.declareInt(constructorArg(), BACKING_INDICES);
ENTRY_PARSER.declareField(constructorArg(), (p, c) -> new ByteSizeValue(p.longValue()), STORE_SIZE_BYTES,
ObjectParser.ValueType.VALUE);
ENTRY_PARSER.declareLong(constructorArg(), MAXIMUM_TIMESTAMP);
}
public static DataStreamsStatsResponse fromXContent(final XContentParser parser) throws IOException {
return PARSER.apply(parser, null);
}
public int getDataStreamCount() {
return dataStreamCount;
}
public int getBackingIndices() {
return backingIndices;
}
public ByteSizeValue getTotalStoreSize() {
return totalStoreSize;
}
public Map<String, DataStreamStats> getDataStreams() {
return dataStreams;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
DataStreamsStatsResponse that = (DataStreamsStatsResponse) obj;
return dataStreamCount == that.dataStreamCount &&
backingIndices == that.backingIndices &&
Objects.equals(totalStoreSize, that.totalStoreSize) &&
Objects.equals(dataStreams, that.dataStreams);
}
@Override
public int hashCode() {
return Objects.hash(dataStreamCount, backingIndices, totalStoreSize, dataStreams);
}
@Override
public String toString() {
return "DataStreamsStatsResponse{" +
"dataStreamCount=" + dataStreamCount +
", backingIndices=" + backingIndices +
", totalStoreSize=" + totalStoreSize +
", dataStreams=" + dataStreams +
'}';
}
public static class DataStreamStats {
private final String dataStream;
private final int backingIndices;
private final ByteSizeValue storeSize;
private final long maximumTimestamp;
public DataStreamStats(String dataStream, int backingIndices, ByteSizeValue storeSize, long maximumTimestamp) {
this.dataStream = dataStream;
this.backingIndices = backingIndices;
this.storeSize = storeSize;
this.maximumTimestamp = maximumTimestamp;
}
public String getDataStream() {
return dataStream;
}
public int getBackingIndices() {
return backingIndices;
}
public ByteSizeValue getStoreSize() {
return storeSize;
}
public long getMaximumTimestamp() {
return maximumTimestamp;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
DataStreamStats that = (DataStreamStats) obj;
return backingIndices == that.backingIndices &&
maximumTimestamp == that.maximumTimestamp &&
Objects.equals(dataStream, that.dataStream) &&
Objects.equals(storeSize, that.storeSize);
}
@Override
public int hashCode() {
return Objects.hash(dataStream, backingIndices, storeSize, maximumTimestamp);
}
@Override
public String toString() {
return "DataStreamStats{" +
"dataStream='" + dataStream + '\'' +
", backingIndices=" + backingIndices +
", storeSize=" + storeSize +
", maximumTimestamp=" + maximumTimestamp +
'}';
}
}
}

View File

@ -64,6 +64,9 @@ import org.elasticsearch.client.indices.CreateDataStreamRequest;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.DataStream;
import org.elasticsearch.client.indices.DataStreamsStatsRequest;
import org.elasticsearch.client.indices.DataStreamsStatsResponse;
import org.elasticsearch.client.indices.DataStreamsStatsResponse.DataStreamStats;
import org.elasticsearch.client.indices.DeleteAliasRequest;
import org.elasticsearch.client.indices.DeleteComposableIndexTemplateRequest;
import org.elasticsearch.client.indices.DeleteDataStreamRequest;
@ -2069,6 +2072,26 @@ public class IndicesClientIT extends ESRestHighLevelClientTestCase {
assertThat(dataStream.getTimeStampField(), equalTo("@timestamp"));
assertThat(dataStream.getIndices(), hasSize(1));
DataStreamsStatsRequest dataStreamsStatsRequest = new DataStreamsStatsRequest();
DataStreamsStatsResponse dataStreamsStatsResponse = execute(dataStreamsStatsRequest, indices::dataStreamsStats,
indices::dataStreamsStatsAsync);
int dataStreamsCount = dataStreamsStatsResponse.getDataStreamCount();
assertThat(dataStreamsCount, equalTo(1));
int backingIndices = dataStreamsStatsResponse.getBackingIndices();
assertThat(backingIndices, equalTo(1));
ByteSizeValue byteSizeValue = dataStreamsStatsResponse.getTotalStoreSize();
assertThat(byteSizeValue, notNullValue());
assertThat(byteSizeValue.getBytes(), not(equalTo(0L)));
Map<String, DataStreamStats> dataStreamsStats = dataStreamsStatsResponse.getDataStreams();
assertThat(dataStreamsStats, notNullValue());
assertThat(dataStreamsStats.size(), equalTo(1));
DataStreamStats dataStreamStat = dataStreamsStats.get(dataStreamName);
assertThat(dataStreamStat, notNullValue());
assertThat(dataStreamStat.getDataStream(), equalTo(dataStreamName));
assertThat(dataStreamStat.getBackingIndices(), equalTo(1));
assertThat(dataStreamStat.getMaximumTimestamp(), equalTo(0L)); // No data in here
assertThat(dataStreamStat.getStoreSize().getBytes(), not(equalTo(0L))); // but still takes up some space on disk
DeleteDataStreamRequest deleteDataStreamRequest = new DeleteDataStreamRequest(dataStreamName);
response = execute(deleteDataStreamRequest, indices::deleteDataStream, indices::deleteDataStreamAsync);
assertThat(response.isAcknowledged(), equalTo(true));

View File

@ -0,0 +1,97 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.indices;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.indices.datastream.DataStreamsStatsAction;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.client.AbstractResponseTestCase;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
public class DataStreamsStatsResponseTests extends AbstractResponseTestCase<DataStreamsStatsAction.Response, DataStreamsStatsResponse> {
private static long randomRecentTimestamp() {
long base = System.currentTimeMillis();
return randomLongBetween(base - TimeUnit.HOURS.toMillis(1), base);
}
@Override
protected DataStreamsStatsAction.Response createServerTestInstance(XContentType xContentType) {
int dataStreamCount = randomInt(10);
int backingIndicesTotal = 0;
long totalStoreSize = 0L;
ArrayList<DataStreamsStatsAction.DataStreamStats> dataStreamStats = new ArrayList<>();
for (int i = 0; i < dataStreamCount; i++) {
String dataStreamName = randomAlphaOfLength(8).toLowerCase(Locale.getDefault());
int backingIndices = randomInt(5);
backingIndicesTotal += backingIndices;
long storeSize = randomLongBetween(250, 1000000000);
totalStoreSize += storeSize;
long maximumTimestamp = randomRecentTimestamp();
dataStreamStats.add(new DataStreamsStatsAction.DataStreamStats(dataStreamName, backingIndices,
new ByteSizeValue(storeSize), maximumTimestamp));
}
int totalShards = randomIntBetween(backingIndicesTotal, backingIndicesTotal * 3);
int successfulShards = randomInt(totalShards);
int failedShards = totalShards - successfulShards;
List<DefaultShardOperationFailedException> exceptions = new ArrayList<>();
for (int i = 0; i < failedShards; i++) {
exceptions.add(new DefaultShardOperationFailedException(randomAlphaOfLength(8).toLowerCase(Locale.getDefault()),
randomInt(totalShards), new ElasticsearchException("boom")));
}
return new DataStreamsStatsAction.Response(totalShards, successfulShards, failedShards, exceptions,
dataStreamCount, backingIndicesTotal, new ByteSizeValue(totalStoreSize),
dataStreamStats.toArray(new DataStreamsStatsAction.DataStreamStats[0]));
}
@Override
protected DataStreamsStatsResponse doParseToClientInstance(XContentParser parser) throws IOException {
return DataStreamsStatsResponse.fromXContent(parser);
}
@Override
protected void assertInstances(DataStreamsStatsAction.Response serverTestInstance, DataStreamsStatsResponse clientInstance) {
assertEquals(serverTestInstance.getTotalShards(), clientInstance.shards().total());
assertEquals(serverTestInstance.getSuccessfulShards(), clientInstance.shards().successful());
assertEquals(serverTestInstance.getFailedShards(), clientInstance.shards().failed());
assertEquals(serverTestInstance.getShardFailures().length, clientInstance.shards().failures().size());
assertEquals(serverTestInstance.getDataStreamCount(), clientInstance.getDataStreamCount());
assertEquals(serverTestInstance.getBackingIndices(), clientInstance.getBackingIndices());
assertEquals(serverTestInstance.getTotalStoreSize(), clientInstance.getTotalStoreSize());
assertEquals(serverTestInstance.getDataStreams().length, clientInstance.getDataStreams().size());
for (DataStreamsStatsAction.DataStreamStats serverStats : serverTestInstance.getDataStreams()) {
DataStreamsStatsResponse.DataStreamStats clientStats = clientInstance.getDataStreams()
.get(serverStats.getDataStream());
assertEquals(serverStats.getDataStream(), clientStats.getDataStream());
assertEquals(serverStats.getBackingIndices(), clientStats.getBackingIndices());
assertEquals(serverStats.getStoreSize(), clientStats.getStoreSize());
assertEquals(serverStats.getMaximumTimestamp(), clientStats.getMaximumTimestamp());
}
}
}

View File

@ -0,0 +1,50 @@
{
"indices.data_streams_stats":{
"documentation":{
"url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html",
"description":"Provides statistics on operations happening in a data stream."
},
"stability":"experimental",
"url":{
"paths":[
{
"path":"/_data_stream/_stats",
"methods":[
"GET"
]
},
{
"path":"/_data_stream/{name}/_stats",
"methods":[
"GET"
],
"parts":{
"name":{
"type":"list",
"description":"A comma-separated list of data stream names; use `_all` or empty string to perform the operation on all data streams"
}
}
}
]
},
"params":{
"expand_wildcards":{
"type":"enum",
"options":[
"open",
"closed",
"hidden",
"none",
"all"
],
"default":"open",
"description":"Whether to expand wildcard expression to concrete indices that are open, closed or both."
},
"forbid_closed_indices":{
"type":"boolean",
"description":"If set to false stats will also collected from closed indices if explicitly specified or if expand_wildcards expands to closed indices",
"default":true
}
}
}
}

View File

@ -118,6 +118,7 @@ import org.elasticsearch.action.admin.indices.dangling.list.TransportListDanglin
import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction;
import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction;
import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction;
import org.elasticsearch.action.admin.indices.datastream.DataStreamsStatsAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsAction;
@ -310,6 +311,7 @@ import org.elasticsearch.rest.action.admin.indices.RestClearIndicesCacheAction;
import org.elasticsearch.rest.action.admin.indices.RestCloseIndexAction;
import org.elasticsearch.rest.action.admin.indices.RestCreateDataStreamAction;
import org.elasticsearch.rest.action.admin.indices.RestCreateIndexAction;
import org.elasticsearch.rest.action.admin.indices.RestDataStreamsStatsAction;
import org.elasticsearch.rest.action.admin.indices.RestDeleteComponentTemplateAction;
import org.elasticsearch.rest.action.admin.indices.RestDeleteComposableIndexTemplateAction;
import org.elasticsearch.rest.action.admin.indices.RestDeleteDataStreamAction;
@ -630,6 +632,7 @@ public class ActionModule extends AbstractModule {
actions.register(DeleteDataStreamAction.INSTANCE, DeleteDataStreamAction.TransportAction.class);
actions.register(GetDataStreamAction.INSTANCE, GetDataStreamAction.TransportAction.class);
actions.register(ResolveIndexAction.INSTANCE, ResolveIndexAction.TransportAction.class);
actions.register(DataStreamsStatsAction.INSTANCE, DataStreamsStatsAction.TransportAction.class);
}
// Persistent tasks:
@ -795,6 +798,7 @@ public class ActionModule extends AbstractModule {
registerHandler.accept(new RestDeleteDataStreamAction());
registerHandler.accept(new RestGetDataStreamsAction());
registerHandler.accept(new RestResolveIndexAction());
registerHandler.accept(new RestDataStreamsStatsAction());
}
// CAT API

View File

@ -0,0 +1,441 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.datastream;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.PointValues;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastRequest;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexAbstractionResolver;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.SortedMap;
import java.util.stream.Stream;
public class DataStreamsStatsAction extends ActionType<DataStreamsStatsAction.Response> {
public static final DataStreamsStatsAction INSTANCE = new DataStreamsStatsAction();
public static final String NAME = "indices:monitor/data_stream/stats";
public DataStreamsStatsAction() {
super(NAME, DataStreamsStatsAction.Response::new);
}
public static class Request extends BroadcastRequest<Request> {
public Request() {
super((String[]) null);
}
public Request(StreamInput in) throws IOException {
super(in);
}
}
public static class Response extends BroadcastResponse {
private final int dataStreamCount;
private final int backingIndices;
private final ByteSizeValue totalStoreSize;
private final DataStreamStats[] dataStreams;
public Response(int totalShards, int successfulShards, int failedShards, List<DefaultShardOperationFailedException> shardFailures,
int dataStreamCount, int backingIndices, ByteSizeValue totalStoreSize, DataStreamStats[] dataStreams) {
super(totalShards, successfulShards, failedShards, shardFailures);
this.dataStreamCount = dataStreamCount;
this.backingIndices = backingIndices;
this.totalStoreSize = totalStoreSize;
this.dataStreams = dataStreams;
}
public Response(StreamInput in) throws IOException {
super(in);
this.dataStreamCount = in.readVInt();
this.backingIndices = in.readVInt();
this.totalStoreSize = new ByteSizeValue(in);
this.dataStreams = in.readArray(DataStreamStats::new, DataStreamStats[]::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(dataStreamCount);
out.writeVInt(backingIndices);
totalStoreSize.writeTo(out);
out.writeArray(dataStreams);
}
@Override
protected void addCustomXContentFields(XContentBuilder builder, Params params) throws IOException {
builder.field("data_stream_count", dataStreamCount);
builder.field("backing_indices", backingIndices);
builder.humanReadableField("total_store_size_bytes", "total_store_size", totalStoreSize);
builder.array("data_streams", (Object[]) dataStreams);
}
public int getDataStreamCount() {
return dataStreamCount;
}
public int getBackingIndices() {
return backingIndices;
}
public ByteSizeValue getTotalStoreSize() {
return totalStoreSize;
}
public DataStreamStats[] getDataStreams() {
return dataStreams;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
Response response = (Response) obj;
return dataStreamCount == response.dataStreamCount &&
backingIndices == response.backingIndices &&
Objects.equals(totalStoreSize, response.totalStoreSize) &&
Arrays.equals(dataStreams, response.dataStreams);
}
@Override
public int hashCode() {
int result = Objects.hash(dataStreamCount, backingIndices, totalStoreSize);
result = 31 * result + Arrays.hashCode(dataStreams);
return result;
}
@Override
public String toString() {
return "Response{" +
"dataStreamCount=" + dataStreamCount +
", backingIndices=" + backingIndices +
", totalStoreSize=" + totalStoreSize +
", dataStreams=" + Arrays.toString(dataStreams) +
'}';
}
}
public static class DataStreamStats implements ToXContentObject, Writeable {
private final String dataStream;
private final int backingIndices;
private final ByteSizeValue storeSize;
private final long maximumTimestamp;
public DataStreamStats(String dataStream, int backingIndices, ByteSizeValue storeSize, long maximumTimestamp) {
this.dataStream = dataStream;
this.backingIndices = backingIndices;
this.storeSize = storeSize;
this.maximumTimestamp = maximumTimestamp;
}
public DataStreamStats(StreamInput in) throws IOException {
this.dataStream = in.readString();
this.backingIndices = in.readVInt();
this.storeSize = new ByteSizeValue(in);
this.maximumTimestamp = in.readVLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(dataStream);
out.writeVInt(backingIndices);
storeSize.writeTo(out);
out.writeVLong(maximumTimestamp);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("data_stream", dataStream);
builder.field("backing_indices", backingIndices);
builder.humanReadableField("store_size_bytes", "store_size", storeSize);
builder.field("maximum_timestamp", maximumTimestamp);
builder.endObject();
return builder;
}
public String getDataStream() {
return dataStream;
}
public int getBackingIndices() {
return backingIndices;
}
public ByteSizeValue getStoreSize() {
return storeSize;
}
public long getMaximumTimestamp() {
return maximumTimestamp;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
DataStreamStats that = (DataStreamStats) obj;
return backingIndices == that.backingIndices &&
maximumTimestamp == that.maximumTimestamp &&
Objects.equals(dataStream, that.dataStream) &&
Objects.equals(storeSize, that.storeSize);
}
@Override
public int hashCode() {
return Objects.hash(dataStream, backingIndices, storeSize, maximumTimestamp);
}
@Override
public String toString() {
return "DataStreamStats{" +
"dataStream='" + dataStream + '\'' +
", backingIndices=" + backingIndices +
", storeSize=" + storeSize +
", maximumTimestamp=" + maximumTimestamp +
'}';
}
}
public static class DataStreamShardStats implements Writeable {
private final ShardRouting shardRouting;
private final StoreStats storeStats;
private final long maxTimestamp;
public DataStreamShardStats(ShardRouting shardRouting, StoreStats storeStats, long maxTimestamp) {
this.shardRouting = shardRouting;
this.storeStats = storeStats;
this.maxTimestamp = maxTimestamp;
}
public DataStreamShardStats(StreamInput in) throws IOException {
this.shardRouting = new ShardRouting(in);
this.storeStats = new StoreStats(in);
this.maxTimestamp = in.readVLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
shardRouting.writeTo(out);
storeStats.writeTo(out);
out.writeVLong(maxTimestamp);
}
public ShardRouting getShardRouting() {
return shardRouting;
}
public StoreStats getStoreStats() {
return storeStats;
}
public long getMaxTimestamp() {
return maxTimestamp;
}
}
private static class AggregatedStats {
Set<String> backingIndices = new HashSet<>();
long storageBytes = 0L;
long maxTimestamp = 0L;
}
public static class TransportAction extends TransportBroadcastByNodeAction<Request, Response, DataStreamShardStats> {
private final ClusterService clusterService;
private final IndicesService indicesService;
private final IndexAbstractionResolver indexAbstractionResolver;
@Inject
public TransportAction(ClusterService clusterService, TransportService transportService, IndicesService indicesService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(DataStreamsStatsAction.NAME, clusterService, transportService, actionFilters, indexNameExpressionResolver,
Request::new, ThreadPool.Names.MANAGEMENT);
this.clusterService = clusterService;
this.indicesService = indicesService;
this.indexAbstractionResolver = new IndexAbstractionResolver(indexNameExpressionResolver);
}
@Override
protected Request readRequestFrom(StreamInput in) throws IOException {
return new Request(in);
}
@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, Request request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}
@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, Request request, String[] concreteIndices) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, concreteIndices);
}
@Override
protected ShardsIterator shards(ClusterState clusterState, Request request, String[] concreteIndices) {
String[] requestIndices = request.indices();
if (requestIndices == null || requestIndices.length == 0) {
requestIndices = new String[]{"*"};
}
List<String> abstractionNames = indexAbstractionResolver.resolveIndexAbstractions(requestIndices, request.indicesOptions(),
clusterState.getMetadata(), true); // Always include data streams for data streams stats api
SortedMap<String, IndexAbstraction> indicesLookup = clusterState.getMetadata().getIndicesLookup();
String[] concreteDatastreamIndices = abstractionNames.stream().flatMap(abstractionName -> {
IndexAbstraction indexAbstraction = indicesLookup.get(abstractionName);
assert indexAbstraction != null;
if (indexAbstraction.getType() == IndexAbstraction.Type.DATA_STREAM) {
IndexAbstraction.DataStream dataStream = (IndexAbstraction.DataStream) indexAbstraction;
List<IndexMetadata> indices = dataStream.getIndices();
return indices.stream().map(idx -> idx.getIndex().getName());
} else {
return Stream.empty();
}
}).toArray(String[]::new);
return clusterState.getRoutingTable().allShards(concreteDatastreamIndices);
}
@Override
protected DataStreamShardStats shardOperation(Request request, ShardRouting shardRouting) throws IOException {
IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());
IndexShard indexShard = indexService.getShard(shardRouting.shardId().id());
// if we don't have the routing entry yet, we need it stats wise, we treat it as if the shard is not ready yet
if (indexShard.routingEntry() == null) {
throw new ShardNotFoundException(indexShard.shardId());
}
StoreStats storeStats = indexShard.storeStats();
IndexAbstraction indexAbstraction = clusterService.state().getMetadata().getIndicesLookup().get(shardRouting.getIndexName());
assert indexAbstraction != null;
IndexAbstraction.DataStream dataStream = indexAbstraction.getParentDataStream();
assert dataStream != null;
long maxTimestamp = 0L;
try (Engine.Searcher searcher = indexShard.acquireSearcher("data_stream_stats")) {
IndexReader indexReader = searcher.getIndexReader();
String fieldName = dataStream.getDataStream().getTimeStampField().getName();
byte[] maxPackedValue = PointValues.getMaxPackedValue(indexReader, fieldName);
if (maxPackedValue != null) {
maxTimestamp = LongPoint.decodeDimension(maxPackedValue, 0);
}
}
return new DataStreamShardStats(
indexShard.routingEntry(),
storeStats,
maxTimestamp
);
}
@Override
protected DataStreamShardStats readShardResult(StreamInput in) throws IOException {
return new DataStreamShardStats(in);
}
@Override
protected Response newResponse(Request request, int totalShards, int successfulShards,
int failedShards, List<DataStreamShardStats> dataStreamShardStats,
List<DefaultShardOperationFailedException> shardFailures,
ClusterState clusterState) {
Map<String, AggregatedStats> aggregatedDataStreamsStats = new HashMap<>();
Set<String> allBackingIndices = new HashSet<>();
long totalStoreSizeBytes = 0L;
SortedMap<String, IndexAbstraction> indicesLookup = clusterState.getMetadata().getIndicesLookup();
for (DataStreamShardStats shardStat : dataStreamShardStats) {
String indexName = shardStat.getShardRouting().getIndexName();
IndexAbstraction indexAbstraction = indicesLookup.get(indexName);
IndexAbstraction.DataStream dataStream = indexAbstraction.getParentDataStream();
assert dataStream != null;
// Aggregate global stats
totalStoreSizeBytes += shardStat.getStoreStats().sizeInBytes();
allBackingIndices.add(indexName);
// Aggregate data stream stats
AggregatedStats stats = aggregatedDataStreamsStats.computeIfAbsent(dataStream.getName(), s -> new AggregatedStats());
stats.storageBytes += shardStat.getStoreStats().sizeInBytes();
stats.maxTimestamp = Math.max(stats.maxTimestamp, shardStat.getMaxTimestamp());
stats.backingIndices.add(indexName);
}
DataStreamStats[] dataStreamStats = aggregatedDataStreamsStats.entrySet().stream()
.map(entry -> new DataStreamStats(
entry.getKey(),
entry.getValue().backingIndices.size(),
new ByteSizeValue(entry.getValue().storageBytes),
entry.getValue().maxTimestamp))
.toArray(DataStreamStats[]::new);
return new Response(
totalShards,
successfulShards,
failedShards,
shardFailures,
aggregatedDataStreamsStats.size(),
allBackingIndices.size(),
new ByteSizeValue(totalStoreSizeBytes),
dataStreamStats
);
}
}
}

View File

@ -32,6 +32,7 @@ import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexAbstractionResolver;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
@ -43,11 +44,9 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
@ -59,11 +58,9 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.SortedMap;
import java.util.Spliterators;
import java.util.TreeMap;
@ -464,6 +461,7 @@ public class ResolveIndexAction extends ActionType<ResolveIndexAction.Response>
private final ClusterService clusterService;
private final RemoteClusterService remoteClusterService;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final IndexAbstractionResolver indexAbstractionResolver;
@Inject
public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
@ -473,6 +471,7 @@ public class ResolveIndexAction extends ActionType<ResolveIndexAction.Response>
this.clusterService = clusterService;
this.remoteClusterService = transportService.getRemoteClusterService();
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.indexAbstractionResolver = new IndexAbstractionResolver(indexNameExpressionResolver);
}
@Override
@ -486,8 +485,8 @@ public class ResolveIndexAction extends ActionType<ResolveIndexAction.Response>
List<ResolvedAlias> aliases = new ArrayList<>();
List<ResolvedDataStream> dataStreams = new ArrayList<>();
if (localIndices != null) {
resolveIndices(localIndices.indices(), request.indicesOptions, metadata, indexNameExpressionResolver, indices, aliases,
dataStreams);
resolveIndices(localIndices.indices(), request.indicesOptions, metadata, indexAbstractionResolver, indices, aliases,
dataStreams, request.includeDataStreams());
}
if (remoteClusterIndices.size() > 0) {
@ -529,9 +528,11 @@ public class ResolveIndexAction extends ActionType<ResolveIndexAction.Response>
* @param dataStreams List containing any matching data streams
*/
// visible for testing
static void resolveIndices(String[] names, IndicesOptions indicesOptions, Metadata metadata, IndexNameExpressionResolver resolver,
List<ResolvedIndex> indices, List<ResolvedAlias> aliases, List<ResolvedDataStream> dataStreams) {
List<String> resolvedIndexAbstractions = resolveIndexAbstractions(names, indicesOptions, metadata, resolver);
static void resolveIndices(String[] names, IndicesOptions indicesOptions, Metadata metadata, IndexAbstractionResolver resolver,
List<ResolvedIndex> indices, List<ResolvedAlias> aliases, List<ResolvedDataStream> dataStreams,
boolean includeDataStreams) {
List<String> resolvedIndexAbstractions = resolver.resolveIndexAbstractions(names, indicesOptions, metadata,
includeDataStreams);
SortedMap<String, IndexAbstraction> lookup = metadata.getIndicesLookup();
for (String s : resolvedIndexAbstractions) {
enrichIndexAbstraction(s, lookup, indices, aliases, dataStreams);
@ -559,126 +560,6 @@ public class ResolveIndexAction extends ActionType<ResolveIndexAction.Response>
}
}
private static List<String> resolveIndexAbstractions(String[] indices, IndicesOptions indicesOptions, Metadata metadata,
IndexNameExpressionResolver indexNameExpressionResolver) {
final boolean replaceWildcards = indicesOptions.expandWildcardsOpen() || indicesOptions.expandWildcardsClosed();
Set<String> availableIndexAbstractions = metadata.getIndicesLookup().keySet();
List<String> finalIndices = new ArrayList<>();
boolean wildcardSeen = false;
for (String index : indices) {
String indexAbstraction;
boolean minus = false;
if (index.charAt(0) == '-' && wildcardSeen) {
indexAbstraction = index.substring(1);
minus = true;
} else {
indexAbstraction = index;
}
// we always need to check for date math expressions
final String dateMathName = indexNameExpressionResolver.resolveDateMathExpression(indexAbstraction);
if (dateMathName != indexAbstraction) {
assert dateMathName.equals(indexAbstraction) == false;
if (replaceWildcards && Regex.isSimpleMatchPattern(dateMathName)) {
// continue
indexAbstraction = dateMathName;
} else if (availableIndexAbstractions.contains(dateMathName) &&
isIndexVisible(indexAbstraction, dateMathName, indicesOptions, metadata, true)) {
if (minus) {
finalIndices.remove(dateMathName);
} else {
finalIndices.add(dateMathName);
}
} else {
if (indicesOptions.ignoreUnavailable() == false) {
throw new IndexNotFoundException(dateMathName);
}
}
}
if (replaceWildcards && Regex.isSimpleMatchPattern(indexAbstraction)) {
wildcardSeen = true;
Set<String> resolvedIndices = new HashSet<>();
for (String authorizedIndex : availableIndexAbstractions) {
if (Regex.simpleMatch(indexAbstraction, authorizedIndex) &&
isIndexVisible(indexAbstraction, authorizedIndex, indicesOptions, metadata)) {
resolvedIndices.add(authorizedIndex);
}
}
if (resolvedIndices.isEmpty()) {
//es core honours allow_no_indices for each wildcard expression, we do the same here by throwing index not found.
if (indicesOptions.allowNoIndices() == false) {
throw new IndexNotFoundException(indexAbstraction);
}
} else {
if (minus) {
finalIndices.removeAll(resolvedIndices);
} else {
finalIndices.addAll(resolvedIndices);
}
}
} else if (dateMathName.equals(indexAbstraction)) {
if (minus) {
finalIndices.remove(indexAbstraction);
} else {
finalIndices.add(indexAbstraction);
}
}
}
return finalIndices;
}
private static boolean isIndexVisible(String expression, String index, IndicesOptions indicesOptions, Metadata metadata) {
return isIndexVisible(expression, index, indicesOptions, metadata, false);
}
private static boolean isIndexVisible(String expression, String index, IndicesOptions indicesOptions, Metadata metadata,
boolean dateMathExpression) {
IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(index);
final boolean isHidden = indexAbstraction.isHidden();
if (indexAbstraction.getType() == IndexAbstraction.Type.ALIAS) {
//it's an alias, ignore expandWildcardsOpen and expandWildcardsClosed.
//complicated to support those options with aliases pointing to multiple indices...
if (indicesOptions.ignoreAliases()) {
return false;
} else if (isHidden == false || indicesOptions.expandWildcardsHidden() || isVisibleDueToImplicitHidden(expression, index)) {
return true;
} else {
return false;
}
}
if (indexAbstraction.getType() == IndexAbstraction.Type.DATA_STREAM) {
// If indicesOptions.includeDataStreams() returns false then we fail later in IndexNameExpressionResolver.
if (isHidden == false || indicesOptions.expandWildcardsHidden()) {
return true;
} else {
return false;
}
}
assert indexAbstraction.getIndices().size() == 1 : "concrete index must point to a single index";
IndexMetadata indexMetadata = indexAbstraction.getIndices().get(0);
if (isHidden && indicesOptions.expandWildcardsHidden() == false && isVisibleDueToImplicitHidden(expression, index) == false) {
return false;
}
// the index is not hidden and since it is a date math expression, we consider it visible regardless of open/closed
if (dateMathExpression) {
assert IndexMetadata.State.values().length == 2 : "a new IndexMetadata.State value may need to be handled!";
return true;
}
if (indexMetadata.getState() == IndexMetadata.State.CLOSE && indicesOptions.expandWildcardsClosed()) {
return true;
}
if (indexMetadata.getState() == IndexMetadata.State.OPEN && indicesOptions.expandWildcardsOpen()) {
return true;
}
return false;
}
private static boolean isVisibleDueToImplicitHidden(String expression, String index) {
return index.startsWith(".") && expression.startsWith(".") && Regex.isSimpleMatchPattern(expression);
}
private static void enrichIndexAbstraction(String indexAbstraction, SortedMap<String, IndexAbstraction> lookup,
List<ResolvedIndex> indices, List<ResolvedAlias> aliases,
List<ResolvedDataStream> dataStreams) {

View File

@ -0,0 +1,171 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.index.IndexNotFoundException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class IndexAbstractionResolver {
private final IndexNameExpressionResolver indexNameExpressionResolver;
public IndexAbstractionResolver(IndexNameExpressionResolver indexNameExpressionResolver) {
this.indexNameExpressionResolver = indexNameExpressionResolver;
}
public List<String> resolveIndexAbstractions(String[] indices, IndicesOptions indicesOptions, Metadata metadata,
boolean includeDataStreams) {
return resolveIndexAbstractions(Arrays.asList(indices), indicesOptions, metadata, includeDataStreams);
}
public List<String> resolveIndexAbstractions(Iterable<String> indices, IndicesOptions indicesOptions, Metadata metadata,
boolean includeDataStreams) {
final boolean replaceWildcards = indicesOptions.expandWildcardsOpen() || indicesOptions.expandWildcardsClosed();
Set<String> availableIndexAbstractions = metadata.getIndicesLookup().keySet();
return resolveIndexAbstractions(indices, indicesOptions, metadata, availableIndexAbstractions, replaceWildcards,
includeDataStreams);
}
public List<String> resolveIndexAbstractions(Iterable<String> indices, IndicesOptions indicesOptions, Metadata metadata,
Collection<String> availableIndexAbstractions, boolean replaceWildcards,
boolean includeDataStreams) {
List<String> finalIndices = new ArrayList<>();
boolean wildcardSeen = false;
for (String index : indices) {
String indexAbstraction;
boolean minus = false;
if (index.charAt(0) == '-' && wildcardSeen) {
indexAbstraction = index.substring(1);
minus = true;
} else {
indexAbstraction = index;
}
// we always need to check for date math expressions
final String dateMathName = indexNameExpressionResolver.resolveDateMathExpression(indexAbstraction);
if (dateMathName != indexAbstraction) {
assert dateMathName.equals(indexAbstraction) == false;
if (replaceWildcards && Regex.isSimpleMatchPattern(dateMathName)) {
// continue
indexAbstraction = dateMathName;
} else if (availableIndexAbstractions.contains(dateMathName) &&
isIndexVisible(indexAbstraction, dateMathName, indicesOptions, metadata, includeDataStreams, true)) {
if (minus) {
finalIndices.remove(dateMathName);
} else {
finalIndices.add(dateMathName);
}
} else {
if (indicesOptions.ignoreUnavailable() == false) {
throw new IndexNotFoundException(dateMathName);
}
}
}
if (replaceWildcards && Regex.isSimpleMatchPattern(indexAbstraction)) {
wildcardSeen = true;
Set<String> resolvedIndices = new HashSet<>();
for (String authorizedIndex : availableIndexAbstractions) {
if (Regex.simpleMatch(indexAbstraction, authorizedIndex) &&
isIndexVisible(indexAbstraction, authorizedIndex, indicesOptions, metadata, includeDataStreams)) {
resolvedIndices.add(authorizedIndex);
}
}
if (resolvedIndices.isEmpty()) {
//es core honours allow_no_indices for each wildcard expression, we do the same here by throwing index not found.
if (indicesOptions.allowNoIndices() == false) {
throw new IndexNotFoundException(indexAbstraction);
}
} else {
if (minus) {
finalIndices.removeAll(resolvedIndices);
} else {
finalIndices.addAll(resolvedIndices);
}
}
} else if (dateMathName.equals(indexAbstraction)) {
if (minus) {
finalIndices.remove(indexAbstraction);
} else {
finalIndices.add(indexAbstraction);
}
}
}
return finalIndices;
}
public static boolean isIndexVisible(String expression, String index, IndicesOptions indicesOptions, Metadata metadata,
boolean includeDataStreams) {
return isIndexVisible(expression, index, indicesOptions, metadata, includeDataStreams, false);
}
public static boolean isIndexVisible(String expression, String index, IndicesOptions indicesOptions, Metadata metadata,
boolean includeDataStreams, boolean dateMathExpression) {
IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(index);
if (indexAbstraction == null) {
throw new IllegalStateException("could not resolve index abstraction [" + index + "]");
}
final boolean isHidden = indexAbstraction.isHidden();
if (indexAbstraction.getType() == IndexAbstraction.Type.ALIAS) {
//it's an alias, ignore expandWildcardsOpen and expandWildcardsClosed.
//complicated to support those options with aliases pointing to multiple indices...
if (indicesOptions.ignoreAliases()) {
return false;
} else if (isHidden == false || indicesOptions.expandWildcardsHidden() || isVisibleDueToImplicitHidden(expression, index)) {
return true;
} else {
return false;
}
}
if (indexAbstraction.getType() == IndexAbstraction.Type.DATA_STREAM) {
return includeDataStreams;
}
assert indexAbstraction.getIndices().size() == 1 : "concrete index must point to a single index";
IndexMetadata indexMetadata = indexAbstraction.getIndices().get(0);
if (isHidden && indicesOptions.expandWildcardsHidden() == false && isVisibleDueToImplicitHidden(expression, index) == false) {
return false;
}
// the index is not hidden and since it is a date math expression, we consider it visible regardless of open/closed
if (dateMathExpression) {
assert IndexMetadata.State.values().length == 2 : "a new IndexMetadata.State value may need to be handled!";
return true;
}
if (indexMetadata.getState() == IndexMetadata.State.CLOSE && indicesOptions.expandWildcardsClosed()) {
return true;
}
if (indexMetadata.getState() == IndexMetadata.State.OPEN && indicesOptions.expandWildcardsOpen()) {
return true;
}
return false;
}
private static boolean isVisibleDueToImplicitHidden(String expression, String index) {
return index.startsWith(".") && expression.startsWith(".") && Regex.isSimpleMatchPattern(expression);
}
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.admin.indices;
import org.elasticsearch.action.admin.indices.datastream.DataStreamsStatsAction;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
public class RestDataStreamsStatsAction extends BaseRestHandler {
@Override
public String getName() {
return "data_stream_stats_action";
}
@Override
public List<Route> routes() {
return Arrays.asList(
new Route(RestRequest.Method.GET, "/_data_stream/_stats"),
new Route(RestRequest.Method.GET, "/_data_stream/{name}/_stats")
);
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
DataStreamsStatsAction.Request dataStreamsStatsRequest = new DataStreamsStatsAction.Request();
boolean forbidClosedIndices = request.paramAsBoolean("forbid_closed_indices", true);
IndicesOptions defaultIndicesOption = forbidClosedIndices ? dataStreamsStatsRequest.indicesOptions()
: IndicesOptions.strictExpandOpen();
assert dataStreamsStatsRequest.indicesOptions() == IndicesOptions.strictExpandOpenAndForbidClosed() : "DataStreamStats default " +
"indices options changed";
dataStreamsStatsRequest.indicesOptions(IndicesOptions.fromRequest(request, defaultIndicesOption));
dataStreamsStatsRequest.indices(Strings.splitStringByCommaToArray(request.param("name")));
return channel -> client.execute(DataStreamsStatsAction.INSTANCE, dataStreamsStatsRequest, new RestToXContentListener<>(channel));
}
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.datastream;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
public class DataStreamsStatsResponseTests extends AbstractWireSerializingTestCase<DataStreamsStatsAction.Response> {
@Override
protected Writeable.Reader<DataStreamsStatsAction.Response> instanceReader() {
return DataStreamsStatsAction.Response::new;
}
@Override
protected DataStreamsStatsAction.Response createTestInstance() {
return randomStatsResponse();
}
public static DataStreamsStatsAction.Response randomStatsResponse() {
int dataStreamCount = randomInt(10);
int backingIndicesTotal = 0;
long totalStoreSize = 0L;
ArrayList<DataStreamsStatsAction.DataStreamStats> dataStreamStats = new ArrayList<>();
for (int i = 0; i < dataStreamCount; i++) {
String dataStreamName = randomAlphaOfLength(8).toLowerCase(Locale.getDefault());
int backingIndices = randomInt(5);
backingIndicesTotal += backingIndices;
long storeSize = randomLongBetween(250, 1000000000);
totalStoreSize += storeSize;
long maximumTimestamp = randomRecentTimestamp();
dataStreamStats.add(new DataStreamsStatsAction.DataStreamStats(dataStreamName, backingIndices,
new ByteSizeValue(storeSize), maximumTimestamp));
}
int totalShards = randomIntBetween(backingIndicesTotal, backingIndicesTotal * 3);
int successfulShards = randomInt(totalShards);
int failedShards = totalShards - successfulShards;
List<DefaultShardOperationFailedException> exceptions = new ArrayList<>();
for (int i = 0; i < failedShards; i++) {
exceptions.add(new DefaultShardOperationFailedException(randomAlphaOfLength(8).toLowerCase(Locale.getDefault()),
randomInt(totalShards), new ElasticsearchException("boom")));
}
return new DataStreamsStatsAction.Response(totalShards, successfulShards, failedShards, exceptions,
dataStreamCount, backingIndicesTotal, new ByteSizeValue(totalStoreSize),
dataStreamStats.toArray(new DataStreamsStatsAction.DataStreamStats[0]));
}
private static long randomRecentTimestamp() {
long base = System.currentTimeMillis();
return randomLongBetween(base - TimeUnit.HOURS.toMillis(1), base);
}
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.action.admin.indices.resolve.ResolveIndexAction.Transpo
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexAbstractionResolver;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
@ -68,7 +69,7 @@ public class ResolveIndexTests extends ESTestCase {
};
private Metadata metadata = buildMetadata(dataStreams, indices);
private IndexNameExpressionResolver resolver = new IndexNameExpressionResolver();
private IndexAbstractionResolver resolver = new IndexAbstractionResolver(new IndexNameExpressionResolver());
public void testResolveStarWithDefaultOptions() {
String[] names = new String[] {"*"};
@ -77,7 +78,7 @@ public class ResolveIndexTests extends ESTestCase {
List<ResolvedAlias> aliases = new ArrayList<>();
List<ResolvedDataStream> dataStreams = new ArrayList<>();
TransportAction.resolveIndices(names, indicesOptions, metadata, resolver, indices, aliases, dataStreams);
TransportAction.resolveIndices(names, indicesOptions, metadata, resolver, indices, aliases, dataStreams, true);
validateIndices(indices,
"logs-pgsql-prod-20200101",
@ -104,7 +105,7 @@ public class ResolveIndexTests extends ESTestCase {
List<ResolvedAlias> aliases = new ArrayList<>();
List<ResolvedDataStream> dataStreams = new ArrayList<>();
TransportAction.resolveIndices(names, indicesOptions, metadata, resolver, indices, aliases, dataStreams);
TransportAction.resolveIndices(names, indicesOptions, metadata, resolver, indices, aliases, dataStreams, true);
validateIndices(indices,
".ds-logs-mysql-prod-000001",
".ds-logs-mysql-prod-000002",
@ -136,7 +137,7 @@ public class ResolveIndexTests extends ESTestCase {
List<ResolvedAlias> aliases = new ArrayList<>();
List<ResolvedDataStream> dataStreams = new ArrayList<>();
TransportAction.resolveIndices(names, indicesOptions, metadata, resolver, indices, aliases, dataStreams);
TransportAction.resolveIndices(names, indicesOptions, metadata, resolver, indices, aliases, dataStreams, true);
validateIndices(indices,
"logs-pgsql-prod-20200101",
@ -160,7 +161,7 @@ public class ResolveIndexTests extends ESTestCase {
List<ResolvedAlias> aliases = new ArrayList<>();
List<ResolvedDataStream> dataStreams = new ArrayList<>();
TransportAction.resolveIndices(names, indicesOptions, metadata, resolver, indices, aliases, dataStreams);
TransportAction.resolveIndices(names, indicesOptions, metadata, resolver, indices, aliases, dataStreams, true);
validateIndices(indices, ".ds-logs-mysql-prod-000003", "logs-pgsql-test-20200102");
validateAliases(aliases, "one-off-alias");
validateDataStreams(dataStreams, "logs-mysql-test");

View File

@ -0,0 +1,204 @@
setup:
- skip:
features: allowed_warnings
- do:
allowed_warnings:
- "index template [my-template1] has index patterns [simple-data-stream1] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template1] will take precedence during new index creation"
indices.put_index_template:
name: my-template1
body:
index_patterns: [simple-data-stream1]
template:
settings:
index.number_of_replicas: 0
data_stream: {}
- do:
allowed_warnings:
- "index template [my-template2] has index patterns [simple-data-stream2] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template2] will take precedence during new index creation"
indices.put_index_template:
name: my-template2
body:
index_patterns: [simple-data-stream2]
template:
settings:
index.number_of_replicas: 0
data_stream: {}
---
"No data streams":
- skip:
version: " - 7.8.99"
reason: "data streams only supported in 7.9+"
- do:
indices.data_streams_stats: {}
- match: { data_stream_count: 0 }
- match: { backing_indices: 0 }
- length: { data_streams: 0 }
---
"Empty data stream":
- skip:
version: " - 7.8.99"
reason: "data streams only supported in 7.9+"
- do:
indices.create_data_stream:
name: simple-data-stream1
- is_true: acknowledged
- do:
indices.data_streams_stats: {}
- match: { data_stream_count: 1 }
- match: { backing_indices: 1 }
- length: { data_streams: 1 }
- match: { data_streams.0.data_stream: 'simple-data-stream1' }
- match: { data_streams.0.backing_indices: 1 }
- match: { data_streams.0.maximum_timestamp: 0 }
- do:
indices.delete_data_stream:
name: simple-data-stream1
- is_true: acknowledged
---
"Single data stream":
- skip:
version: " - 7.8.99"
reason: "data streams only supported in 7.9+"
- do:
indices.create_data_stream:
name: simple-data-stream1
- is_true: acknowledged
- do:
index:
index: simple-data-stream1
op_type: "create"
body: { "@timestamp": 1593639273740 }
refresh: true
- do:
indices.data_streams_stats: {}
- match: { data_stream_count: 1 }
- match: { backing_indices: 1 }
- length: { data_streams: 1 }
- match: { data_streams.0.data_stream: 'simple-data-stream1' }
- match: { data_streams.0.backing_indices: 1 }
- match: { data_streams.0.maximum_timestamp: 1593639273740 }
- do:
indices.delete_data_stream:
name: simple-data-stream1
- is_true: acknowledged
---
"Rolled over data stream":
- skip:
version: " - 7.8.99"
reason: "data streams only supported in 7.9+"
- do:
indices.create_data_stream:
name: simple-data-stream1
- is_true: acknowledged
- do:
index:
index: simple-data-stream1
op_type: "create"
body: { "@timestamp": 1593639330113 }
refresh: true
- do:
indices.rollover:
alias: simple-data-stream1
wait_for_active_shards: 1
- match: { rolled_over: true }
- do:
index:
index: simple-data-stream1
op_type: "create"
body: { "@timestamp": 1593639345064 }
refresh: true
- do:
indices.data_streams_stats: {}
- match: { data_stream_count: 1 }
- match: { backing_indices: 2 }
- length: { data_streams: 1 }
- match: { data_streams.0.data_stream: 'simple-data-stream1' }
- match: { data_streams.0.backing_indices: 2 }
- match: { data_streams.0.maximum_timestamp: 1593639345064 }
- do:
indices.delete_data_stream:
name: simple-data-stream1
- is_true: acknowledged
---
"Multiple data stream":
- skip:
version: " - 7.8.99"
reason: "data streams only supported in 7.9+"
- do:
indices.create_data_stream:
name: simple-data-stream1
- is_true: acknowledged
- do:
indices.create_data_stream:
name: simple-data-stream2
- is_true: acknowledged
- do:
index:
index: simple-data-stream1
op_type: "create"
body: { "@timestamp": 1593639434853 }
refresh: true
- do:
index:
index: simple-data-stream2
op_type: "create"
body: { "@timestamp": 1593639450943 }
refresh: true
- do:
indices.rollover:
alias: simple-data-stream1
wait_for_active_shards: 1
- match: { rolled_over: true }
- do:
index:
index: simple-data-stream1
op_type: "create"
body: { "@timestamp": 1593639468350 }
refresh: true
- do:
indices.data_streams_stats: {}
- match: { data_stream_count: 2 }
- match: { backing_indices: 3 }
- length: { data_streams: 2 }
- match: { data_streams.0.data_stream: 'simple-data-stream1' }
- match: { data_streams.0.backing_indices: 2 }
- match: { data_streams.0.maximum_timestamp: 1593639468350 }
- match: { data_streams.1.data_stream: 'simple-data-stream2' }
- match: { data_streams.1.backing_indices: 1 }
- match: { data_streams.1.maximum_timestamp: 1593639450943 }
- do:
indices.delete_data_stream:
name: simple-data-stream1
- is_true: acknowledged
- do:
indices.delete_data_stream:
name: simple-data-stream2
- is_true: acknowledged

View File

@ -0,0 +1,221 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.datastreams;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction;
import org.elasticsearch.action.admin.indices.datastream.DataStreamsStatsAction;
import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.template.delete.DeleteComposableIndexTemplateAction;
import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.junit.After;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static java.lang.Math.max;
public class DataStreamsStatsTests extends ESSingleNodeTestCase {
@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return Collections.singletonList(DataStreamsPlugin.class);
}
private String timestampFieldName = "@timestamp";
private final Set<String> createdDataStreams = new HashSet<>();
@Override
@After
public void tearDown() throws Exception {
if (createdDataStreams.isEmpty() == false) {
for (String createdDataStream : createdDataStreams) {
deleteDataStream(createdDataStream);
}
createdDataStreams.clear();
}
super.tearDown();
}
public void testStatsNoDataStream() throws Exception {
DataStreamsStatsAction.Response stats = getDataStreamsStats();
assertEquals(0, stats.getSuccessfulShards());
assertEquals(0, stats.getFailedShards());
assertEquals(0, stats.getDataStreamCount());
assertEquals(0, stats.getBackingIndices());
assertEquals(0L, stats.getTotalStoreSize().getBytes());
assertEquals(0, stats.getDataStreams().length);
}
public void testStatsEmptyDataStream() throws Exception {
String dataStreamName = createDataStream();
DataStreamsStatsAction.Response stats = getDataStreamsStats();
assertEquals(1, stats.getSuccessfulShards());
assertEquals(0, stats.getFailedShards());
assertEquals(1, stats.getDataStreamCount());
assertEquals(1, stats.getBackingIndices());
assertNotEquals(0L, stats.getTotalStoreSize().getBytes());
assertEquals(1, stats.getDataStreams().length);
assertEquals(dataStreamName, stats.getDataStreams()[0].getDataStream());
assertEquals(0L, stats.getDataStreams()[0].getMaximumTimestamp());
assertNotEquals(0L, stats.getDataStreams()[0].getStoreSize().getBytes());
assertEquals(stats.getTotalStoreSize().getBytes(), stats.getDataStreams()[0].getStoreSize().getBytes());
}
public void testStatsExistingDataStream() throws Exception {
String dataStreamName = createDataStream();
long timestamp = createDocument(dataStreamName);
DataStreamsStatsAction.Response stats = getDataStreamsStats();
assertEquals(1, stats.getSuccessfulShards());
assertEquals(0, stats.getFailedShards());
assertEquals(1, stats.getDataStreamCount());
assertEquals(1, stats.getBackingIndices());
assertNotEquals(0L, stats.getTotalStoreSize().getBytes());
assertEquals(1, stats.getDataStreams().length);
assertEquals(dataStreamName, stats.getDataStreams()[0].getDataStream());
assertEquals(timestamp, stats.getDataStreams()[0].getMaximumTimestamp());
assertNotEquals(0L, stats.getDataStreams()[0].getStoreSize().getBytes());
assertEquals(stats.getTotalStoreSize().getBytes(), stats.getDataStreams()[0].getStoreSize().getBytes());
}
public void testStatsRolledDataStream() throws Exception {
String dataStreamName = createDataStream();
long timestamp = createDocument(dataStreamName);
assertTrue(client().admin().indices().rolloverIndex(new RolloverRequest(dataStreamName, null)).get().isAcknowledged());
timestamp = max(timestamp, createDocument(dataStreamName));
DataStreamsStatsAction.Response stats = getDataStreamsStats();
assertEquals(2, stats.getSuccessfulShards());
assertEquals(0, stats.getFailedShards());
assertEquals(1, stats.getDataStreamCount());
assertEquals(2, stats.getBackingIndices());
assertNotEquals(0L, stats.getTotalStoreSize().getBytes());
assertEquals(1, stats.getDataStreams().length);
assertEquals(dataStreamName, stats.getDataStreams()[0].getDataStream());
assertEquals(timestamp, stats.getDataStreams()[0].getMaximumTimestamp());
assertNotEquals(0L, stats.getDataStreams()[0].getStoreSize().getBytes());
assertEquals(stats.getTotalStoreSize().getBytes(), stats.getDataStreams()[0].getStoreSize().getBytes());
}
public void testStatsMultipleDataStreams() throws Exception {
for (int dataStreamCount = 0; dataStreamCount < (2 + randomInt(3)); dataStreamCount++) {
createDataStream();
}
// Create a number of documents in each data stream
Map<String, Long> maxTimestamps = new HashMap<>();
for (String createdDataStream : createdDataStreams) {
for (int documentCount = 0; documentCount < (1 + randomInt(10)); documentCount++) {
long ts = createDocument(createdDataStream);
long maxTS = max(maxTimestamps.getOrDefault(createdDataStream, 0L), ts);
maxTimestamps.put(createdDataStream, maxTS);
}
}
DataStreamsStatsAction.Response stats = getDataStreamsStats();
logger.error(stats.toString());
assertEquals(createdDataStreams.size(), stats.getSuccessfulShards());
assertEquals(0, stats.getFailedShards());
assertEquals(createdDataStreams.size(), stats.getDataStreamCount());
assertEquals(createdDataStreams.size(), stats.getBackingIndices());
assertNotEquals(0L, stats.getTotalStoreSize().getBytes());
assertEquals(createdDataStreams.size(), stats.getDataStreams().length);
for (DataStreamsStatsAction.DataStreamStats dataStreamStats : stats.getDataStreams()) {
Long expectedMaxTS = maxTimestamps.get(dataStreamStats.getDataStream());
assertNotNull("All indices should have max timestamps", expectedMaxTS);
assertEquals(1, dataStreamStats.getBackingIndices());
assertEquals(expectedMaxTS.longValue(), dataStreamStats.getMaximumTimestamp());
assertNotEquals(0L, dataStreamStats.getStoreSize().getBytes());
}
}
private String createDataStream() throws Exception {
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.getDefault());
Template idxTemplate = new Template(
null,
new CompressedXContent("{\"properties\":{\"" + timestampFieldName + "\":{\"type\":\"date\"},\"data\":{\"type\":\"keyword\"}}}"),
null
);
ComposableIndexTemplate template = new ComposableIndexTemplate(
Collections.singletonList(dataStreamName + "*"),
idxTemplate,
null,
null,
null,
null,
new ComposableIndexTemplate.DataStreamTemplate()
);
assertTrue(
client().execute(
PutComposableIndexTemplateAction.INSTANCE,
new PutComposableIndexTemplateAction.Request(dataStreamName + "_template").indexTemplate(template)
).actionGet().isAcknowledged()
);
assertTrue(
client().execute(CreateDataStreamAction.INSTANCE, new CreateDataStreamAction.Request(dataStreamName)).get().isAcknowledged()
);
createdDataStreams.add(dataStreamName);
return dataStreamName;
}
private long createDocument(String dataStreamName) throws Exception {
// Get some randomized but reasonable timestamps on the data since not all of it is guaranteed to arrive in order.
long timeSeed = System.currentTimeMillis();
long timestamp = randomLongBetween(timeSeed - TimeUnit.HOURS.toMillis(5), timeSeed);
client().index(
new IndexRequest(dataStreamName).opType(DocWriteRequest.OpType.CREATE)
.source(
JsonXContent.contentBuilder()
.startObject()
.field(timestampFieldName, timestamp)
.field("data", randomAlphaOfLength(25))
.endObject()
)
).get();
client().admin()
.indices()
.refresh(new RefreshRequest(".ds-" + dataStreamName + "*").indicesOptions(IndicesOptions.lenientExpandOpenHidden()))
.get();
return timestamp;
}
private DataStreamsStatsAction.Response getDataStreamsStats() throws Exception {
return client().execute(DataStreamsStatsAction.INSTANCE, new DataStreamsStatsAction.Request()).get();
}
private void deleteDataStream(String dataStreamName) throws InterruptedException, java.util.concurrent.ExecutionException {
assertTrue(
client().execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(new String[] { dataStreamName }))
.get()
.isAcknowledged()
);
assertTrue(
client().execute(
DeleteComposableIndexTemplateAction.INSTANCE,
new DeleteComposableIndexTemplateAction.Request(dataStreamName + "_template")
).actionGet().isAcknowledged()
);
}
}

View File

@ -17,8 +17,8 @@ import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexAbstractionResolver;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexMetadata.State;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
@ -54,10 +54,12 @@ class IndicesAndAliasesResolver {
static final List<String> NO_INDICES_OR_ALIASES_LIST = Arrays.asList(NO_INDICES_OR_ALIASES_ARRAY);
private final IndexNameExpressionResolver nameExpressionResolver;
private final IndexAbstractionResolver indexAbstractionResolver;
private final RemoteClusterResolver remoteClusterResolver;
IndicesAndAliasesResolver(Settings settings, ClusterService clusterService, IndexNameExpressionResolver resolver) {
this.nameExpressionResolver = resolver;
this.indexAbstractionResolver = new IndexAbstractionResolver(resolver);
this.remoteClusterResolver = new RemoteClusterResolver(settings, clusterService.getClusterSettings());
}
@ -137,7 +139,8 @@ class IndicesAndAliasesResolver {
if (IndexNameExpressionResolver.isAllIndices(indicesList(indicesRequest.indices()))) {
if (replaceWildcards) {
for (String authorizedIndex : authorizedIndices) {
if (isIndexVisible("*", authorizedIndex, indicesOptions, metadata, indicesRequest.includeDataStreams())) {
if (IndexAbstractionResolver.isIndexVisible("*", authorizedIndex, indicesOptions, metadata,
indicesRequest.includeDataStreams())) {
resolvedIndicesBuilder.addLocal(authorizedIndex);
}
}
@ -151,7 +154,7 @@ class IndicesAndAliasesResolver {
} else {
split = new ResolvedIndices(Arrays.asList(indicesRequest.indices()), Collections.emptyList());
}
List<String> replaced = replaceWildcardsWithAuthorizedIndices(split.getLocal(), indicesOptions, metadata,
List<String> replaced = indexAbstractionResolver.resolveIndexAbstractions(split.getLocal(), indicesOptions, metadata,
authorizedIndices, replaceWildcards, indicesRequest.includeDataStreams());
if (indicesOptions.ignoreUnavailable()) {
//out of all the explicit names (expanded from wildcards and original ones that were left untouched)
@ -342,135 +345,6 @@ class IndicesAndAliasesResolver {
return false;
}
//TODO Investigate reusing code from vanilla es to resolve index names and wildcards
private List<String> replaceWildcardsWithAuthorizedIndices(Iterable<String> indices, IndicesOptions indicesOptions, Metadata metadata,
List<String> authorizedIndices, boolean replaceWildcards,
boolean includeDataStreams) {
//the order matters when it comes to exclusions
List<String> finalIndices = new ArrayList<>();
boolean wildcardSeen = false;
for (String index : indices) {
String aliasOrIndex;
boolean minus = false;
if (index.charAt(0) == '-' && wildcardSeen) {
aliasOrIndex = index.substring(1);
minus = true;
} else {
aliasOrIndex = index;
}
// we always need to check for date math expressions
final String dateMathName = nameExpressionResolver.resolveDateMathExpression(aliasOrIndex);
if (dateMathName != aliasOrIndex) {
assert dateMathName.equals(aliasOrIndex) == false;
if (replaceWildcards && Regex.isSimpleMatchPattern(dateMathName)) {
// continue
aliasOrIndex = dateMathName;
} else if (authorizedIndices.contains(dateMathName) &&
isIndexVisible(aliasOrIndex, dateMathName, indicesOptions, metadata, includeDataStreams, true)) {
if (minus) {
finalIndices.remove(dateMathName);
} else {
finalIndices.add(dateMathName);
}
} else {
if (indicesOptions.ignoreUnavailable() == false) {
throw new IndexNotFoundException(dateMathName);
}
}
}
if (replaceWildcards && Regex.isSimpleMatchPattern(aliasOrIndex)) {
wildcardSeen = true;
Set<String> resolvedIndices = new HashSet<>();
for (String authorizedIndex : authorizedIndices) {
if (Regex.simpleMatch(aliasOrIndex, authorizedIndex) &&
isIndexVisible(aliasOrIndex, authorizedIndex, indicesOptions, metadata, includeDataStreams)) {
resolvedIndices.add(authorizedIndex);
}
}
if (resolvedIndices.isEmpty()) {
//es core honours allow_no_indices for each wildcard expression, we do the same here by throwing index not found.
if (indicesOptions.allowNoIndices() == false) {
throw new IndexNotFoundException(aliasOrIndex);
}
} else {
if (minus) {
finalIndices.removeAll(resolvedIndices);
} else {
finalIndices.addAll(resolvedIndices);
}
}
} else if (dateMathName == aliasOrIndex) {
// we can use == here to compare strings since the name expression resolver returns the same instance, but add an assert
// to ensure we catch this if it changes
assert dateMathName.equals(aliasOrIndex);
//Metadata#convertFromWildcards checks if the index exists here and throws IndexNotFoundException if not (based on
// ignore_unavailable). We only add/remove the index: if the index is missing or the current user is not authorized
// to access it either an AuthorizationException will be thrown later in AuthorizationService, or the index will be
// removed from the list, based on the ignore_unavailable option.
if (minus) {
finalIndices.remove(aliasOrIndex);
} else {
finalIndices.add(aliasOrIndex);
}
}
}
return finalIndices;
}
private static boolean isIndexVisible(String expression, String index, IndicesOptions indicesOptions, Metadata metadata,
boolean includeDataStreams) {
return isIndexVisible(expression, index, indicesOptions, metadata, includeDataStreams, false);
}
private static boolean isIndexVisible(String expression, String index, IndicesOptions indicesOptions, Metadata metadata,
boolean includeDataStreams, boolean dateMathExpression) {
IndexAbstraction indexAbstraction = metadata.getIndicesLookup().get(index);
if (indexAbstraction == null) {
throw new IllegalStateException("could not resolve index abstraction [" + index + "]");
}
final boolean isHidden = indexAbstraction.isHidden();
if (indexAbstraction.getType() == IndexAbstraction.Type.ALIAS) {
//it's an alias, ignore expandWildcardsOpen and expandWildcardsClosed.
//complicated to support those options with aliases pointing to multiple indices...
//TODO investigate supporting expandWildcards option for aliases too, like es core does.
if (indicesOptions.ignoreAliases()) {
return false;
} else if (isHidden == false || indicesOptions.expandWildcardsHidden() || isVisibleDueToImplicitHidden(expression, index)) {
return true;
} else {
return false;
}
}
if (indexAbstraction.getType() == IndexAbstraction.Type.DATA_STREAM) {
return includeDataStreams;
}
assert indexAbstraction.getIndices().size() == 1 : "concrete index must point to a single index";
IndexMetadata indexMetadata = indexAbstraction.getIndices().get(0);
if (isHidden && indicesOptions.expandWildcardsHidden() == false && isVisibleDueToImplicitHidden(expression, index) == false) {
return false;
}
// the index is not hidden and since it is a date math expression, we consider it visible regardless of open/closed
if (dateMathExpression) {
assert State.values().length == 2 : "a new IndexMetadata.State value may need to be handled!";
return true;
}
if (indexMetadata.getState() == IndexMetadata.State.CLOSE && indicesOptions.expandWildcardsClosed()) {
return true;
}
if (indexMetadata.getState() == IndexMetadata.State.OPEN && indicesOptions.expandWildcardsOpen()) {
return true;
}
return false;
}
private static boolean isVisibleDueToImplicitHidden(String expression, String index) {
return index.startsWith(".") && expression.startsWith(".") && Regex.isSimpleMatchPattern(expression);
}
private static List<String> indicesList(String[] list) {
return (list == null) ? null : Arrays.asList(list);
}

View File

@ -15,7 +15,7 @@ setup:
body: >
{
"indices": [
{ "names": ["simple*"], "privileges": ["read", "write", "create_index", "view_index_metadata", "delete_index"] }
{ "names": ["simple*"], "privileges": ["read", "write", "create_index", "view_index_metadata", "monitor", "delete_index"] }
]
}
@ -25,7 +25,7 @@ setup:
body: >
{
"indices": [
{ "names": ["matches_none"], "privileges": ["read", "write", "create_index", "view_index_metadata", "delete_index"] }
{ "names": ["matches_none"], "privileges": ["read", "write", "create_index", "view_index_metadata", "monitor", "delete_index"] }
]
}
@ -411,6 +411,61 @@ teardown:
name: s-outside-of-authed-namespace
- is_true: acknowledged
---
"Test that data streams stats is limited to authorized namespace":
- skip:
version: " - 7.99.99"
reason: "change to 7.8.99 after backport"
- do: # superuser
indices.create_data_stream:
name: simple-data-stream1
- is_true: acknowledged
- do: # superuser
indices.create_data_stream:
name: s-outside-of-authed-namespace
- is_true: acknowledged
- do: # superuser
indices.data_streams_stats: {}
- match: { data_stream_count: 2 }
- match: { backing_indices: 2 }
- length: { data_streams: 2 }
- match: { data_streams.0.data_stream: 'simple-data-stream1' }
- match: { data_streams.0.backing_indices: 1 }
- match: { data_streams.0.maximum_timestamp: 0 }
- match: { data_streams.1.data_stream: 's-outside-of-authed-namespace' }
- match: { data_streams.1.backing_indices: 1 }
- match: { data_streams.1.maximum_timestamp: 0 }
- do:
headers: { Authorization: "Basic dGVzdF91c2VyOngtcGFjay10ZXN0LXBhc3N3b3Jk" } # test_user
indices.data_streams_stats: {}
- match: { data_stream_count: 1 }
- match: { backing_indices: 1 }
- length: { data_streams: 1 }
- match: { data_streams.0.data_stream: 'simple-data-stream1' }
- match: { data_streams.0.backing_indices: 1 }
- match: { data_streams.0.maximum_timestamp: 0 }
- do:
headers: { Authorization: "Basic bm9fYXV0aHpfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" } # no_authz_user
indices.data_streams_stats: {}
- match: { data_stream_count: 0 }
- match: { backing_indices: 0 }
- length: { data_streams: 0 }
- do: # superuser
indices.delete_data_stream:
name: simple-data-stream1
- is_true: acknowledged
- do: # superuser
indices.delete_data_stream:
name: s-outside-of-authed-namespace
- is_true: acknowledged
---
"auto_configure privilege permits auto-create of data streams":
- skip: