mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-25 09:28:27 +00:00
Identify backing indices for data streams
This commit is contained in:
parent
1caa2f0515
commit
402b6b1715
@ -69,6 +69,12 @@ public interface IndexAbstraction {
|
||||
@Nullable
|
||||
IndexMetadata getWriteIndex();
|
||||
|
||||
/**
|
||||
* @return the data stream to which this index belongs or <code>null</code> if this is not a concrete index or
|
||||
* if it is a concrete index that does not belong to a data stream.
|
||||
*/
|
||||
@Nullable DataStream getParentDataStream();
|
||||
|
||||
/**
|
||||
* @return whether this index abstraction is hidden or not
|
||||
*/
|
||||
@ -116,9 +122,15 @@ public interface IndexAbstraction {
|
||||
class Index implements IndexAbstraction {
|
||||
|
||||
private final IndexMetadata concreteIndex;
|
||||
private final DataStream dataStream;
|
||||
|
||||
public Index(IndexMetadata indexMetadata, DataStream dataStream) {
|
||||
this.concreteIndex = indexMetadata;
|
||||
this.dataStream = dataStream;
|
||||
}
|
||||
|
||||
public Index(IndexMetadata indexMetadata) {
|
||||
this.concreteIndex = indexMetadata;
|
||||
this(indexMetadata, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -141,6 +153,11 @@ public interface IndexAbstraction {
|
||||
return concreteIndex;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataStream getParentDataStream() {
|
||||
return dataStream;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isHidden() {
|
||||
return INDEX_HIDDEN_SETTING.get(concreteIndex.getSettings());
|
||||
@ -184,6 +201,12 @@ public interface IndexAbstraction {
|
||||
return writeIndex.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataStream getParentDataStream() {
|
||||
// aliases may not be part of a data stream
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isHidden() {
|
||||
return isHidden;
|
||||
@ -293,6 +316,12 @@ public interface IndexAbstraction {
|
||||
return writeIndex;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataStream getParentDataStream() {
|
||||
// a data stream cannot have a parent data stream
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isHidden() {
|
||||
return false;
|
||||
|
@ -1363,7 +1363,7 @@ public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, To
|
||||
final List<String> visibleOpenIndices = new ArrayList<>();
|
||||
final List<String> allClosedIndices = new ArrayList<>();
|
||||
final List<String> visibleClosedIndices = new ArrayList<>();
|
||||
final Set<String> duplicateAliasesIndices = new HashSet<>();
|
||||
final Set<String> allAliases = new HashSet<>();
|
||||
for (ObjectCursor<IndexMetadata> cursor : indices.values()) {
|
||||
final IndexMetadata indexMetadata = cursor.value;
|
||||
final String name = indexMetadata.getIndex().getName();
|
||||
@ -1384,23 +1384,55 @@ public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, To
|
||||
visibleClosedIndices.add(name);
|
||||
}
|
||||
}
|
||||
indexMetadata.getAliases().keysIt().forEachRemaining(duplicateAliasesIndices::add);
|
||||
indexMetadata.getAliases().keysIt().forEachRemaining(allAliases::add);
|
||||
}
|
||||
duplicateAliasesIndices.retainAll(allIndices);
|
||||
if (duplicateAliasesIndices.isEmpty() == false) {
|
||||
|
||||
final Set<String> allDataStreams = new HashSet<>();
|
||||
DataStreamMetadata dataStreamMetadata = (DataStreamMetadata) this.customs.get(DataStreamMetadata.TYPE);
|
||||
if (dataStreamMetadata != null) {
|
||||
for (DataStream dataStream : dataStreamMetadata.dataStreams().values()) {
|
||||
allDataStreams.add(dataStream.getName());
|
||||
}
|
||||
}
|
||||
|
||||
final Set<String> aliasDuplicatesWithIndices = new HashSet<>(allAliases);
|
||||
aliasDuplicatesWithIndices.retainAll(allIndices);
|
||||
ArrayList<String> duplicates = new ArrayList<>();
|
||||
if (aliasDuplicatesWithIndices.isEmpty() == false) {
|
||||
// iterate again and constructs a helpful message
|
||||
ArrayList<String> duplicates = new ArrayList<>();
|
||||
for (ObjectCursor<IndexMetadata> cursor : indices.values()) {
|
||||
for (String alias : duplicateAliasesIndices) {
|
||||
for (String alias : aliasDuplicatesWithIndices) {
|
||||
if (cursor.value.getAliases().containsKey(alias)) {
|
||||
duplicates.add(alias + " (alias of " + cursor.value.getIndex() + ")");
|
||||
duplicates.add(alias + " (alias of " + cursor.value.getIndex() + ") conflicts with index");
|
||||
}
|
||||
}
|
||||
}
|
||||
assert duplicates.size() > 0;
|
||||
throw new IllegalStateException("index and alias names need to be unique, but the following duplicates were found ["
|
||||
+ Strings.collectionToCommaDelimitedString(duplicates) + "]");
|
||||
}
|
||||
|
||||
final Set<String> aliasDuplicatesWithDataStreams = new HashSet<>(allAliases);
|
||||
aliasDuplicatesWithDataStreams.retainAll(allDataStreams);
|
||||
if (aliasDuplicatesWithDataStreams.isEmpty() == false) {
|
||||
// iterate again and constructs a helpful message
|
||||
for (ObjectCursor<IndexMetadata> cursor : indices.values()) {
|
||||
for (String alias : aliasDuplicatesWithDataStreams) {
|
||||
if (cursor.value.getAliases().containsKey(alias)) {
|
||||
duplicates.add(alias + " (alias of " + cursor.value.getIndex() + ") conflicts with data stream");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
final Set<String> dataStreamDuplicatesWithIndices = new HashSet<>(allDataStreams);
|
||||
dataStreamDuplicatesWithIndices.retainAll(allIndices);
|
||||
if (dataStreamDuplicatesWithIndices.isEmpty() == false) {
|
||||
for (String dataStream : dataStreamDuplicatesWithIndices) {
|
||||
duplicates.add("data stream [" + dataStream + "] conflicts with index");
|
||||
}
|
||||
}
|
||||
|
||||
if (duplicates.size() > 0) {
|
||||
throw new IllegalStateException("index, alias, and data stream names need to be unique, but the following duplicates " +
|
||||
"were found [" + Strings.collectionToCommaDelimitedString(duplicates) + "]");
|
||||
}
|
||||
|
||||
SortedMap<String, IndexAbstraction> indicesLookup = Collections.unmodifiableSortedMap(buildIndicesLookup());
|
||||
@ -1425,10 +1457,40 @@ public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, To
|
||||
|
||||
private SortedMap<String, IndexAbstraction> buildIndicesLookup() {
|
||||
SortedMap<String, IndexAbstraction> indicesLookup = new TreeMap<>();
|
||||
Map<String, DataStream> indexToDataStreamLookup = new HashMap<>();
|
||||
DataStreamMetadata dataStreamMetadata = (DataStreamMetadata) this.customs.get(DataStreamMetadata.TYPE);
|
||||
// If there are no indices, then skip data streams. This happens only when metadata is read from disk
|
||||
if (dataStreamMetadata != null && indices.size() > 0) {
|
||||
for (DataStream dataStream : dataStreamMetadata.dataStreams().values()) {
|
||||
List<IndexMetadata> backingIndices = dataStream.getIndices().stream()
|
||||
.map(index -> indices.get(index.getName()))
|
||||
.collect(Collectors.toList());
|
||||
assert backingIndices.isEmpty() == false;
|
||||
assert backingIndices.contains(null) == false;
|
||||
|
||||
IndexAbstraction existing = indicesLookup.put(dataStream.getName(),
|
||||
new IndexAbstraction.DataStream(dataStream, backingIndices));
|
||||
assert existing == null : "duplicate data stream for " + dataStream.getName();
|
||||
|
||||
for (Index i : dataStream.getIndices()) {
|
||||
indexToDataStreamLookup.put(i.getName(), dataStream);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (ObjectCursor<IndexMetadata> cursor : indices.values()) {
|
||||
IndexMetadata indexMetadata = cursor.value;
|
||||
IndexAbstraction existing =
|
||||
indicesLookup.put(indexMetadata.getIndex().getName(), new IndexAbstraction.Index(indexMetadata));
|
||||
|
||||
IndexAbstraction.Index index;
|
||||
DataStream parent = indexToDataStreamLookup.get(indexMetadata.getIndex().getName());
|
||||
if (parent != null) {
|
||||
assert parent.getIndices().contains(indexMetadata.getIndex());
|
||||
index = new IndexAbstraction.Index(indexMetadata, (IndexAbstraction.DataStream) indicesLookup.get(parent.getName()));
|
||||
} else {
|
||||
index = new IndexAbstraction.Index(indexMetadata);
|
||||
}
|
||||
|
||||
IndexAbstraction existing = indicesLookup.put(indexMetadata.getIndex().getName(), index);
|
||||
assert existing == null : "duplicate for " + indexMetadata.getIndex();
|
||||
|
||||
for (ObjectObjectCursor<String, AliasMetadata> aliasCursor : indexMetadata.getAliases()) {
|
||||
@ -1445,28 +1507,6 @@ public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, To
|
||||
}
|
||||
}
|
||||
|
||||
DataStreamMetadata dataStreamMetadata = (DataStreamMetadata) this.customs.get(DataStreamMetadata.TYPE);
|
||||
// If there are no indices then it doesn't make sense to to add data streams to indicesLookup,
|
||||
// since there no concrete indices that a data stream can point to.
|
||||
// (This occurs when only Metadata is read from disk.)
|
||||
if (dataStreamMetadata != null && indices.size() > 0) {
|
||||
for (Map.Entry<String, DataStream> entry : dataStreamMetadata.dataStreams().entrySet()) {
|
||||
DataStream dataStream = entry.getValue();
|
||||
List<IndexMetadata> backingIndices = dataStream.getIndices().stream()
|
||||
.map(index -> indices.get(index.getName()))
|
||||
.collect(Collectors.toList());
|
||||
assert backingIndices.isEmpty() == false;
|
||||
assert backingIndices.contains(null) == false;
|
||||
|
||||
IndexAbstraction existing = indicesLookup.put(dataStream.getName(),
|
||||
new IndexAbstraction.DataStream(dataStream, backingIndices));
|
||||
if (existing != null) {
|
||||
throw new IllegalStateException("data stream [" + dataStream.getName() +
|
||||
"] conflicts with existing " + existing.getType().getDisplayName() + " [" + existing.getName() + "]");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
indicesLookup.values().stream()
|
||||
.filter(aliasOrIndex -> aliasOrIndex.getType() == IndexAbstraction.Type.ALIAS)
|
||||
.forEach(alias -> ((IndexAbstraction.Alias) alias).computeAndValidateAliasProperties());
|
||||
|
@ -22,6 +22,7 @@ package org.elasticsearch.cluster.metadata;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
|
||||
import org.elasticsearch.cluster.ClusterModule;
|
||||
import org.elasticsearch.cluster.DataStreamTestHelper;
|
||||
import org.elasticsearch.cluster.coordination.CoordinationMetadata;
|
||||
import org.elasticsearch.cluster.coordination.CoordinationMetadata.VotingConfigExclusion;
|
||||
import org.elasticsearch.common.Strings;
|
||||
@ -50,6 +51,7 @@ import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.SortedMap;
|
||||
|
||||
import static org.elasticsearch.cluster.DataStreamTestHelper.createBackingIndex;
|
||||
import static org.elasticsearch.cluster.DataStreamTestHelper.createFirstBackingIndex;
|
||||
@ -163,7 +165,8 @@ public class MetadataTests extends ESTestCase {
|
||||
fail("exception should have been thrown");
|
||||
} catch (IllegalStateException e) {
|
||||
assertThat(e.getMessage(),
|
||||
equalTo("index and alias names need to be unique, but the following duplicates were found [index (alias of [index])]"));
|
||||
equalTo("index, alias, and data stream names need to be unique, but the following duplicates were found [index (alias " +
|
||||
"of [index]) conflicts with index]"));
|
||||
}
|
||||
}
|
||||
|
||||
@ -198,7 +201,7 @@ public class MetadataTests extends ESTestCase {
|
||||
metadataBuilder.build();
|
||||
fail("exception should have been thrown");
|
||||
} catch (IllegalStateException e) {
|
||||
assertThat(e.getMessage(), startsWith("index and alias names need to be unique"));
|
||||
assertThat(e.getMessage(), startsWith("index, alias, and data stream names need to be unique"));
|
||||
}
|
||||
}
|
||||
|
||||
@ -959,7 +962,8 @@ public class MetadataTests extends ESTestCase {
|
||||
|
||||
IllegalStateException e = expectThrows(IllegalStateException.class, b::build);
|
||||
assertThat(e.getMessage(),
|
||||
containsString("data stream [" + dataStreamName + "] conflicts with existing concrete index [" + dataStreamName + "]"));
|
||||
containsString("index, alias, and data stream names need to be unique, but the following duplicates were found [data " +
|
||||
"stream [" + dataStreamName + "] conflicts with index]"));
|
||||
}
|
||||
|
||||
public void testBuilderRejectsDataStreamThatConflictsWithAlias() {
|
||||
@ -973,7 +977,8 @@ public class MetadataTests extends ESTestCase {
|
||||
|
||||
IllegalStateException e = expectThrows(IllegalStateException.class, b::build);
|
||||
assertThat(e.getMessage(),
|
||||
containsString("data stream [" + dataStreamName + "] conflicts with existing alias [" + dataStreamName + "]"));
|
||||
containsString("index, alias, and data stream names need to be unique, but the following duplicates were found [" +
|
||||
dataStreamName + " (alias of [" + DataStream.getBackingIndexName(dataStreamName, 1) + "]) conflicts with data stream]"));
|
||||
}
|
||||
|
||||
public void testBuilderRejectsDataStreamWithConflictingBackingIndices() {
|
||||
@ -1060,6 +1065,54 @@ public class MetadataTests extends ESTestCase {
|
||||
}
|
||||
}
|
||||
|
||||
public void testIndicesLookupRecordsDataStreamForBackingIndices() {
|
||||
// create some indices that do not back a data stream
|
||||
final List<Index> indices = new ArrayList<>();
|
||||
final int numIndices = randomIntBetween(2, 5);
|
||||
int lastIndexNum = randomIntBetween(9, 50);
|
||||
Metadata.Builder b = Metadata.builder();
|
||||
for (int k = 1; k <= numIndices; k++) {
|
||||
IndexMetadata im = IndexMetadata.builder(DataStream.getBackingIndexName("index", lastIndexNum))
|
||||
.settings(settings(Version.CURRENT))
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(1)
|
||||
.build();
|
||||
b.put(im, false);
|
||||
indices.add(im.getIndex());
|
||||
lastIndexNum = randomIntBetween(lastIndexNum + 1, lastIndexNum + 50);
|
||||
}
|
||||
|
||||
// create some backing indices for a data stream
|
||||
final String dataStreamName = "my-data-stream";
|
||||
final List<Index> backingIndices = new ArrayList<>();
|
||||
final int numBackingIndices = randomIntBetween(2, 5);
|
||||
int lastBackingIndexNum = 0;
|
||||
for (int k = 1; k <= numBackingIndices; k++) {
|
||||
lastBackingIndexNum = randomIntBetween(lastBackingIndexNum + 1, lastBackingIndexNum + 50);
|
||||
IndexMetadata im = IndexMetadata.builder(DataStream.getBackingIndexName(dataStreamName, lastBackingIndexNum))
|
||||
.settings(settings(Version.CURRENT))
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(1)
|
||||
.build();
|
||||
b.put(im, false);
|
||||
backingIndices.add(im.getIndex());
|
||||
}
|
||||
b.put(new DataStream(dataStreamName, "ts", backingIndices, lastBackingIndexNum));
|
||||
Metadata metadata = b.build();
|
||||
|
||||
SortedMap<String, IndexAbstraction> indicesLookup = metadata.getIndicesLookup();
|
||||
assertThat(indicesLookup.size(), equalTo(indices.size() + backingIndices.size() + 1));
|
||||
for (Index index : indices) {
|
||||
assertTrue(indicesLookup.containsKey(index.getName()));
|
||||
assertNull(indicesLookup.get(index.getName()).getParentDataStream());
|
||||
}
|
||||
for (Index index : backingIndices) {
|
||||
assertTrue(indicesLookup.containsKey(index.getName()));
|
||||
assertNotNull(indicesLookup.get(index.getName()).getParentDataStream());
|
||||
assertThat(indicesLookup.get(index.getName()).getParentDataStream().getName(), equalTo(dataStreamName));
|
||||
}
|
||||
}
|
||||
|
||||
public void testSerialization() throws IOException {
|
||||
final Metadata orig = randomMetadata();
|
||||
final BytesStreamOutput out = new BytesStreamOutput();
|
||||
@ -1071,8 +1124,6 @@ public class MetadataTests extends ESTestCase {
|
||||
}
|
||||
|
||||
public static Metadata randomMetadata() {
|
||||
DataStream randomDataStream = DataStreamTests.randomInstance();
|
||||
|
||||
Metadata.Builder md = Metadata.builder()
|
||||
.put(buildIndexMetadata("index", "alias", randomBoolean() ? null : randomBoolean()).build(), randomBoolean())
|
||||
.put(IndexTemplateMetadata.builder("template" + randomAlphaOfLength(3))
|
||||
@ -1092,15 +1143,13 @@ public class MetadataTests extends ESTestCase {
|
||||
.indexGraveyard(IndexGraveyardTests.createRandom())
|
||||
.version(randomNonNegativeLong())
|
||||
.put("component_template_" + randomAlphaOfLength(3), ComponentTemplateTests.randomInstance())
|
||||
.put("index_template_v2_" + randomAlphaOfLength(3), IndexTemplateV2Tests.randomInstance())
|
||||
.put(randomDataStream);
|
||||
.put("index_template_v2_" + randomAlphaOfLength(3), IndexTemplateV2Tests.randomInstance());
|
||||
|
||||
DataStream randomDataStream = DataStreamTests.randomInstance();
|
||||
for (Index index : randomDataStream.getIndices()) {
|
||||
md.put(IndexMetadata.builder(index.getName())
|
||||
.settings(ESTestCase.settings(Version.CURRENT).put("index.hidden", true))
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(1));
|
||||
md.put(DataStreamTestHelper.getIndexMetadataBuilderForIndex(index));
|
||||
}
|
||||
md.put(randomDataStream);
|
||||
|
||||
return md.build();
|
||||
}
|
||||
|
@ -22,18 +22,33 @@ package org.elasticsearch.cluster;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.metadata.DataStream;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_INDEX_UUID;
|
||||
|
||||
public final class DataStreamTestHelper {
|
||||
|
||||
private static final Settings.Builder SETTINGS = ESTestCase.settings(Version.CURRENT).put("index.hidden", true);
|
||||
private static final int NUMBER_OF_SHARDS = 1;
|
||||
private static final int NUMBER_OF_REPLICAS = 1;
|
||||
|
||||
public static IndexMetadata.Builder createFirstBackingIndex(String dataStreamName) {
|
||||
return createBackingIndex(dataStreamName, 1);
|
||||
}
|
||||
|
||||
public static IndexMetadata.Builder createBackingIndex(String dataStreamName, int generation) {
|
||||
return IndexMetadata.builder(DataStream.getBackingIndexName(dataStreamName, generation))
|
||||
.settings(ESTestCase.settings(Version.CURRENT).put("index.hidden", true))
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(1);
|
||||
.settings(SETTINGS)
|
||||
.numberOfShards(NUMBER_OF_SHARDS)
|
||||
.numberOfReplicas(NUMBER_OF_REPLICAS);
|
||||
}
|
||||
|
||||
public static IndexMetadata.Builder getIndexMetadataBuilderForIndex(Index index) {
|
||||
return IndexMetadata.builder(index.getName())
|
||||
.settings(Settings.builder().put(SETTINGS.build()).put(SETTING_INDEX_UUID, index.getUUID()))
|
||||
.numberOfShards(NUMBER_OF_SHARDS)
|
||||
.numberOfReplicas(NUMBER_OF_REPLICAS);
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user