Add explicit generation attribute to data streams

Dan Hermann 2020-04-20 07:40:33 -05:00 committed by GitHub
parent 7d5f74e964
commit dc703d75f5
12 changed files with 69 additions and 46 deletions

REST API YAML tests for data streams

@@ -23,10 +23,12 @@
        name: "*"
   - match: { 0.name: simple-data-stream1 }
   - match: { 0.timestamp_field: '@timestamp' }
+  - match: { 0.generation: 1 }
   - length: { 0.indices: 1 }
   - match: { 0.indices.0.index_name: 'simple-data-stream1-000001' }
   - match: { 1.name: simple-data-stream2 }
   - match: { 1.timestamp_field: '@timestamp2' }
+  - match: { 1.generation: 1 }
   - length: { 1.indices: 1 }
   - match: { 1.indices.0.index_name: 'simple-data-stream2-000001' }
@@ -97,22 +99,27 @@
       indices.get_data_streams: {}
   - match: { 0.name: get-data-stream1 }
   - match: { 0.timestamp_field: '@timestamp' }
+  - match: { 0.generation: 1 }
   - match: { 1.name: get-data-stream2 }
   - match: { 1.timestamp_field: '@timestamp2' }
+  - match: { 1.generation: 1 }
   - do:
       indices.get_data_streams:
         name: get-data-stream1
   - match: { 0.name: get-data-stream1 }
   - match: { 0.timestamp_field: '@timestamp' }
+  - match: { 0.generation: 1 }
   - do:
       indices.get_data_streams:
         name: get-data-*
   - match: { 0.name: get-data-stream1 }
   - match: { 0.timestamp_field: '@timestamp' }
+  - match: { 0.generation: 1 }
   - match: { 1.name: get-data-stream2 }
   - match: { 1.timestamp_field: '@timestamp2' }
+  - match: { 1.generation: 1 }
   - do:
       indices.get_data_streams:
@@ -169,6 +176,7 @@
       indices.get_data_streams: {}
   - match: { 0.name: delete-data-stream1 }
   - match: { 0.timestamp_field: '@timestamp' }
+  - match: { 0.generation: 1 }
   - length: { 0.indices: 1 }
   - match: { 0.indices.0.index_name: 'delete-data-stream1-000001' }

CreateDataStreamAction.java

@@ -177,7 +177,7 @@ public class CreateDataStreamAction extends ActionType<AcknowledgedResponse> {
             MetadataCreateIndexService.validateIndexOrAliasName(request.name,
                 (s1, s2) -> new IllegalArgumentException("data_stream [" + s1 + "] " + s2));
-            String firstBackingIndexName = request.name + "-000001";
+            String firstBackingIndexName = DataStream.getBackingIndexName(request.name, 1);
             CreateIndexClusterStateUpdateRequest createIndexRequest =
                 new CreateIndexClusterStateUpdateRequest("initialize_data_stream", firstBackingIndexName, firstBackingIndexName)
                     .settings(Settings.builder().put("index.hidden", true).build());
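
The inline "-000001" concatenation is replaced by the new DataStream.getBackingIndexName helper introduced below in DataStream.java, which zero-pads the generation to six digits. A minimal standalone sketch of that naming scheme, with a hypothetical data stream name:

```java
import java.util.Locale;

public class BackingIndexNameSketch {

    // Mirrors the formatting used by DataStream.getBackingIndexName:
    // the data stream name plus the generation zero-padded to six digits.
    static String backingIndexName(String dataStreamName, long generation) {
        return String.format(Locale.ROOT, "%s-%06d", dataStreamName, generation);
    }

    public static void main(String[] args) {
        // For generation 1 this is identical to the old request.name + "-000001" concatenation.
        System.out.println(backingIndexName("my-data-stream", 1)); // my-data-stream-000001
    }
}
```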

DataStream.java

@@ -31,6 +31,7 @@ import org.elasticsearch.index.Index;
 import java.io.IOException;
 import java.util.List;
+import java.util.Locale;
 import java.util.Objects;

 public final class DataStream extends AbstractDiffable<DataStream> implements ToXContentObject {
@@ -38,11 +39,17 @@ public final class DataStream extends AbstractDiffable<DataStream> implements ToXContentObject
     private final String name;
     private final String timeStampField;
     private final List<Index> indices;
+    private long generation;

-    public DataStream(String name, String timeStampField, List<Index> indices) {
+    public DataStream(String name, String timeStampField, List<Index> indices, long generation) {
         this.name = name;
         this.timeStampField = timeStampField;
         this.indices = indices;
+        this.generation = generation;
     }

+    public DataStream(String name, String timeStampField, List<Index> indices) {
+        this(name, timeStampField, indices, indices.size());
+    }
+
     public String getName() {
@@ -57,8 +64,16 @@ public final class DataStream extends AbstractDiffable<DataStream> implements ToXContentObject
         return indices;
     }

+    public long getGeneration() {
+        return generation;
+    }
+
+    public static String getBackingIndexName(String dataStreamName, long generation) {
+        return String.format(Locale.ROOT, "%s-%06d", dataStreamName, generation);
+    }
+
     public DataStream(StreamInput in) throws IOException {
-        this(in.readString(), in.readString(), in.readList(Index::new));
+        this(in.readString(), in.readString(), in.readList(Index::new), in.readVLong());
     }

     public static Diff<DataStream> readDiffFrom(StreamInput in) throws IOException {
@@ -70,20 +85,23 @@ public final class DataStream extends AbstractDiffable<DataStream> implements ToXContentObject
         out.writeString(name);
         out.writeString(timeStampField);
         out.writeList(indices);
+        out.writeVLong(generation);
     }

     public static final ParseField NAME_FIELD = new ParseField("name");
     public static final ParseField TIMESTAMP_FIELD_FIELD = new ParseField("timestamp_field");
     public static final ParseField INDICES_FIELD = new ParseField("indices");
+    public static final ParseField GENERATION_FIELD = new ParseField("generation");

     @SuppressWarnings("unchecked")
     private static final ConstructingObjectParser<DataStream, Void> PARSER = new ConstructingObjectParser<>("data_stream",
-        args -> new DataStream((String) args[0], (String) args[1], (List<Index>) args[2]));
+        args -> new DataStream((String) args[0], (String) args[1], (List<Index>) args[2], (Long) args[3]));

     static {
         PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME_FIELD);
         PARSER.declareString(ConstructingObjectParser.constructorArg(), TIMESTAMP_FIELD_FIELD);
         PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), (p, c) -> Index.fromXContent(p), INDICES_FIELD);
+        PARSER.declareLong(ConstructingObjectParser.constructorArg(), GENERATION_FIELD);
     }

     public static DataStream fromXContent(XContentParser parser) throws IOException {
@@ -96,6 +114,7 @@ public final class DataStream extends AbstractDiffable<DataStream> implements ToXContentObject
         builder.field(NAME_FIELD.getPreferredName(), name);
         builder.field(TIMESTAMP_FIELD_FIELD.getPreferredName(), timeStampField);
         builder.field(INDICES_FIELD.getPreferredName(), indices);
+        builder.field(GENERATION_FIELD.getPreferredName(), generation);
         builder.endObject();
         return builder;
     }
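
Together with the parser fields above, a serialized data stream now carries a generation entry. A rough sketch of the resulting JSON shape; the values are made up, and each indices entry is rendered by the Index class itself (only index_name is shown here):

```java
public class DataStreamXContentSketch {
    public static void main(String[] args) {
        // Approximate shape produced by toXContent and accepted by PARSER above.
        String example = "{"
            + "\"name\":\"logs\","
            + "\"timestamp_field\":\"@timestamp\","
            + "\"indices\":[{\"index_name\":\"logs-000001\"}],"
            + "\"generation\":1"
            + "}";
        System.out.println(example);
    }
}
```
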
@@ -107,11 +126,12 @@ public final class DataStream extends AbstractDiffable<DataStream> implements ToXContentObject
         DataStream that = (DataStream) o;
         return name.equals(that.name) &&
             timeStampField.equals(that.timeStampField) &&
-            indices.equals(that.indices);
+            indices.equals(that.indices) &&
+            generation == that.generation;
     }

     @Override
     public int hashCode() {
-        return Objects.hash(name, timeStampField, indices);
+        return Objects.hash(name, timeStampField, indices, generation);
     }
 }
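
The original three-argument constructor is retained and defaults the generation to the number of backing indices, so existing callers keep working. A rough standalone illustration of that default, using simplified types rather than the Elasticsearch class:

```java
import java.util.List;

public class GenerationDefaultSketch {

    static class SimpleDataStream {
        final String name;
        final String timestampField;
        final List<String> indices;
        final long generation;

        SimpleDataStream(String name, String timestampField, List<String> indices, long generation) {
            this.name = name;
            this.timestampField = timestampField;
            this.indices = indices;
            this.generation = generation;
        }

        // Mirrors the delegating constructor: generation defaults to indices.size().
        SimpleDataStream(String name, String timestampField, List<String> indices) {
            this(name, timestampField, indices, indices.size());
        }
    }

    public static void main(String[] args) {
        SimpleDataStream ds = new SimpleDataStream("logs", "@timestamp",
            List.of("logs-000001", "logs-000002"));
        System.out.println(ds.generation); // 2
    }
}
```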

IndexAbstraction.java

@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;

+import static org.elasticsearch.cluster.metadata.DataStream.getBackingIndexName;
 import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_HIDDEN_SETTING;
 import static org.elasticsearch.common.collect.List.copyOf;
@@ -266,12 +267,11 @@ public interface IndexAbstraction {
         private final List<IndexMetadata> dataStreamIndices;
         private final IndexMetadata writeIndex;

-        public DataStream(org.elasticsearch.cluster.metadata.DataStream dataStream,
-                          List<IndexMetadata> dataStreamIndices, IndexMetadata writeIndex) {
+        public DataStream(org.elasticsearch.cluster.metadata.DataStream dataStream, List<IndexMetadata> dataStreamIndices) {
             this.dataStream = dataStream;
             this.dataStreamIndices = copyOf(dataStreamIndices);
-            this.writeIndex = writeIndex;
-            assert dataStreamIndices.contains(writeIndex);
+            this.writeIndex = dataStreamIndices.get(dataStreamIndices.size() - 1);
+            assert writeIndex.getIndex().getName().equals(getBackingIndexName(dataStream.getName(), dataStream.getGeneration()));
         }

         @Override
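
The write index is therefore no longer passed in: it is always the newest backing index, and its name has to line up with the data stream's current generation. A small self-contained sketch of that invariant, with hypothetical index names:

```java
import java.util.List;
import java.util.Locale;

public class WriteIndexInvariantSketch {
    public static void main(String[] args) {
        String dataStreamName = "logs";   // hypothetical data stream
        long generation = 3;              // current generation
        List<String> backingIndices = List.of("logs-000001", "logs-000002", "logs-000003");

        // The write index is the last backing index ...
        String writeIndex = backingIndices.get(backingIndices.size() - 1);

        // ... and its name must match <name>-<generation zero-padded to six digits>.
        String expected = String.format(Locale.ROOT, "%s-%06d", dataStreamName, generation);
        if (!writeIndex.equals(expected)) {
            throw new AssertionError(writeIndex + " != " + expected);
        }
        System.out.println("write index: " + writeIndex);
    }
}
```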

Metadata.java

@@ -1458,9 +1458,8 @@ public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, To
             assert backingIndices.isEmpty() == false;
             assert backingIndices.contains(null) == false;

-            IndexMetadata writeIndex = backingIndices.get(backingIndices.size() - 1);
             IndexAbstraction existing = indicesLookup.put(dataStream.getName(),
-                new IndexAbstraction.DataStream(dataStream, backingIndices, writeIndex));
+                new IndexAbstraction.DataStream(dataStream, backingIndices));
             if (existing != null) {
                 throw new IllegalStateException("data stream [" + dataStream.getName() +
                     "] conflicts with existing " + existing.getType().getDisplayName() + " [" + existing.getName() + "]");

CreateDataStreamRequestTests.java

@@ -80,8 +80,9 @@ public class CreateDataStreamRequestTests extends AbstractWireSerializingTestCas
         ClusterState newState = CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req);
         assertThat(newState.metadata().dataStreams().size(), equalTo(1));
         assertThat(newState.metadata().dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
-        assertThat(newState.metadata().index(dataStreamName + "-000001"), notNullValue());
-        assertThat(newState.metadata().index(dataStreamName + "-000001").getSettings().get("index.hidden"), equalTo("true"));
+        assertThat(newState.metadata().index(DataStream.getBackingIndexName(dataStreamName, 1)), notNullValue());
+        assertThat(newState.metadata().index(DataStream.getBackingIndexName(dataStreamName, 1)).getSettings().get("index.hidden"),
+            equalTo("true"));
     }

     public void testCreateDuplicateDataStream() throws Exception {

DeleteDataStreamRequestTests.java

@@ -36,7 +36,6 @@ import org.elasticsearch.test.AbstractWireSerializingTestCase;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Locale;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -75,13 +74,7 @@ public class DeleteDataStreamRequestTests extends AbstractWireSerializingTestCas
     public void testDeleteDataStream() {
         final String dataStreamName = "my-data-stream";
         final List<String> otherIndices = randomSubsetOf(org.elasticsearch.common.collect.List.of("foo", "bar", "baz"));

-        ClusterState cs = getClusterState(
-            org.elasticsearch.common.collect.List.of(new Tuple<>(dataStreamName, org.elasticsearch.common.collect.List.of(
-                String.format(Locale.ROOT, "%s-%06d", dataStreamName, 1),
-                String.format(Locale.ROOT, "%s-%06d", dataStreamName, 2)))),
-            otherIndices);
+        ClusterState cs = getClusterState(org.elasticsearch.common.collect.List.of(new Tuple<>(dataStreamName, 2)), otherIndices);
         DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(dataStreamName);
         ClusterState newState = DeleteDataStreamAction.TransportAction.removeDataStream(getMetadataDeleteIndexService(), cs, req);
         assertThat(newState.metadata().dataStreams().size(), equalTo(0));
@@ -122,22 +115,22 @@ public class DeleteDataStreamRequestTests extends AbstractWireSerializingTestCas
     /**
      * Constructs {@code ClusterState} with the specified data streams and indices.
      *
-     * @param dataStreamAndIndexNames The names of the data streams to create with their respective backing indices
-     * @param indexNames              The names of indices to create that do not back any data streams
+     * @param dataStreams The names of the data streams to create with their respective number of backing indices
+     * @param indexNames  The names of indices to create that do not back any data streams
      */
-    private static ClusterState getClusterState(List<Tuple<String, List<String>>> dataStreamAndIndexNames, List<String> indexNames) {
+    private static ClusterState getClusterState(List<Tuple<String, Integer>> dataStreams, List<String> indexNames) {
         Metadata.Builder builder = Metadata.builder();

         List<IndexMetadata> allIndices = new ArrayList<>();
-        for (Tuple<String, List<String>> dsTuple : dataStreamAndIndexNames) {
+        for (Tuple<String, Integer> dsTuple : dataStreams) {
             List<IndexMetadata> backingIndices = new ArrayList<>();
-            for (String indexName : dsTuple.v2()) {
-                backingIndices.add(createIndexMetadata(indexName, true));
+            for (int backingIndexNumber = 1; backingIndexNumber <= dsTuple.v2(); backingIndexNumber++) {
+                backingIndices.add(createIndexMetadata(DataStream.getBackingIndexName(dsTuple.v1(), backingIndexNumber), true));
             }
             allIndices.addAll(backingIndices);

             DataStream ds = new DataStream(dsTuple.v1(), "@timestamp",
-                backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()));
+                backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()), dsTuple.v2());
             builder.put(ds);
         }
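
A rough illustration of what the reworked helper derives from a (name, count) pair; this sketch only prints the backing index names the real helper would create, with no IndexMetadata or ClusterState involved:

```java
import java.util.Locale;

public class BackingIndexCountSketch {
    public static void main(String[] args) {
        String dataStreamName = "my-data-stream"; // same name as the test above
        int numberOfBackingIndices = 2;

        for (int n = 1; n <= numberOfBackingIndices; n++) {
            // Prints my-data-stream-000001 and my-data-stream-000002.
            System.out.println(String.format(Locale.ROOT, "%s-%06d", dataStreamName, n));
        }
        // The DataStream built from these indices is given generation == 2,
        // so the newest index doubles as the write index.
    }
}
```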

DataStreamTests.java

@@ -40,7 +40,11 @@ public class DataStreamTests extends AbstractSerializingTestCase<DataStream> {
     }

     public static DataStream randomInstance() {
-        return new DataStream(randomAlphaOfLength(10), randomAlphaOfLength(10), randomIndexInstances());
+        List<Index> indices = randomIndexInstances();
+        long generation = randomLongBetween(1, 128);
+        String dataStreamName = randomAlphaOfLength(10);
+        indices.add(new Index(DataStream.getBackingIndexName(dataStreamName, generation), UUIDs.randomBase64UUID(random())));
+        return new DataStream(dataStreamName, randomAlphaOfLength(10), indices, generation);
     }

     @Override

IndexNameExpressionResolverTests.java

@@ -1769,7 +1769,7 @@ public class IndexNameExpressionResolverTests extends ESTestCase {
         Metadata.Builder mdBuilder = Metadata.builder()
             .put(index1, false)
             .put(index2, false)
-            .put(new DataStream(dataStreamName, "ts", org.elasticsearch.common.collect.List.of(index1.getIndex(), index2.getIndex())));
+            .put(new DataStream(dataStreamName, "ts", org.elasticsearch.common.collect.List.of(index1.getIndex(), index2.getIndex()), 2));
         ClusterState state = ClusterState.builder(new ClusterName("_name")).metadata(mdBuilder).build();

         {

MetadataTests.java

@@ -48,7 +48,6 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
@@ -965,7 +964,7 @@ public class MetadataTests extends ESTestCase {
     public void testBuilderRejectsDataStreamThatConflictsWithAlias() {
         final String dataStreamName = "my-data-stream";
-        IndexMetadata idx = createFirstBackingIndex(dataStreamName + "z")
+        IndexMetadata idx = createFirstBackingIndex(dataStreamName)
             .putAlias(AliasMetadata.builder(dataStreamName).build())
             .build();
         Metadata.Builder b = Metadata.builder()
@@ -980,7 +979,7 @@ public class MetadataTests extends ESTestCase {
     public void testBuilderRejectsDataStreamWithConflictingBackingIndices() {
         final String dataStreamName = "my-data-stream";
         IndexMetadata validIdx = createFirstBackingIndex(dataStreamName).build();
-        final String conflictingIndex = dataStreamName + "-000002";
+        final String conflictingIndex = DataStream.getBackingIndexName(dataStreamName, 2);
         IndexMetadata invalidIdx = createBackingIndex(dataStreamName, 2).build();
         Metadata.Builder b = Metadata.builder()
             .put(validIdx, false)
@@ -994,7 +993,7 @@ public class MetadataTests extends ESTestCase {
     public void testBuilderRejectsDataStreamWithConflictingBackingAlias() {
         final String dataStreamName = "my-data-stream";
-        final String conflictingName = dataStreamName + "-000002";
+        final String conflictingName = DataStream.getBackingIndexName(dataStreamName, 2);
         IndexMetadata idx = createFirstBackingIndex(dataStreamName)
             .putAlias(new AliasMetadata.Builder(conflictingName))
             .build();
@@ -1011,20 +1010,20 @@ public class MetadataTests extends ESTestCase {
         final String dataStreamName = "my-data-stream";
         final List<Index> backingIndices = new ArrayList<>();
         final int numBackingIndices = randomIntBetween(2, 5);
-        int lastBackingIndexNum = randomIntBetween(9, 50);
+        int lastBackingIndexNum = 0;
         Metadata.Builder b = Metadata.builder();
         for (int k = 1; k <= numBackingIndices; k++) {
-            IndexMetadata im = IndexMetadata.builder(String.format(Locale.ROOT, "%s-%06d", dataStreamName, lastBackingIndexNum))
+            lastBackingIndexNum = randomIntBetween(lastBackingIndexNum + 1, lastBackingIndexNum + 50);
+            IndexMetadata im = IndexMetadata.builder(DataStream.getBackingIndexName(dataStreamName, lastBackingIndexNum))
                 .settings(settings(Version.CURRENT))
                 .numberOfShards(1)
                 .numberOfReplicas(1)
                 .build();
             b.put(im, false);
             backingIndices.add(im.getIndex());
-            lastBackingIndexNum = randomIntBetween(lastBackingIndexNum + 1, lastBackingIndexNum + 50);
         }
-        b.put(new DataStream(dataStreamName, "ts", backingIndices));
+        b.put(new DataStream(dataStreamName, "ts", backingIndices, lastBackingIndexNum));
         Metadata metadata = b.build();
         assertThat(metadata.dataStreams().size(), equalTo(1));
         assertThat(metadata.dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
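
One detail this test pins down: the generation tracks the numeric suffix of the newest backing index, which the test deliberately makes non-contiguous, so it need not equal the number of backing indices. A compact sketch of that relationship, with hypothetical suffixes:

```java
import java.util.List;
import java.util.Locale;

public class SparseGenerationSketch {
    public static void main(String[] args) {
        // Backing index numbering can be sparse; the generation is the newest suffix.
        List<Integer> backingIndexNumbers = List.of(12, 19, 47);
        long generation = backingIndexNumbers.get(backingIndexNumbers.size() - 1); // 47, not 3
        System.out.println(String.format(Locale.ROOT, "my-data-stream-%06d", generation));
    }
}
```
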
@@ -1042,7 +1041,7 @@ public class MetadataTests extends ESTestCase {
                 indices.add(idx.getIndex());
                 b.put(idx, true);
             }
-            b.put(new DataStream(name, "ts", indices));
+            b.put(new DataStream(name, "ts", indices, indices.size()));
         }

         Metadata metadata = b.build();

DataStreamIT.java

@@ -138,14 +138,14 @@ public class DataStreamIT extends ESIntegTestCase {
             IndexRequest indexRequest = new IndexRequest(dataStreamName).source("{}", XContentType.JSON)
                 .opType(DocWriteRequest.OpType.CREATE);
             IndexResponse indexResponse = client().index(indexRequest).actionGet();
-            assertThat(indexResponse.getIndex(), equalTo(dataStreamName + "-000001"));
+            assertThat(indexResponse.getIndex(), equalTo(DataStream.getBackingIndexName(dataStreamName, 1)));
         }
         {
             BulkRequest bulkRequest = new BulkRequest()
                 .add(new IndexRequest(dataStreamName).source("{}", XContentType.JSON)
                     .opType(DocWriteRequest.OpType.CREATE));
             BulkResponse bulkItemResponses = client().bulk(bulkRequest).actionGet();
-            assertThat(bulkItemResponses.getItems()[0].getIndex(), equalTo(dataStreamName + "-000001"));
+            assertThat(bulkItemResponses.getItems()[0].getIndex(), equalTo(DataStream.getBackingIndexName(dataStreamName, 1)));
         }

         DeleteDataStreamAction.Request deleteDataStreamRequest = new DeleteDataStreamAction.Request("*");
@@ -169,7 +169,7 @@ public class DataStreamIT extends ESIntegTestCase {
         SearchResponse searchResponse = client().search(searchRequest).actionGet();
         assertThat(searchResponse.getHits().getTotalHits().value, equalTo(expectedNumHits));
         Arrays.stream(searchResponse.getHits().getHits()).forEach(hit -> {
-            assertThat(hit.getIndex(), equalTo(dataStream + "-000001"));
+            assertThat(hit.getIndex(), equalTo(DataStream.getBackingIndexName(dataStream, 1)));
         });
     }

DataStreamTestHelper.java

@@ -20,11 +20,10 @@
 package org.elasticsearch.cluster;

 import org.elasticsearch.Version;
+import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.test.ESTestCase;

-import java.util.Locale;

 public final class DataStreamTestHelper {

     public static IndexMetadata.Builder createFirstBackingIndex(String dataStreamName) {
@@ -32,7 +31,7 @@ public final class DataStreamTestHelper {
     }

     public static IndexMetadata.Builder createBackingIndex(String dataStreamName, int generation) {
-        return IndexMetadata.builder(String.format(Locale.ROOT, "%s-%06d", dataStreamName, generation))
+        return IndexMetadata.builder(DataStream.getBackingIndexName(dataStreamName, generation))
             .settings(ESTestCase.settings(Version.CURRENT).put("index.hidden", true))
             .numberOfShards(1)
             .numberOfReplicas(1);