* GET data stream API returns additional information (#59128) This adds the data stream's index template, the configured ILM policy (if any) and the health status of the data stream to the GET _data_stream response. Restoring a data stream from a snapshot could install a data stream that doesn't match any composable templates. This also makes the `template` field in the `GET _data_stream` response optional. (cherry picked from commit 0d9c98a82353b088c782b6a04c44844e66137054) Signed-off-by: Andrei Dan <andrei.dan@elastic.co>
This commit is contained in:
parent
6ed356ffc3
commit
24c6a30e2b
|
@ -18,6 +18,8 @@
|
|||
*/
|
||||
package org.elasticsearch.client.indices;
|
||||
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
|
@ -34,12 +36,21 @@ public final class DataStream {
|
|||
private final String timeStampField;
|
||||
private final List<String> indices;
|
||||
private final long generation;
|
||||
ClusterHealthStatus dataStreamStatus;
|
||||
@Nullable
|
||||
String indexTemplate;
|
||||
@Nullable
|
||||
String ilmPolicyName;
|
||||
|
||||
public DataStream(String name, String timeStampField, List<String> indices, long generation) {
|
||||
public DataStream(String name, String timeStampField, List<String> indices, long generation, ClusterHealthStatus dataStreamStatus,
|
||||
@Nullable String indexTemplate, @Nullable String ilmPolicyName) {
|
||||
this.name = name;
|
||||
this.timeStampField = timeStampField;
|
||||
this.indices = indices;
|
||||
this.generation = generation;
|
||||
this.dataStreamStatus = dataStreamStatus;
|
||||
this.indexTemplate = indexTemplate;
|
||||
this.ilmPolicyName = ilmPolicyName;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
|
@ -58,18 +69,39 @@ public final class DataStream {
|
|||
return generation;
|
||||
}
|
||||
|
||||
public ClusterHealthStatus getDataStreamStatus() {
|
||||
return dataStreamStatus;
|
||||
}
|
||||
|
||||
public String getIndexTemplate() {
|
||||
return indexTemplate;
|
||||
}
|
||||
|
||||
public String getIlmPolicyName() {
|
||||
return ilmPolicyName;
|
||||
}
|
||||
|
||||
public static final ParseField NAME_FIELD = new ParseField("name");
|
||||
public static final ParseField TIMESTAMP_FIELD_FIELD = new ParseField("timestamp_field");
|
||||
public static final ParseField INDICES_FIELD = new ParseField("indices");
|
||||
public static final ParseField GENERATION_FIELD = new ParseField("generation");
|
||||
public static final ParseField STATUS_FIELD = new ParseField("status");
|
||||
public static final ParseField INDEX_TEMPLATE_FIELD = new ParseField("template");
|
||||
public static final ParseField ILM_POLICY_FIELD = new ParseField("ilm_policy");
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static final ConstructingObjectParser<DataStream, Void> PARSER = new ConstructingObjectParser<>("data_stream",
|
||||
args -> {
|
||||
String dataStreamName = (String) args[0];
|
||||
String timeStampField = (String) ((Map<?, ?>) args[1]).get("name");
|
||||
List<String> indices =
|
||||
((List<Map<String, String>>) args[2]).stream().map(m -> m.get("index_name")).collect(Collectors.toList());
|
||||
return new DataStream((String) args[0], timeStampField, indices, (Long) args[3]);
|
||||
Long generation = (Long) args[3];
|
||||
String statusStr = (String) args[4];
|
||||
ClusterHealthStatus status = ClusterHealthStatus.fromString(statusStr);
|
||||
String indexTemplate = (String) args[5];
|
||||
String ilmPolicy = (String) args[6];
|
||||
return new DataStream(dataStreamName, timeStampField, indices, generation, status, indexTemplate, ilmPolicy);
|
||||
});
|
||||
|
||||
static {
|
||||
|
@ -77,6 +109,9 @@ public final class DataStream {
|
|||
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.map(), TIMESTAMP_FIELD_FIELD);
|
||||
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), (p, c) -> p.mapStrings(), INDICES_FIELD);
|
||||
PARSER.declareLong(ConstructingObjectParser.constructorArg(), GENERATION_FIELD);
|
||||
PARSER.declareString(ConstructingObjectParser.constructorArg(), STATUS_FIELD);
|
||||
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), INDEX_TEMPLATE_FIELD);
|
||||
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), ILM_POLICY_FIELD);
|
||||
}
|
||||
|
||||
public static DataStream fromXContent(XContentParser parser) throws IOException {
|
||||
|
@ -88,14 +123,17 @@ public final class DataStream {
|
|||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
DataStream that = (DataStream) o;
|
||||
return name.equals(that.name) &&
|
||||
return generation == that.generation &&
|
||||
name.equals(that.name) &&
|
||||
timeStampField.equals(that.timeStampField) &&
|
||||
indices.equals(that.indices) &&
|
||||
generation == that.generation;
|
||||
dataStreamStatus == that.dataStreamStatus &&
|
||||
Objects.equals(indexTemplate, that.indexTemplate) &&
|
||||
Objects.equals(ilmPolicyName, that.ilmPolicyName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(name, timeStampField, indices, generation);
|
||||
return Objects.hash(name, timeStampField, indices, generation, dataStreamStatus, indexTemplate, ilmPolicyName);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -42,9 +42,11 @@ public class GetDataStreamResponse {
|
|||
|
||||
public static GetDataStreamResponse fromXContent(XContentParser parser) throws IOException {
|
||||
final List<DataStream> templates = new ArrayList<>();
|
||||
for (XContentParser.Token token = parser.nextToken(); token != XContentParser.Token.END_ARRAY; token = parser.nextToken()) {
|
||||
if (token == XContentParser.Token.START_OBJECT) {
|
||||
templates.add(DataStream.fromXContent(parser));
|
||||
for (XContentParser.Token token = parser.nextToken(); token != XContentParser.Token.END_OBJECT; token = parser.nextToken()) {
|
||||
if (token == XContentParser.Token.START_ARRAY) {
|
||||
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
|
||||
templates.add(DataStream.fromXContent(parser));
|
||||
}
|
||||
}
|
||||
}
|
||||
return new GetDataStreamResponse(templates);
|
||||
|
|
|
@ -20,7 +20,9 @@
|
|||
package org.elasticsearch.client.indices;
|
||||
|
||||
import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction;
|
||||
import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction.Response.DataStreamInfo;
|
||||
import org.elasticsearch.client.AbstractResponseTestCase;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.cluster.metadata.DataStream;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
|
@ -48,17 +50,19 @@ public class GetDataStreamResponseTests extends AbstractResponseTestCase<GetData
|
|||
return indices;
|
||||
}
|
||||
|
||||
private static DataStream randomInstance() {
|
||||
private static DataStreamInfo randomInstance() {
|
||||
List<Index> indices = randomIndexInstances();
|
||||
long generation = indices.size() + randomLongBetween(1, 128);
|
||||
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
|
||||
indices.add(new Index(getDefaultBackingIndexName(dataStreamName, generation), UUIDs.randomBase64UUID(random())));
|
||||
return new DataStream(dataStreamName, createTimestampField(randomAlphaOfLength(10)), indices, generation);
|
||||
DataStream dataStream = new DataStream(dataStreamName, createTimestampField(randomAlphaOfLength(10)), indices, generation);
|
||||
return new DataStreamInfo(dataStream, ClusterHealthStatus.YELLOW, randomAlphaOfLengthBetween(2, 10),
|
||||
randomAlphaOfLengthBetween(2, 10));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected GetDataStreamAction.Response createServerTestInstance(XContentType xContentType) {
|
||||
ArrayList<DataStream> dataStreams = new ArrayList<>();
|
||||
ArrayList<DataStreamInfo> dataStreams = new ArrayList<>();
|
||||
int count = randomInt(10);
|
||||
for (int i = 0; i < count; i++) {
|
||||
dataStreams.add(randomInstance());
|
||||
|
@ -74,12 +78,12 @@ public class GetDataStreamResponseTests extends AbstractResponseTestCase<GetData
|
|||
@Override
|
||||
protected void assertInstances(GetDataStreamAction.Response serverTestInstance, GetDataStreamResponse clientInstance) {
|
||||
assertEquals(serverTestInstance.getDataStreams().size(), clientInstance.getDataStreams().size());
|
||||
Iterator<DataStream> serverIt = serverTestInstance.getDataStreams().iterator();
|
||||
Iterator<DataStreamInfo> serverIt = serverTestInstance.getDataStreams().iterator();
|
||||
|
||||
Iterator<org.elasticsearch.client.indices.DataStream> clientIt = clientInstance.getDataStreams().iterator();
|
||||
while (serverIt.hasNext()) {
|
||||
org.elasticsearch.client.indices.DataStream client = clientIt.next();
|
||||
DataStream server = serverIt.next();
|
||||
DataStream server = serverIt.next().getDataStream();
|
||||
assertEquals(server.getName(), client.getName());
|
||||
assertEquals(server.getIndices().stream().map(Index::getName).collect(Collectors.toList()), client.getIndices());
|
||||
assertEquals(server.getTimeStampField().getName(), client.getTimeStampField());
|
||||
|
|
|
@ -13,6 +13,8 @@ setup:
|
|||
properties:
|
||||
'@timestamp':
|
||||
type: date
|
||||
settings:
|
||||
index.number_of_replicas: 0
|
||||
data_stream:
|
||||
timestamp_field: '@timestamp'
|
||||
- do:
|
||||
|
@ -49,16 +51,19 @@ setup:
|
|||
- do:
|
||||
indices.get_data_stream:
|
||||
name: "*"
|
||||
- match: { 0.name: simple-data-stream1 }
|
||||
- match: { 0.timestamp_field.name: '@timestamp' }
|
||||
- match: { 0.generation: 1 }
|
||||
- length: { 0.indices: 1 }
|
||||
- match: { 0.indices.0.index_name: '.ds-simple-data-stream1-000001' }
|
||||
- match: { 1.name: simple-data-stream2 }
|
||||
- match: { 1.timestamp_field.name: '@timestamp2' }
|
||||
- match: { 0.generation: 1 }
|
||||
- length: { 1.indices: 1 }
|
||||
- match: { 1.indices.0.index_name: '.ds-simple-data-stream2-000001' }
|
||||
- match: { data_streams.0.name: simple-data-stream1 }
|
||||
- match: { data_streams.0.timestamp_field.name: '@timestamp' }
|
||||
- match: { data_streams.0.generation: 1 }
|
||||
- length: { data_streams.0.indices: 1 }
|
||||
- match: { data_streams.0.indices.0.index_name: '.ds-simple-data-stream1-000001' }
|
||||
- match: { data_streams.0.status: 'GREEN' }
|
||||
- match: { data_streams.0.template: 'my-template1' }
|
||||
- match: { data_streams.1.name: simple-data-stream2 }
|
||||
- match: { data_streams.1.timestamp_field.name: '@timestamp2' }
|
||||
- match: { data_streams.0.generation: 1 }
|
||||
- length: { data_streams.1.indices: 1 }
|
||||
- match: { data_streams.1.indices.0.index_name: '.ds-simple-data-stream2-000001' }
|
||||
- match: { data_streams.1.template: 'my-template2' }
|
||||
|
||||
- do:
|
||||
index:
|
||||
|
@ -122,34 +127,34 @@ setup:
|
|||
|
||||
- do:
|
||||
indices.get_data_stream: {}
|
||||
- match: { 0.name: simple-data-stream1 }
|
||||
- match: { 0.timestamp_field.name: '@timestamp' }
|
||||
- match: { 0.timestamp_field.mapping: {type: date} }
|
||||
- match: { 0.generation: 1 }
|
||||
- match: { 1.name: simple-data-stream2 }
|
||||
- match: { 1.timestamp_field.name: '@timestamp2' }
|
||||
- match: { 1.timestamp_field.mapping: {type: date} }
|
||||
- match: { 1.generation: 1 }
|
||||
- match: { data_streams.0.name: simple-data-stream1 }
|
||||
- match: { data_streams.0.timestamp_field.name: '@timestamp' }
|
||||
- match: { data_streams.0.timestamp_field.mapping: {type: date} }
|
||||
- match: { data_streams.0.generation: 1 }
|
||||
- match: { data_streams.1.name: simple-data-stream2 }
|
||||
- match: { data_streams.1.timestamp_field.name: '@timestamp2' }
|
||||
- match: { data_streams.1.timestamp_field.mapping: {type: date} }
|
||||
- match: { data_streams.1.generation: 1 }
|
||||
|
||||
- do:
|
||||
indices.get_data_stream:
|
||||
name: simple-data-stream1
|
||||
- match: { 0.name: simple-data-stream1 }
|
||||
- match: { 0.timestamp_field.name: '@timestamp' }
|
||||
- match: { 0.timestamp_field.mapping: {type: date} }
|
||||
- match: { 0.generation: 1 }
|
||||
- match: { data_streams.0.name: simple-data-stream1 }
|
||||
- match: { data_streams.0.timestamp_field.name: '@timestamp' }
|
||||
- match: { data_streams.0.timestamp_field.mapping: {type: date} }
|
||||
- match: { data_streams.0.generation: 1 }
|
||||
|
||||
- do:
|
||||
indices.get_data_stream:
|
||||
name: simple-data-stream*
|
||||
- match: { 0.name: simple-data-stream1 }
|
||||
- match: { 0.timestamp_field.name: '@timestamp' }
|
||||
- match: { 0.timestamp_field.mapping: {type: date} }
|
||||
- match: { 0.generation: 1 }
|
||||
- match: { 1.name: simple-data-stream2 }
|
||||
- match: { 1.timestamp_field.name: '@timestamp2' }
|
||||
- match: { 1.timestamp_field.mapping: {type: date} }
|
||||
- match: { 1.generation: 1 }
|
||||
- match: { data_streams.0.name: simple-data-stream1 }
|
||||
- match: { data_streams.0.timestamp_field.name: '@timestamp' }
|
||||
- match: { data_streams.0.timestamp_field.mapping: {type: date} }
|
||||
- match: { data_streams.0.generation: 1 }
|
||||
- match: { data_streams.1.name: simple-data-stream2 }
|
||||
- match: { data_streams.1.timestamp_field.name: '@timestamp2' }
|
||||
- match: { data_streams.1.timestamp_field.mapping: {type: date} }
|
||||
- match: { data_streams.1.generation: 1 }
|
||||
|
||||
- do:
|
||||
indices.get_data_stream:
|
||||
|
@ -162,7 +167,7 @@ setup:
|
|||
- do:
|
||||
indices.get_data_stream:
|
||||
name: nonexistent*
|
||||
- match: { $body: [] }
|
||||
- match: { data_streams: [] }
|
||||
|
||||
- do:
|
||||
indices.delete_data_stream:
|
||||
|
@ -202,11 +207,11 @@ setup:
|
|||
|
||||
- do:
|
||||
indices.get_data_stream: {}
|
||||
- match: { 0.name: simple-data-stream1 }
|
||||
- match: { 0.timestamp_field.name: '@timestamp' }
|
||||
- match: { 0.generation: 1 }
|
||||
- length: { 0.indices: 1 }
|
||||
- match: { 0.indices.0.index_name: '.ds-simple-data-stream1-000001' }
|
||||
- match: { data_streams.0.name: simple-data-stream1 }
|
||||
- match: { data_streams.0.timestamp_field.name: '@timestamp' }
|
||||
- match: { data_streams.0.generation: 1 }
|
||||
- length: { data_streams.0.indices: 1 }
|
||||
- match: { data_streams.0.indices.0.index_name: '.ds-simple-data-stream1-000001' }
|
||||
|
||||
- do:
|
||||
indices.delete_data_stream:
|
||||
|
|
|
@ -42,10 +42,10 @@
|
|||
- do:
|
||||
indices.get_data_stream:
|
||||
name: logs-foobar
|
||||
- match: { 0.name: logs-foobar }
|
||||
- match: { 0.timestamp_field.name: 'timestamp' }
|
||||
- length: { 0.indices: 1 }
|
||||
- match: { 0.indices.0.index_name: '.ds-logs-foobar-000001' }
|
||||
- match: { data_streams.0.name: logs-foobar }
|
||||
- match: { data_streams.0.timestamp_field.name: 'timestamp' }
|
||||
- length: { data_streams.0.indices: 1 }
|
||||
- match: { data_streams.0.indices.0.index_name: '.ds-logs-foobar-000001' }
|
||||
|
||||
- do:
|
||||
indices.delete_data_stream:
|
||||
|
|
|
@ -57,11 +57,11 @@ setup:
|
|||
- do:
|
||||
indices.get_data_stream:
|
||||
name: "*"
|
||||
- match: { 0.name: simple-data-stream }
|
||||
- match: { 0.timestamp_field.name: '@timestamp' }
|
||||
- match: { 0.generation: 2 }
|
||||
- length: { 0.indices: 1 }
|
||||
- match: { 0.indices.0.index_name: '.ds-simple-data-stream-000002' }
|
||||
- match: { data_streams.0.name: simple-data-stream }
|
||||
- match: { data_streams.0.timestamp_field.name: '@timestamp' }
|
||||
- match: { data_streams.0.generation: 2 }
|
||||
- length: { data_streams.0.indices: 1 }
|
||||
- match: { data_streams.0.indices.0.index_name: '.ds-simple-data-stream-000002' }
|
||||
|
||||
- do:
|
||||
indices.delete_data_stream:
|
||||
|
|
|
@ -45,12 +45,12 @@
|
|||
- do:
|
||||
indices.get_data_stream:
|
||||
name: "*"
|
||||
- match: { 0.name: data-stream-for-rollover }
|
||||
- match: { 0.timestamp_field.name: '@timestamp' }
|
||||
- match: { 0.generation: 2 }
|
||||
- length: { 0.indices: 2 }
|
||||
- match: { 0.indices.0.index_name: '.ds-data-stream-for-rollover-000001' }
|
||||
- match: { 0.indices.1.index_name: '.ds-data-stream-for-rollover-000002' }
|
||||
- match: { data_streams.0.name: data-stream-for-rollover }
|
||||
- match: { data_streams.0.timestamp_field.name: '@timestamp' }
|
||||
- match: { data_streams.0.generation: 2 }
|
||||
- length: { data_streams.0.indices: 2 }
|
||||
- match: { data_streams.0.indices.0.index_name: '.ds-data-stream-for-rollover-000001' }
|
||||
- match: { data_streams.0.indices.1.index_name: '.ds-data-stream-for-rollover-000002' }
|
||||
|
||||
- do:
|
||||
indices.delete_data_stream:
|
||||
|
|
|
@ -30,16 +30,15 @@ import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
|
|||
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
|
||||
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
|
||||
import org.elasticsearch.action.admin.indices.template.delete.DeleteComposableIndexTemplateAction;
|
||||
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
|
||||
import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
|
||||
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.ingest.PutPipelineRequest;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.action.support.replication.ReplicationRequest;
|
||||
import org.elasticsearch.cluster.metadata.DataStream;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||
import org.elasticsearch.cluster.metadata.Template;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
|
@ -66,8 +65,8 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
import static org.elasticsearch.action.DocWriteRequest.OpType.CREATE;
|
||||
import static org.elasticsearch.action.DocWriteResponse.Result.CREATED;
|
||||
import static org.elasticsearch.action.DocWriteResponse.Result.UPDATED;
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.elasticsearch.cluster.metadata.MetadataCreateDataStreamServiceTests.generateMapping;
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.elasticsearch.test.StreamsUtils.copyToStringFromClasspath;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.arrayWithSize;
|
||||
|
@ -262,11 +261,11 @@ public class BulkIntegrationIT extends ESIntegTestCase {
|
|||
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request("*");
|
||||
GetDataStreamAction.Response getDataStreamsResponse = client().admin().indices().getDataStreams(getDataStreamRequest).actionGet();
|
||||
assertThat(getDataStreamsResponse.getDataStreams(), hasSize(4));
|
||||
getDataStreamsResponse.getDataStreams().sort(Comparator.comparing(DataStream::getName));
|
||||
assertThat(getDataStreamsResponse.getDataStreams().get(0).getName(), equalTo("logs-foobar"));
|
||||
assertThat(getDataStreamsResponse.getDataStreams().get(1).getName(), equalTo("logs-foobaz"));
|
||||
assertThat(getDataStreamsResponse.getDataStreams().get(2).getName(), equalTo("logs-foobaz2"));
|
||||
assertThat(getDataStreamsResponse.getDataStreams().get(3).getName(), equalTo("logs-foobaz3"));
|
||||
getDataStreamsResponse.getDataStreams().sort(Comparator.comparing(dataStreamInfo -> dataStreamInfo.getDataStream().getName()));
|
||||
assertThat(getDataStreamsResponse.getDataStreams().get(0).getDataStream().getName(), equalTo("logs-foobar"));
|
||||
assertThat(getDataStreamsResponse.getDataStreams().get(1).getDataStream().getName(), equalTo("logs-foobaz"));
|
||||
assertThat(getDataStreamsResponse.getDataStreams().get(2).getDataStream().getName(), equalTo("logs-foobaz2"));
|
||||
assertThat(getDataStreamsResponse.getDataStreams().get(3).getDataStream().getName(), equalTo("logs-foobaz3"));
|
||||
|
||||
GetIndexResponse getIndexResponse = client().admin().indices().getIndex(new GetIndexRequest().indices("logs-bar*")).actionGet();
|
||||
assertThat(getIndexResponse.getIndices(), arrayWithSize(4));
|
||||
|
|
|
@ -47,10 +47,13 @@ import org.elasticsearch.action.search.SearchRequest;
|
|||
import org.elasticsearch.action.search.SearchRequestBuilder;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
|
||||
import org.elasticsearch.cluster.metadata.DataStream;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||
import org.elasticsearch.cluster.metadata.MetadataCreateDataStreamServiceTests;
|
||||
import org.elasticsearch.cluster.metadata.Template;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.collect.List;
|
||||
import org.elasticsearch.common.collect.Map;
|
||||
import org.elasticsearch.common.compress.CompressedXContent;
|
||||
|
@ -118,19 +121,21 @@ public class DataStreamIT extends ESIntegTestCase {
|
|||
|
||||
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request("*");
|
||||
GetDataStreamAction.Response getDataStreamResponse = client().admin().indices().getDataStreams(getDataStreamRequest).actionGet();
|
||||
getDataStreamResponse.getDataStreams().sort(Comparator.comparing(DataStream::getName));
|
||||
getDataStreamResponse.getDataStreams().sort(Comparator.comparing(dataStreamInfo -> dataStreamInfo.getDataStream().getName()));
|
||||
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(2));
|
||||
assertThat(getDataStreamResponse.getDataStreams().get(0).getName(), equalTo("metrics-bar"));
|
||||
assertThat(getDataStreamResponse.getDataStreams().get(0).getTimeStampField().getName(), equalTo("@timestamp2"));
|
||||
assertThat(getDataStreamResponse.getDataStreams().get(0).getTimeStampField().getFieldMapping(), equalTo(Map.of("type", "date")));
|
||||
assertThat(getDataStreamResponse.getDataStreams().get(0).getIndices().size(), equalTo(1));
|
||||
assertThat(getDataStreamResponse.getDataStreams().get(0).getIndices().get(0).getName(),
|
||||
DataStream firstDataStream = getDataStreamResponse.getDataStreams().get(0).getDataStream();
|
||||
assertThat(firstDataStream.getName(), equalTo("metrics-bar"));
|
||||
assertThat(firstDataStream.getTimeStampField().getName(), equalTo("@timestamp2"));
|
||||
assertThat(firstDataStream.getTimeStampField().getFieldMapping(), equalTo(Map.of("type", "date")));
|
||||
assertThat(firstDataStream.getIndices().size(), equalTo(1));
|
||||
assertThat(firstDataStream.getIndices().get(0).getName(),
|
||||
equalTo(DataStream.getDefaultBackingIndexName("metrics-bar", 1)));
|
||||
assertThat(getDataStreamResponse.getDataStreams().get(1).getName(), equalTo("metrics-foo"));
|
||||
assertThat(getDataStreamResponse.getDataStreams().get(1).getTimeStampField().getName(), equalTo("@timestamp1"));
|
||||
assertThat(getDataStreamResponse.getDataStreams().get(1).getTimeStampField().getFieldMapping(), equalTo(Map.of("type", "date")));
|
||||
assertThat(getDataStreamResponse.getDataStreams().get(1).getIndices().size(), equalTo(1));
|
||||
assertThat(getDataStreamResponse.getDataStreams().get(1).getIndices().get(0).getName(),
|
||||
DataStream dataStream = getDataStreamResponse.getDataStreams().get(1).getDataStream();
|
||||
assertThat(dataStream.getName(), equalTo("metrics-foo"));
|
||||
assertThat(dataStream.getTimeStampField().getName(), equalTo("@timestamp1"));
|
||||
assertThat(dataStream.getTimeStampField().getFieldMapping(), equalTo(Map.of("type", "date")));
|
||||
assertThat(dataStream.getIndices().size(), equalTo(1));
|
||||
assertThat(dataStream.getIndices().get(0).getName(),
|
||||
equalTo(DataStream.getDefaultBackingIndexName("metrics-foo", 1)));
|
||||
|
||||
String backingIndex = DataStream.getDefaultBackingIndexName("metrics-bar", 1);
|
||||
|
@ -302,10 +307,10 @@ public class DataStreamIT extends ESIntegTestCase {
|
|||
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request("*");
|
||||
GetDataStreamAction.Response getDataStreamResponse = client().admin().indices().getDataStreams(getDataStreamRequest).actionGet();
|
||||
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
|
||||
assertThat(getDataStreamResponse.getDataStreams().get(0).getName(), equalTo(dataStreamName));
|
||||
assertThat(getDataStreamResponse.getDataStreams().get(0).getTimeStampField().getName(), equalTo("@timestamp"));
|
||||
assertThat(getDataStreamResponse.getDataStreams().get(0).getIndices().size(), equalTo(1));
|
||||
assertThat(getDataStreamResponse.getDataStreams().get(0).getIndices().get(0).getName(), equalTo(backingIndex));
|
||||
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo(dataStreamName));
|
||||
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getTimeStampField().getName(), equalTo("@timestamp"));
|
||||
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices().size(), equalTo(1));
|
||||
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getIndices().get(0).getName(), equalTo(backingIndex));
|
||||
|
||||
GetIndexResponse getIndexResponse =
|
||||
client().admin().indices().getIndex(new GetIndexRequest().indices(dataStreamName)).actionGet();
|
||||
|
@ -561,16 +566,18 @@ public class DataStreamIT extends ESIntegTestCase {
|
|||
" }\n" +
|
||||
" }\n" +
|
||||
" }";;
|
||||
putComposableIndexTemplate("id1", "event.@timestamp", mapping, List.of("logs-foo*"));
|
||||
putComposableIndexTemplate("id1", "event.@timestamp", mapping, List.of("logs-foo*"), null);
|
||||
|
||||
CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request("logs-foobar");
|
||||
client().admin().indices().createDataStream(createDataStreamRequest).get();
|
||||
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request("logs-foobar");
|
||||
GetDataStreamAction.Response getDataStreamResponse = client().admin().indices().getDataStreams(getDataStreamRequest).actionGet();
|
||||
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
|
||||
assertThat(getDataStreamResponse.getDataStreams().get(0).getName(), equalTo("logs-foobar"));
|
||||
assertThat(getDataStreamResponse.getDataStreams().get(0).getTimeStampField().getName(), equalTo("event.@timestamp"));
|
||||
assertThat(getDataStreamResponse.getDataStreams().get(0).getTimeStampField().getFieldMapping(), equalTo(Map.of("type", "date")));
|
||||
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo("logs-foobar"));
|
||||
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getTimeStampField().getName(),
|
||||
equalTo("event.@timestamp"));
|
||||
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getTimeStampField().getFieldMapping(),
|
||||
equalTo(Map.of("type", "date")));
|
||||
assertBackingIndex(DataStream.getDefaultBackingIndexName("logs-foobar", 1), "properties.event.properties.@timestamp");
|
||||
|
||||
// Change the template to have a different timestamp field
|
||||
|
@ -597,18 +604,18 @@ public class DataStreamIT extends ESIntegTestCase {
|
|||
" }\n" +
|
||||
" }\n" +
|
||||
" }";
|
||||
putComposableIndexTemplate("id1", "@timestamp", mapping, List.of("logs-foo*"));
|
||||
putComposableIndexTemplate("id1", "@timestamp", mapping, List.of("logs-foo*"), null);
|
||||
|
||||
CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request("logs-foobar");
|
||||
client().admin().indices().createDataStream(createDataStreamRequest).get();
|
||||
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request("logs-foobar");
|
||||
GetDataStreamAction.Response getDataStreamResponse = client().admin().indices().getDataStreams(getDataStreamRequest).actionGet();
|
||||
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
|
||||
assertThat(getDataStreamResponse.getDataStreams().get(0).getName(), equalTo("logs-foobar"));
|
||||
assertThat(getDataStreamResponse.getDataStreams().get(0).getTimeStampField().getName(), equalTo("@timestamp"));
|
||||
java.util.Map<?, ?> expectedTimestampMapping =
|
||||
Map.of("type", "date", "format", "yyyy-MM", "meta", Map.of("x", "y"));
|
||||
assertThat(getDataStreamResponse.getDataStreams().get(0).getTimeStampField().getFieldMapping(), equalTo(expectedTimestampMapping));
|
||||
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getName(), equalTo("logs-foobar"));
|
||||
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getTimeStampField().getName(), equalTo("@timestamp"));
|
||||
java.util.Map<?, ?> expectedTimestampMapping = Map.of("type", "date", "format", "yyyy-MM", "meta", Map.of("x", "y"));
|
||||
assertThat(getDataStreamResponse.getDataStreams().get(0).getDataStream().getTimeStampField().getFieldMapping(),
|
||||
equalTo(expectedTimestampMapping));
|
||||
assertBackingIndex(DataStream.getDefaultBackingIndexName("logs-foobar", 1), "properties.@timestamp", expectedTimestampMapping);
|
||||
|
||||
// Change the template to have a different timestamp field
|
||||
|
@ -756,6 +763,25 @@ public class DataStreamIT extends ESIntegTestCase {
|
|||
assertThat(searchResponse.getHits().getTotalHits().value, is((long) numDocsBar + numDocsFoo + numDocsRolledFoo));
|
||||
}
|
||||
|
||||
public void testGetDataStream() throws Exception {
|
||||
Settings settings = Settings.builder()
|
||||
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, maximumNumberOfReplicas() + 2)
|
||||
.build();
|
||||
putComposableIndexTemplate("template_for_foo", "@timestamp", List.of("metrics-foo*"), settings);
|
||||
|
||||
int numDocsFoo = randomIntBetween(2, 16);
|
||||
indexDocs("metrics-foo", "@timestamp", numDocsFoo);
|
||||
|
||||
GetDataStreamAction.Response response =
|
||||
client().admin().indices().getDataStreams(new GetDataStreamAction.Request("metrics-foo")).actionGet();
|
||||
assertThat(response.getDataStreams().size(), is(1));
|
||||
GetDataStreamAction.Response.DataStreamInfo metricsFooDataStream = response.getDataStreams().get(0);
|
||||
assertThat(metricsFooDataStream.getDataStream().getName(), is("metrics-foo"));
|
||||
assertThat(metricsFooDataStream.getDataStreamStatus(), is(ClusterHealthStatus.YELLOW));
|
||||
assertThat(metricsFooDataStream.getIndexTemplate(), is("template_for_foo"));
|
||||
assertThat(metricsFooDataStream.getIlmPolicy(), is(nullValue()));
|
||||
}
|
||||
|
||||
private static void assertBackingIndex(String backingIndex, String timestampFieldPathInMapping) {
|
||||
assertBackingIndex(backingIndex, timestampFieldPathInMapping, Map.of("type", "date"));
|
||||
}
|
||||
|
@ -864,28 +890,30 @@ public class DataStreamIT extends ESIntegTestCase {
|
|||
});
|
||||
}
|
||||
|
||||
private static void expectFailure(String dataStreamName, ThrowingRunnable runnable) {
|
||||
Exception e = expectThrows(IllegalArgumentException.class, runnable);
|
||||
assertThat(e.getMessage(), equalTo("The provided expression [" + dataStreamName +
|
||||
"] matches a data stream, specify the corresponding concrete indices instead."));
|
||||
public static void putComposableIndexTemplate(String id, String timestampFieldName,
|
||||
java.util.List<String> patterns) throws IOException {
|
||||
String mapping = MetadataCreateDataStreamServiceTests.generateMapping(timestampFieldName);
|
||||
putComposableIndexTemplate(id, timestampFieldName, mapping, patterns, null);
|
||||
}
|
||||
|
||||
public static void putComposableIndexTemplate(String id,
|
||||
String timestampFieldName,
|
||||
java.util.List<String> patterns) throws IOException {
|
||||
java.util.List<String> patterns,
|
||||
Settings settings) throws IOException {
|
||||
String mapping = MetadataCreateDataStreamServiceTests.generateMapping(timestampFieldName);
|
||||
putComposableIndexTemplate(id, timestampFieldName, mapping, patterns);
|
||||
putComposableIndexTemplate(id, timestampFieldName, mapping, patterns, settings);
|
||||
}
|
||||
|
||||
static void putComposableIndexTemplate(String id,
|
||||
String timestampFieldName,
|
||||
String mapping,
|
||||
java.util.List<String> patterns) throws IOException {
|
||||
java.util.List<String> patterns,
|
||||
@Nullable Settings settings) throws IOException {
|
||||
PutComposableIndexTemplateAction.Request request = new PutComposableIndexTemplateAction.Request(id);
|
||||
request.indexTemplate(
|
||||
new ComposableIndexTemplate(
|
||||
patterns,
|
||||
new Template(null, new CompressedXContent(mapping), null),
|
||||
new Template(settings, new CompressedXContent(mapping), null),
|
||||
null, null, null, null,
|
||||
new ComposableIndexTemplate.DataStreamTemplate(timestampFieldName))
|
||||
);
|
||||
|
|
|
@ -46,6 +46,7 @@ import java.util.concurrent.ExecutionException;
|
|||
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
|
||||
|
@ -118,8 +119,8 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
|
|||
|
||||
GetDataStreamAction.Response ds = client.admin().indices().getDataStreams(new GetDataStreamAction.Request("ds")).get();
|
||||
assertEquals(1, ds.getDataStreams().size());
|
||||
assertEquals(1, ds.getDataStreams().get(0).getIndices().size());
|
||||
assertEquals(DS_BACKING_INDEX_NAME, ds.getDataStreams().get(0).getIndices().get(0).getName());
|
||||
assertEquals(1, ds.getDataStreams().get(0).getDataStream().getIndices().size());
|
||||
assertEquals(DS_BACKING_INDEX_NAME, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName());
|
||||
}
|
||||
|
||||
public void testSnapshotAndRestoreAll() throws Exception {
|
||||
|
@ -156,8 +157,8 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
|
|||
|
||||
GetDataStreamAction.Response ds = client.admin().indices().getDataStreams(new GetDataStreamAction.Request("ds")).get();
|
||||
assertEquals(1, ds.getDataStreams().size());
|
||||
assertEquals(1, ds.getDataStreams().get(0).getIndices().size());
|
||||
assertEquals(DS_BACKING_INDEX_NAME, ds.getDataStreams().get(0).getIndices().get(0).getName());
|
||||
assertEquals(1, ds.getDataStreams().get(0).getDataStream().getIndices().size());
|
||||
assertEquals(DS_BACKING_INDEX_NAME, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName());
|
||||
|
||||
assertAcked(client().admin().indices().deleteDataStream(new DeleteDataStreamAction.Request(new String[]{"ds"})).get());
|
||||
}
|
||||
|
@ -189,8 +190,8 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
|
|||
|
||||
GetDataStreamAction.Response ds = client.admin().indices().getDataStreams(new GetDataStreamAction.Request("ds2")).get();
|
||||
assertEquals(1, ds.getDataStreams().size());
|
||||
assertEquals(1, ds.getDataStreams().get(0).getIndices().size());
|
||||
assertEquals(DS2_BACKING_INDEX_NAME, ds.getDataStreams().get(0).getIndices().get(0).getName());
|
||||
assertEquals(1, ds.getDataStreams().get(0).getDataStream().getIndices().size());
|
||||
assertEquals(DS2_BACKING_INDEX_NAME, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName());
|
||||
assertEquals(DOCUMENT_SOURCE, client.prepareSearch("ds2").get().getHits().getHits()[0].getSourceAsMap());
|
||||
assertEquals(DOCUMENT_SOURCE, client.prepareGet(DS2_BACKING_INDEX_NAME, "_doc", id).get().getSourceAsMap());
|
||||
}
|
||||
|
@ -228,7 +229,7 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
|
|||
|
||||
GetDataStreamAction.Request getDSRequest = new GetDataStreamAction.Request("ds");
|
||||
GetDataStreamAction.Response response = client.admin().indices().getDataStreams(getDSRequest).actionGet();
|
||||
assertThat(response.getDataStreams().get(0).getIndices().get(0).getName(), is(DS_BACKING_INDEX_NAME));
|
||||
assertThat(response.getDataStreams().get(0).getDataStream().getIndices().get(0).getName(), is(DS_BACKING_INDEX_NAME));
|
||||
}
|
||||
|
||||
public void testDataStreamAndBackingIndidcesAreRenamedUsingRegex() {
|
||||
|
@ -262,13 +263,13 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
|
|||
// assert "ds" was restored as "test-ds" and the backing index has a valid name
|
||||
GetDataStreamAction.Request getRenamedDS = new GetDataStreamAction.Request("test-ds");
|
||||
GetDataStreamAction.Response response = client.admin().indices().getDataStreams(getRenamedDS).actionGet();
|
||||
assertThat(response.getDataStreams().get(0).getIndices().get(0).getName(),
|
||||
assertThat(response.getDataStreams().get(0).getDataStream().getIndices().get(0).getName(),
|
||||
is(DataStream.getDefaultBackingIndexName("test-ds", 1L)));
|
||||
|
||||
// data stream "ds" should still exist in the system
|
||||
GetDataStreamAction.Request getDSRequest = new GetDataStreamAction.Request("ds");
|
||||
response = client.admin().indices().getDataStreams(getDSRequest).actionGet();
|
||||
assertThat(response.getDataStreams().get(0).getIndices().get(0).getName(), is(DS_BACKING_INDEX_NAME));
|
||||
assertThat(response.getDataStreams().get(0).getDataStream().getIndices().get(0).getName(), is(DS_BACKING_INDEX_NAME));
|
||||
}
|
||||
|
||||
public void testWildcards() throws Exception {
|
||||
|
@ -294,8 +295,10 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
|
|||
|
||||
GetDataStreamAction.Response ds = client.admin().indices().getDataStreams(new GetDataStreamAction.Request("ds2")).get();
|
||||
assertEquals(1, ds.getDataStreams().size());
|
||||
assertEquals(1, ds.getDataStreams().get(0).getIndices().size());
|
||||
assertEquals(DS2_BACKING_INDEX_NAME, ds.getDataStreams().get(0).getIndices().get(0).getName());
|
||||
assertEquals(1, ds.getDataStreams().get(0).getDataStream().getIndices().size());
|
||||
assertEquals(DS2_BACKING_INDEX_NAME, ds.getDataStreams().get(0).getDataStream().getIndices().get(0).getName());
|
||||
assertThat("we renamed the restored data stream to one that doesn't match any existing composable template",
|
||||
ds.getDataStreams().get(0).getIndexTemplate(), is(nullValue()));
|
||||
}
|
||||
|
||||
public void testDataStreamNotStoredWhenIndexRequested() throws Exception {
|
||||
|
|
|
@ -18,6 +18,8 @@
|
|||
*/
|
||||
package org.elasticsearch.action.admin.indices.datastream;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.ResourceNotFoundException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
|
@ -26,18 +28,26 @@ import org.elasticsearch.action.ActionType;
|
|||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
|
||||
import org.elasticsearch.cluster.AbstractDiffable;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.cluster.health.ClusterStateHealth;
|
||||
import org.elasticsearch.cluster.metadata.DataStream;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.regex.Regex;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
|
@ -96,18 +106,103 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
|
|||
}
|
||||
|
||||
public static class Response extends ActionResponse implements ToXContentObject {
|
||||
public static final ParseField DATASTREAMS_FIELD = new ParseField("data_streams");
|
||||
|
||||
private final List<DataStream> dataStreams;
|
||||
public static class DataStreamInfo extends AbstractDiffable<DataStreamInfo> implements ToXContentObject {
|
||||
|
||||
public Response(List<DataStream> dataStreams) {
|
||||
public static final ParseField STATUS_FIELD = new ParseField("status");
|
||||
public static final ParseField INDEX_TEMPLATE_FIELD = new ParseField("template");
|
||||
public static final ParseField ILM_POLICY_FIELD = new ParseField("ilm_policy");
|
||||
|
||||
DataStream dataStream;
|
||||
ClusterHealthStatus dataStreamStatus;
|
||||
@Nullable String indexTemplate;
|
||||
@Nullable String ilmPolicyName;
|
||||
|
||||
public DataStreamInfo(DataStream dataStream, ClusterHealthStatus dataStreamStatus, @Nullable String indexTemplate,
|
||||
@Nullable String ilmPolicyName) {
|
||||
this.dataStream = dataStream;
|
||||
this.dataStreamStatus = dataStreamStatus;
|
||||
this.indexTemplate = indexTemplate;
|
||||
this.ilmPolicyName = ilmPolicyName;
|
||||
}
|
||||
|
||||
public DataStreamInfo(StreamInput in) throws IOException {
|
||||
this(new DataStream(in), ClusterHealthStatus.readFrom(in), in.readOptionalString(), in.readOptionalString());
|
||||
}
|
||||
|
||||
public DataStream getDataStream() {
|
||||
return dataStream;
|
||||
}
|
||||
|
||||
public ClusterHealthStatus getDataStreamStatus() {
|
||||
return dataStreamStatus;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public String getIndexTemplate() {
|
||||
return indexTemplate;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public String getIlmPolicy() {
|
||||
return ilmPolicyName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
dataStream.writeTo(out);
|
||||
dataStreamStatus.writeTo(out);
|
||||
out.writeOptionalString(indexTemplate);
|
||||
out.writeOptionalString(ilmPolicyName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(DataStream.NAME_FIELD.getPreferredName(), dataStream.getName());
|
||||
builder.field(DataStream.TIMESTAMP_FIELD_FIELD.getPreferredName(), dataStream.getTimeStampField());
|
||||
builder.field(DataStream.INDICES_FIELD.getPreferredName(), dataStream.getIndices());
|
||||
builder.field(DataStream.GENERATION_FIELD.getPreferredName(), dataStream.getGeneration());
|
||||
builder.field(STATUS_FIELD.getPreferredName(), dataStreamStatus);
|
||||
if (indexTemplate != null) {
|
||||
builder.field(INDEX_TEMPLATE_FIELD.getPreferredName(), indexTemplate);
|
||||
}
|
||||
if (ilmPolicyName != null) {
|
||||
builder.field(ILM_POLICY_FIELD.getPreferredName(), ilmPolicyName);
|
||||
}
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
DataStreamInfo that = (DataStreamInfo) o;
|
||||
return dataStream.equals(that.dataStream) &&
|
||||
dataStreamStatus == that.dataStreamStatus &&
|
||||
Objects.equals(indexTemplate, that.indexTemplate) &&
|
||||
Objects.equals(ilmPolicyName, that.ilmPolicyName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(dataStream, dataStreamStatus, indexTemplate, ilmPolicyName);
|
||||
}
|
||||
}
|
||||
|
||||
private final List<DataStreamInfo> dataStreams;
|
||||
|
||||
public Response(List<DataStreamInfo> dataStreams) {
|
||||
this.dataStreams = dataStreams;
|
||||
}
|
||||
|
||||
public Response(StreamInput in) throws IOException {
|
||||
this(in.readList(DataStream::new));
|
||||
this(in.readList(DataStreamInfo::new));
|
||||
}
|
||||
|
||||
public List<DataStream> getDataStreams() {
|
||||
public List<DataStreamInfo> getDataStreams() {
|
||||
return dataStreams;
|
||||
}
|
||||
|
||||
|
@ -118,11 +213,13 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
|
|||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startArray();
|
||||
for (DataStream dataStream : dataStreams) {
|
||||
builder.startObject();
|
||||
builder.startArray(DATASTREAMS_FIELD.getPreferredName());
|
||||
for (DataStreamInfo dataStream : dataStreams) {
|
||||
dataStream.toXContent(builder, params);
|
||||
}
|
||||
builder.endArray();
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
|
@ -142,6 +239,8 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
|
|||
|
||||
public static class TransportAction extends TransportMasterNodeReadAction<Request, Response> {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(TransportAction.class);
|
||||
|
||||
@Inject
|
||||
public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
|
@ -161,7 +260,23 @@ public class GetDataStreamAction extends ActionType<GetDataStreamAction.Response
|
|||
@Override
|
||||
protected void masterOperation(Request request, ClusterState state,
|
||||
ActionListener<Response> listener) throws Exception {
|
||||
listener.onResponse(new Response(getDataStreams(state, request)));
|
||||
List<DataStream> dataStreams = getDataStreams(state, request);
|
||||
List<Response.DataStreamInfo> dataStreamInfos = new ArrayList<>(dataStreams.size());
|
||||
for (DataStream dataStream : dataStreams) {
|
||||
String indexTemplate = MetadataIndexTemplateService.findV2Template(state.metadata(), dataStream.getName(), false);
|
||||
String ilmPolicyName = null;
|
||||
if (indexTemplate != null) {
|
||||
Settings settings = MetadataIndexTemplateService.resolveSettings(state.metadata(), indexTemplate);
|
||||
ilmPolicyName = settings.get("index.lifecycle.name");
|
||||
} else {
|
||||
logger.warn("couldn't find any matching template for data stream [{}]. has it been restored (and possibly renamed)" +
|
||||
"from a snapshot?", dataStream.getName());
|
||||
}
|
||||
ClusterStateHealth streamHealth = new ClusterStateHealth(state,
|
||||
dataStream.getIndices().stream().map(Index::getName).toArray(String[]::new));
|
||||
dataStreamInfos.add(new Response.DataStreamInfo(dataStream, streamHealth.getStatus(), indexTemplate, ilmPolicyName));
|
||||
}
|
||||
listener.onResponse(new Response(dataStreamInfos));
|
||||
}
|
||||
|
||||
static List<DataStream> getDataStreams(ClusterState clusterState, Request request) {
|
||||
|
|
|
@ -46,7 +46,6 @@ import static org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService
|
|||
public final class DataStream extends AbstractDiffable<DataStream> implements ToXContentObject {
|
||||
|
||||
public static final String BACKING_INDEX_PREFIX = ".ds-";
|
||||
public static final String DATA_STREAMS_METADATA_FIELD = "data-streams";
|
||||
|
||||
private final String name;
|
||||
private final TimestampField timeStampField;
|
||||
|
|
|
@ -19,41 +19,28 @@
|
|||
package org.elasticsearch.action.admin.indices.datastream;
|
||||
|
||||
import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction.Response;
|
||||
import org.elasticsearch.cluster.metadata.DataStream;
|
||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.cluster.metadata.DataStreamTests;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.XContentParser.Token;
|
||||
import org.elasticsearch.test.AbstractSerializingTestCase;
|
||||
import org.elasticsearch.test.AbstractWireSerializingTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class GetDataStreamsResponseTests extends AbstractSerializingTestCase<Response> {
|
||||
public class GetDataStreamsResponseTests extends AbstractWireSerializingTestCase<Response> {
|
||||
|
||||
@Override
|
||||
protected Writeable.Reader<Response> instanceReader() {
|
||||
return Response::new;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Response doParseInstance(XContentParser parser) throws IOException {
|
||||
List<DataStream> dataStreams = new ArrayList<>();
|
||||
for (Token token = parser.nextToken(); token != Token.END_ARRAY; token = parser.nextToken()) {
|
||||
if (token == Token.START_OBJECT) {
|
||||
dataStreams.add(DataStream.fromXContent(parser));
|
||||
}
|
||||
}
|
||||
return new Response(dataStreams);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Response createTestInstance() {
|
||||
int numDataStreams = randomIntBetween(0, 8);
|
||||
List<DataStream> dataStreams = new ArrayList<>();
|
||||
List<Response.DataStreamInfo> dataStreams = new ArrayList<>();
|
||||
for (int i = 0; i < numDataStreams; i++) {
|
||||
dataStreams.add(DataStreamTests.randomInstance());
|
||||
dataStreams.add(new Response.DataStreamInfo(DataStreamTests.randomInstance(), ClusterHealthStatus.GREEN,
|
||||
randomAlphaOfLengthBetween(2, 10), randomAlphaOfLengthBetween(2, 10)));
|
||||
}
|
||||
return new Response(dataStreams);
|
||||
}
|
||||
|
|
|
@ -6,12 +6,16 @@
|
|||
|
||||
package org.elasticsearch.xpack.ilm;
|
||||
|
||||
import org.elasticsearch.client.Request;
|
||||
import org.elasticsearch.client.Response;
|
||||
import org.elasticsearch.cluster.metadata.DataStream;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||
import org.elasticsearch.cluster.metadata.Template;
|
||||
import org.elasticsearch.common.compress.CompressedXContent;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.test.rest.ESRestTestCase;
|
||||
import org.elasticsearch.xpack.core.ilm.CheckNotDataStreamWriteIndexStep;
|
||||
|
@ -25,6 +29,8 @@ import org.elasticsearch.xpack.core.ilm.SearchableSnapshotAction;
|
|||
import org.elasticsearch.xpack.core.ilm.ShrinkAction;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -206,6 +212,26 @@ public class TimeSeriesDataStreamsIT extends ESRestTestCase {
|
|||
TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testGetDataStreamReturnsILMPolicy() throws Exception {
|
||||
String policyName = "logs-policy";
|
||||
createComposableTemplate(client(), "logs-template", "logs-foo*", getTemplate(policyName));
|
||||
String dataStream = "logs-foo";
|
||||
indexDocument(client(), dataStream, true);
|
||||
|
||||
Request explainRequest = new Request("GET", "/_data_stream/logs-foo");
|
||||
Response response = client().performRequest(explainRequest);
|
||||
Map<String, Object> responseMap;
|
||||
try (InputStream is = response.getEntity().getContent()) {
|
||||
responseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
|
||||
}
|
||||
|
||||
List<Object> dataStreams = (List<Object>) responseMap.get("data_streams");
|
||||
assertThat(dataStreams.size(), is(1));
|
||||
Map<String, Object> logsDataStream = (Map<String, Object>) dataStreams.get(0);
|
||||
assertThat(logsDataStream.get("ilm_policy"), is(policyName));
|
||||
}
|
||||
|
||||
private static Template getTemplate(String policyName) throws IOException {
|
||||
return new Template(getLifcycleSettings(policyName), new CompressedXContent(TIMESTAMP_MAPPING), null);
|
||||
}
|
||||
|
|
|
@ -51,11 +51,11 @@ setup:
|
|||
indices.get_data_stream:
|
||||
name: logs-foo-bar
|
||||
|
||||
- match: { 0.name: logs-foo-bar }
|
||||
- match: { 0.timestamp_field.name: '@timestamp' }
|
||||
- match: { 0.generation: 1 }
|
||||
- length: { 0.indices: 1 }
|
||||
- match: { 0.indices.0.index_name: '.ds-logs-foo-bar-000001' }
|
||||
- match: { data_streams.0.name: logs-foo-bar }
|
||||
- match: { data_streams.0.timestamp_field.name: '@timestamp' }
|
||||
- match: { data_streams.0.generation: 1 }
|
||||
- length: { data_streams.0.indices: 1 }
|
||||
- match: { data_streams.0.indices.0.index_name: '.ds-logs-foo-bar-000001' }
|
||||
|
||||
- do:
|
||||
indices.get:
|
||||
|
@ -84,11 +84,11 @@ setup:
|
|||
indices.get_data_stream:
|
||||
name: metrics-foo-bar
|
||||
|
||||
- match: { 0.name: metrics-foo-bar }
|
||||
- match: { 0.timestamp_field.name: '@timestamp' }
|
||||
- match: { 0.generation: 1 }
|
||||
- length: { 0.indices: 1 }
|
||||
- match: { 0.indices.0.index_name: '.ds-metrics-foo-bar-000001' }
|
||||
- match: { data_streams.0.name: metrics-foo-bar }
|
||||
- match: { data_streams.0.timestamp_field.name: '@timestamp' }
|
||||
- match: { data_streams.0.generation: 1 }
|
||||
- length: { data_streams.0.indices: 1 }
|
||||
- match: { data_streams.0.indices.0.index_name: '.ds-metrics-foo-bar-000001' }
|
||||
|
||||
- do:
|
||||
indices.get:
|
||||
|
|
Loading…
Reference in New Issue