[7.x] Cluster state and CRUD operations for data streams (#54073)

This commit is contained in:
Dan Hermann 2020-03-24 07:58:52 -05:00 committed by GitHub
parent 92acb2859b
commit 30105a5ab5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 595 additions and 25 deletions

View File

@ -1,26 +1,33 @@
---
"Test stubs":
"Create data stream":
- skip:
version: " - 7.6.99"
reason: only available in 7.7+
version: "all"
reason: "AwaitsFix https://github.com/elastic/elasticsearch/issues/54022"
- do:
indices.create_data_stream:
name: data-stream2
name: simple-data-stream1
body:
timestamp_field: "@timestamp"
- is_true: acknowledged
- do:
indices.create_data_stream:
name: simple-data-stream2
body:
timestamp_field: "@timestamp2"
- is_true: acknowledged
- do:
indices.get_data_streams: {}
- match: { 0.name: my_data_stream1 }
- match: { 0.name: simple-data-stream1 }
- match: { 0.timestamp_field: '@timestamp' }
- match: { 0.indices: ['my_data_stream1-000000'] }
- match: { 1.name: my_data_stream2 }
- match: { 1.timestamp_field: '@timestamp' }
- match: { 0.indices: [] }
- match: { 1.name: simple-data-stream2 }
- match: { 1.timestamp_field: '@timestamp2' }
- match: { 1.indices: [] }
- do:
indices.delete_data_stream:
name: data-stream2
name: simple-data-stream2
- is_true: acknowledged

View File

@ -18,29 +18,41 @@
*/
package org.elasticsearch.action.admin.indices.datastream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Collections;
import java.util.Objects;
public class CreateDataStreamAction extends ActionType<AcknowledgedResponse> {
private static final Logger logger = LogManager.getLogger(CreateDataStreamAction.class);
public static final CreateDataStreamAction INSTANCE = new CreateDataStreamAction();
public static final String NAME = "indices:admin/data_stream/create";
@ -63,7 +75,14 @@ public class CreateDataStreamAction extends ActionType<AcknowledgedResponse> {
@Override
public ActionRequestValidationException validate() {
return null;
ActionRequestValidationException validationException = null;
if (Strings.hasText(name) == false) {
validationException = ValidateActions.addValidationError("name is missing", validationException);
}
if (Strings.hasText(timestampFieldName) == false) {
validationException = ValidateActions.addValidationError("timestamp field name is missing", validationException);
}
return validationException;
}
public Request(StreamInput in) throws IOException {
@ -115,8 +134,42 @@ public class CreateDataStreamAction extends ActionType<AcknowledgedResponse> {
@Override
protected void masterOperation(Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) throws Exception {
clusterService.submitStateUpdateTask("create-data-stream [" + request.name + "]",
new ClusterStateUpdateTask(Priority.HIGH) {
@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}
@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return createDataStream(currentState, request);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(new AcknowledgedResponse(true));
}
});
}
static ClusterState createDataStream(ClusterState currentState, Request request) {
if (currentState.metaData().dataStreams().containsKey(request.name)) {
throw new IllegalArgumentException("data_stream [" + request.name + "] already exists");
}
MetaData.Builder builder = MetaData.builder(currentState.metaData()).put(
new DataStream(request.name, request.timestampFieldName, Collections.emptyList()));
logger.info("adding data stream [{}]", request.name);
return ClusterState.builder(currentState).metaData(builder).build();
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {

View File

@ -18,29 +18,43 @@
*/
package org.elasticsearch.action.admin.indices.datastream;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
public class DeleteDataStreamAction extends ActionType<AcknowledgedResponse> {
private static final Logger logger = LogManager.getLogger(DeleteDataStreamAction.class);
public static final DeleteDataStreamAction INSTANCE = new DeleteDataStreamAction();
public static final String NAME = "indices:admin/data_stream/delete";
@ -58,7 +72,11 @@ public class DeleteDataStreamAction extends ActionType<AcknowledgedResponse> {
@Override
public ActionRequestValidationException validate() {
return null;
ActionRequestValidationException validationException = null;
if (Strings.hasText(name) == false) {
validationException = ValidateActions.addValidationError("name is missing", validationException);
}
return validationException;
}
public Request(StreamInput in) throws IOException {
@ -107,8 +125,52 @@ public class DeleteDataStreamAction extends ActionType<AcknowledgedResponse> {
@Override
protected void masterOperation(Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) throws Exception {
clusterService.submitStateUpdateTask("remove-data-stream [" + request.name + "]", new ClusterStateUpdateTask(Priority.HIGH) {
@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}
@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}
@Override
public ClusterState execute(ClusterState currentState) {
return removeDataStream(currentState, request);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(new AcknowledgedResponse(true));
}
});
}
static ClusterState removeDataStream(ClusterState currentState, Request request) {
Set<String> dataStreams = new HashSet<>();
for (String dataStreamName : currentState.metaData().dataStreams().keySet()) {
if (Regex.simpleMatch(request.name, dataStreamName)) {
dataStreams.add(dataStreamName);
}
}
if (dataStreams.isEmpty()) {
// if a match-all pattern was specified and no data streams were found because none exist, do not
// fail with data stream missing exception
if (Regex.isMatchAllPattern(request.name)) {
return currentState;
}
throw new ResourceNotFoundException("data_streams matching [" + request.name + "] not found");
}
MetaData.Builder metaData = MetaData.builder(currentState.metaData());
for (String dataStreamName : dataStreams) {
logger.info("removing data stream [{}]", dataStreamName);
metaData.removeDataStream(dataStreamName);
}
return ClusterState.builder(currentState).metaData(metaData).build();
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {

View File

@ -34,15 +34,17 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class GetDataStreamsAction extends ActionType<GetDataStreamsAction.Response> {
@ -154,11 +156,30 @@ public class GetDataStreamsAction extends ActionType<GetDataStreamsAction.Respon
@Override
protected void masterOperation(Request request, ClusterState state,
ActionListener<Response> listener) throws Exception {
List<DataStream> dataStreams = Arrays.asList(
new DataStream("my_data_stream1", "@timestamp", Collections.singletonList("my_data_stream1-000000")),
new DataStream("my_data_stream2", "@timestamp", Collections.emptyList())
);
listener.onResponse(new Response(dataStreams));
listener.onResponse(new Response(getDataStreams(state, request)));
}
static List<DataStream> getDataStreams(ClusterState clusterState, Request request) {
Map<String, DataStream> dataStreams = clusterState.metaData().dataStreams();
// return all data streams if no name was specified
if (request.names.length == 0) {
return new ArrayList<>(dataStreams.values());
}
final List<DataStream> results = new ArrayList<>();
for (String name : request.names) {
if (Regex.isSimpleMatchPattern(name)) {
for (Map.Entry<String, DataStream> entry : dataStreams.entrySet()) {
if (Regex.simpleMatch(name, entry.getKey())) {
results.add(entry.getValue());
}
}
} else if (dataStreams.containsKey(name)) {
results.add(dataStreams.get(name));
}
}
return results;
}
@Override

View File

@ -23,6 +23,7 @@ import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.ComponentTemplateMetadata;
import org.elasticsearch.cluster.metadata.DataStreamMetadata;
import org.elasticsearch.cluster.metadata.IndexGraveyard;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexTemplateV2Metadata;
@ -136,6 +137,7 @@ public class ClusterModule extends AbstractModule {
ComponentTemplateMetadata::readDiffFrom);
registerMetaDataCustom(entries, IndexTemplateV2Metadata.TYPE, IndexTemplateV2Metadata::new,
IndexTemplateV2Metadata::readDiffFrom);
registerMetaDataCustom(entries, DataStreamMetadata.TYPE, DataStreamMetadata::new, DataStreamMetadata::readDiffFrom);
// Task Status (not Diffable)
entries.add(new Entry(Task.Status.class, PersistentTasksNodeService.Status.NAME, PersistentTasksNodeService.Status::new));
return entries;
@ -187,6 +189,8 @@ public class ClusterModule extends AbstractModule {
ComponentTemplateMetadata::fromXContent));
entries.add(new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(IndexTemplateV2Metadata.TYPE),
IndexTemplateV2Metadata::fromXContent));
entries.add(new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField(DataStreamMetadata.TYPE),
DataStreamMetadata::fromXContent));
return entries;
}

View File

@ -0,0 +1,188 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.DiffableUtils;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
/**
* Custom {@link MetaData} implementation for storing a map of {@link DataStream}s and their names.
*/
public class DataStreamMetadata implements MetaData.Custom {
public static final String TYPE = "data_stream";
private static final ParseField DATA_STREAM = new ParseField("data_stream");
@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<DataStreamMetadata, Void> PARSER = new ConstructingObjectParser<>(TYPE, false,
a -> new DataStreamMetadata((Map<String, DataStream>) a[0]));
static {
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> {
Map<String, DataStream> dataStreams = new HashMap<>();
while (p.nextToken() != XContentParser.Token.END_OBJECT) {
String name = p.currentName();
dataStreams.put(name, DataStream.fromXContent(p));
}
return dataStreams;
}, DATA_STREAM);
}
private final Map<String, DataStream> dataStreams;
public DataStreamMetadata(Map<String, DataStream> dataStreams) {
this.dataStreams = dataStreams;
}
public DataStreamMetadata(StreamInput in) throws IOException {
this.dataStreams = in.readMap(StreamInput::readString, DataStream::new);
}
public Map<String, DataStream> dataStreams() {
return this.dataStreams;
}
@Override
public Diff<MetaData.Custom> diff(MetaData.Custom before) {
return new DataStreamMetadata.DataStreamMetadataDiff((DataStreamMetadata) before, this);
}
public static NamedDiff<MetaData.Custom> readDiffFrom(StreamInput in) throws IOException {
return new DataStreamMetadata.DataStreamMetadataDiff(in);
}
@Override
public EnumSet<MetaData.XContentContext> context() {
return MetaData.ALL_CONTEXTS;
}
@Override
public String getWriteableName() {
return TYPE;
}
@Override
public Version getMinimalSupportedVersion() {
return Version.V_7_7_0;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(this.dataStreams, StreamOutput::writeString, (stream, val) -> val.writeTo(stream));
}
public static DataStreamMetadata fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(DATA_STREAM.getPreferredName());
for (Map.Entry<String, DataStream> dataStream : dataStreams.entrySet()) {
builder.field(dataStream.getKey(), dataStream.getValue());
}
builder.endObject();
return builder;
}
public static Builder builder() {
return new Builder();
}
@Override
public int hashCode() {
return Objects.hash(this.dataStreams);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj.getClass() != getClass()) {
return false;
}
DataStreamMetadata other = (DataStreamMetadata) obj;
return Objects.equals(this.dataStreams, other.dataStreams);
}
@Override
public String toString() {
return Strings.toString(this);
}
public static class Builder {
private final Map<String, DataStream> dataStreams = new HashMap<>();
public Builder putDataStream(DataStream dataStream) {
dataStreams.put(dataStream.getName(), dataStream);
return this;
}
public DataStreamMetadata build() {
return new DataStreamMetadata(dataStreams);
}
}
static class DataStreamMetadataDiff implements NamedDiff<MetaData.Custom> {
final Diff<Map<String, DataStream>> dataStreamDiff;
DataStreamMetadataDiff(DataStreamMetadata before, DataStreamMetadata after) {
this.dataStreamDiff = DiffableUtils.diff(before.dataStreams, after.dataStreams,
DiffableUtils.getStringKeySerializer());
}
DataStreamMetadataDiff(StreamInput in) throws IOException {
this.dataStreamDiff = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(),
DataStream::new, DataStream::readDiffFrom);
}
@Override
public MetaData.Custom apply(MetaData.Custom part) {
return new DataStreamMetadata(dataStreamDiff.apply(((DataStreamMetadata) part).dataStreams));
}
@Override
public void writeTo(StreamOutput out) throws IOException {
dataStreamDiff.writeTo(out);
}
@Override
public String getWriteableName() {
return TYPE;
}
}
}

View File

@ -733,6 +733,12 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
.orElse(Collections.emptyMap());
}
public Map<String, DataStream> dataStreams() {
return Optional.ofNullable((DataStreamMetadata) this.custom(DataStreamMetadata.TYPE))
.map(DataStreamMetadata::dataStreams)
.orElse(Collections.emptyMap());
}
public ImmutableOpenMap<String, Custom> customs() {
return this.customs;
}
@ -1197,6 +1203,32 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
return this;
}
public Builder dataStreams(Map<String, DataStream> dataStreams) {
this.customs.put(DataStreamMetadata.TYPE, new DataStreamMetadata(dataStreams));
return this;
}
public Builder put(DataStream dataStream) {
Objects.requireNonNull(dataStream, "it is invalid to add a null data stream");
Map<String, DataStream> existingDataStreams =
Optional.ofNullable((DataStreamMetadata) this.customs.get(DataStreamMetadata.TYPE))
.map(dsmd -> new HashMap<>(dsmd.dataStreams()))
.orElse(new HashMap<>());
existingDataStreams.put(dataStream.getName(), dataStream);
this.customs.put(DataStreamMetadata.TYPE, new DataStreamMetadata(existingDataStreams));
return this;
}
public Builder removeDataStream(String name) {
Map<String, DataStream> existingDataStreams =
Optional.ofNullable((DataStreamMetadata) this.customs.get(DataStreamMetadata.TYPE))
.map(dsmd -> new HashMap<>(dsmd.dataStreams()))
.orElse(new HashMap<>());
existingDataStreams.remove(name);
this.customs.put(DataStreamMetadata.TYPE, new DataStreamMetadata(existingDataStreams));
return this;
}
public Custom getCustom(String type) {
return customs.get(type);
}

View File

@ -18,10 +18,20 @@
*/
package org.elasticsearch.action.admin.indices.datastream;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction.Request;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import java.util.Collections;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
public class CreateDataStreamRequestTests extends AbstractWireSerializingTestCase<Request> {
@Override
@ -35,4 +45,40 @@ public class CreateDataStreamRequestTests extends AbstractWireSerializingTestCas
request.setTimestampFieldName(randomAlphaOfLength(8));
return request;
}
public void testValidateRequest() {
CreateDataStreamAction.Request req = new CreateDataStreamAction.Request("my-data-stream");
req.setTimestampFieldName("my-timestamp-field");
ActionRequestValidationException e = req.validate();
assertNull(e);
}
public void testValidateRequestWithoutTimestampField() {
CreateDataStreamAction.Request req = new CreateDataStreamAction.Request("my-data-stream");
ActionRequestValidationException e = req.validate();
assertNotNull(e);
assertThat(e.validationErrors().size(), equalTo(1));
assertThat(e.validationErrors().get(0), containsString("timestamp field name is missing"));
}
public void testCreateDataStream() {
final String dataStreamName = "my-data-stream";
ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName);
ClusterState newState = CreateDataStreamAction.TransportAction.createDataStream(cs, req);
assertThat(newState.metaData().dataStreams().size(), equalTo(1));
assertThat(newState.metaData().dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
}
public void testCreateDuplicateDataStream() {
final String dataStreamName = "my-data-stream";
DataStream existingDataStream = new DataStream(dataStreamName, "timestamp", Collections.emptyList());
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().dataStreams(Collections.singletonMap(dataStreamName, existingDataStream)).build()).build();
CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> CreateDataStreamAction.TransportAction.createDataStream(cs, req));
assertThat(e.getMessage(), containsString("data_stream [" + dataStreamName + "] already exists"));
}
}

View File

@ -18,10 +18,21 @@
*/
package org.elasticsearch.action.admin.indices.datastream;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction.Request;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import java.util.Collections;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
public class DeleteDataStreamRequestTests extends AbstractWireSerializingTestCase<Request> {
@Override
@ -33,4 +44,38 @@ public class DeleteDataStreamRequestTests extends AbstractWireSerializingTestCas
protected Request createTestInstance() {
return new Request(randomAlphaOfLength(8));
}
public void testValidateRequest() {
DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request("my-data-stream");
ActionRequestValidationException e = req.validate();
assertNull(e);
}
public void testValidateRequestWithoutName() {
DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request("");
ActionRequestValidationException e = req.validate();
assertNotNull(e);
assertThat(e.validationErrors().size(), equalTo(1));
assertThat(e.validationErrors().get(0), containsString("name is missing"));
}
public void testDeleteDataStream() {
final String dataStreamName = "my-data-stream";
DataStream existingDataStream = new DataStream(dataStreamName, "timestamp", Collections.emptyList());
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().dataStreams(Collections.singletonMap(dataStreamName, existingDataStream)).build()).build();
DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(dataStreamName);
ClusterState newState = DeleteDataStreamAction.TransportAction.removeDataStream(cs, req);
assertThat(newState.metaData().dataStreams().size(), equalTo(0));
}
public void testDeleteNonexistentDataStream() {
final String dataStreamName = "my-data-stream";
ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(dataStreamName);
ResourceNotFoundException e = expectThrows(ResourceNotFoundException.class,
() -> DeleteDataStreamAction.TransportAction.removeDataStream(cs, req));
assertThat(e.getMessage(), containsString("data_streams matching [" + dataStreamName + "] not found"));
}
}

View File

@ -18,10 +18,20 @@
*/
package org.elasticsearch.action.admin.indices.datastream;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction.Request;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import java.util.Collections;
import java.util.List;
import static org.hamcrest.Matchers.equalTo;
public class GetDataStreamsRequestTests extends AbstractWireSerializingTestCase<Request> {
@Override
@ -33,4 +43,29 @@ public class GetDataStreamsRequestTests extends AbstractWireSerializingTestCase<
protected Request createTestInstance() {
return new Request(generateRandomStringArray(8, 8, false));
}
public void testValidateRequest() {
GetDataStreamsAction.Request req = new GetDataStreamsAction.Request(new String[]{});
ActionRequestValidationException e = req.validate();
assertNull(e);
}
public void testGetDataStreams() {
final String dataStreamName = "my-data-stream";
DataStream existingDataStream = new DataStream(dataStreamName, "timestamp", Collections.emptyList());
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().dataStreams(Collections.singletonMap(dataStreamName, existingDataStream)).build()).build();
GetDataStreamsAction.Request req = new GetDataStreamsAction.Request(new String[]{dataStreamName});
List<DataStream> dataStreams = GetDataStreamsAction.TransportAction.getDataStreams(cs, req);
assertThat(dataStreams.size(), equalTo(1));
assertThat(dataStreams.get(0).getName(), equalTo(dataStreamName));
}
public void testGetNonexistentDataStream() {
final String dataStreamName = "my-data-stream";
ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
GetDataStreamsAction.Request req = new GetDataStreamsAction.Request(new String[]{dataStreamName});
List<DataStream> dataStreams = GetDataStreamsAction.TransportAction.getDataStreams(cs, req);
assertThat(dataStreams.size(), equalTo(0));
}
}

View File

@ -0,0 +1,59 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.test.AbstractNamedWriteableTestCase;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class DataStreamMetadataTests extends AbstractNamedWriteableTestCase<DataStreamMetadata> {
@Override
protected DataStreamMetadata createTestInstance() {
if (randomBoolean()) {
return new DataStreamMetadata(Collections.emptyMap());
}
Map<String, DataStream> dataStreams = new HashMap<>();
for (int i = 0; i < randomIntBetween(1, 5); i++) {
dataStreams.put(randomAlphaOfLength(5), DataStreamTests.randomInstance());
}
return new DataStreamMetadata(dataStreams);
}
@Override
protected DataStreamMetadata mutateInstance(DataStreamMetadata instance) throws IOException {
return randomValueOtherThan(instance, this::createTestInstance);
}
@Override
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(Collections.singletonList(new NamedWriteableRegistry.Entry(DataStreamMetadata.class,
DataStreamMetadata.TYPE, DataStreamMetadata::new)));
}
@Override
protected Class<DataStreamMetadata> categoryClass() {
return DataStreamMetadata.class;
}
}

View File

@ -28,6 +28,15 @@ import java.util.List;
public class DataStreamTests extends AbstractSerializingTestCase<DataStream> {
public static DataStream randomInstance() {
int numIndices = randomIntBetween(0, 128);
List<String> indices = new ArrayList<>(numIndices);
for (int i = 0; i < numIndices; i++) {
indices.add(randomAlphaOfLength(10));
}
return new DataStream(randomAlphaOfLength(10), randomAlphaOfLength(10), indices);
}
@Override
protected DataStream doParseInstance(XContentParser parser) throws IOException {
return DataStream.fromXContent(parser);
@ -40,11 +49,7 @@ public class DataStreamTests extends AbstractSerializingTestCase<DataStream> {
@Override
protected DataStream createTestInstance() {
int numIndices = randomIntBetween(0, 128);
List<String> indices = new ArrayList<>(numIndices);
for (int i = 0; i < numIndices; i++) {
indices.add(randomAlphaOfLength(10));
}
return new DataStream(randomAlphaOfLength(10), randomAlphaOfLength(10), indices);
return randomInstance();
}
}

View File

@ -973,6 +973,7 @@ public class MetaDataTests extends ESTestCase {
.version(randomNonNegativeLong())
.put("component_template_" + randomAlphaOfLength(3), ComponentTemplateTests.randomInstance())
.put("index_template_v2_" + randomAlphaOfLength(3), IndexTemplateV2Tests.randomInstance())
.put(DataStreamTests.randomInstance())
.build();
}
}

View File

@ -158,6 +158,8 @@ public class ToAndFromJsonMetaDataTests extends ESTestCase {
.putAlias(newAliasMetaDataBuilder("alias-bar1"))
.putAlias(newAliasMetaDataBuilder("alias-bar2").filter("{\"term\":{\"user\":\"kimchy\"}}"))
.putAlias(newAliasMetaDataBuilder("alias-bar3").routing("routing-bar")))
.put(new DataStream("data-stream1", "@timestamp", Collections.emptyList()))
.put(new DataStream("data-stream2", "@timestamp2", Collections.emptyList()))
.build();
String metaDataSource = MetaData.Builder.toXContent(metaData);
@ -345,6 +347,16 @@ public class ToAndFromJsonMetaDataTests extends ESTestCase {
equalTo(new Template(Settings.builder().put("setting", "value").build(),
new CompressedXContent("{\"baz\":\"eggplant\"}"),
Collections.singletonMap("alias", AliasMetaData.builder("alias").build()))));
// data streams
assertNotNull(parsedMetaData.dataStreams().get("data-stream1"));
assertThat(parsedMetaData.dataStreams().get("data-stream1").getName(), is("data-stream1"));
assertThat(parsedMetaData.dataStreams().get("data-stream1").getTimeStampField(), is("@timestamp"));
assertThat(parsedMetaData.dataStreams().get("data-stream1").getIndices(), is(Collections.emptyList()));
assertNotNull(parsedMetaData.dataStreams().get("data-stream2"));
assertThat(parsedMetaData.dataStreams().get("data-stream2").getName(), is("data-stream2"));
assertThat(parsedMetaData.dataStreams().get("data-stream2").getTimeStampField(), is("@timestamp2"));
assertThat(parsedMetaData.dataStreams().get("data-stream2").getIndices(), is(Collections.emptyList()));
}
private static final String MAPPING_SOURCE1 = "{\"mapping1\":{\"text1\":{\"type\":\"string\"}}}";