[7.x] Create first backing index when creating data stream

This commit is contained in:
Dan Hermann 2020-04-02 17:19:35 -05:00 committed by GitHub
parent 6e73f67f3b
commit 39c4ec6821
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 134 additions and 26 deletions

View File

@ -1,8 +1,8 @@
---
"Create data stream":
- skip:
version: " - 7.6.99"
reason: available only in 7.7+
version: " - 7.99.99"
reason: "enable in 7.8+ after back-porting https://github.com/elastic/elasticsearch/pull/54467"
- do:
indices.create_data_stream:
@ -22,10 +22,17 @@
indices.get_data_streams: {}
- match: { 0.name: simple-data-stream1 }
- match: { 0.timestamp_field: '@timestamp' }
- match: { 0.indices: [] }
- length: { 0.indices: 1 }
- match: { 0.indices.0.index_name: 'simple-data-stream1-000001' }
- match: { 1.name: simple-data-stream2 }
- match: { 1.timestamp_field: '@timestamp2' }
- match: { 1.indices: [] }
- length: { 1.indices: 1 }
- match: { 1.indices.0.index_name: 'simple-data-stream2-000001' }
- do:
indices.delete_data_stream:
name: simple-data-stream1
- is_true: acknowledged
- do:
indices.delete_data_stream:

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
@ -33,21 +34,23 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.List;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Collections;
import java.util.Objects;
public class CreateDataStreamAction extends ActionType<AcknowledgedResponse> {
@ -116,10 +119,14 @@ public class CreateDataStreamAction extends ActionType<AcknowledgedResponse> {
public static class TransportAction extends TransportMasterNodeAction<Request, AcknowledgedResponse> {
private final MetadataCreateIndexService metadataCreateIndexService;
@Inject
public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
MetadataCreateIndexService metaDataCreateIndexService) {
super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver);
this.metadataCreateIndexService = metaDataCreateIndexService;
}
@Override
@ -150,7 +157,7 @@ public class CreateDataStreamAction extends ActionType<AcknowledgedResponse> {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return createDataStream(currentState, request);
return createDataStream(metadataCreateIndexService, currentState, request);
}
@Override
@ -160,7 +167,9 @@ public class CreateDataStreamAction extends ActionType<AcknowledgedResponse> {
});
}
static ClusterState createDataStream(ClusterState currentState, Request request) {
static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIndexService,
ClusterState currentState,
Request request) throws Exception {
if (currentState.metadata().dataStreams().containsKey(request.name)) {
throw new IllegalArgumentException("data_stream [" + request.name + "] already exists");
}
@ -168,8 +177,16 @@ public class CreateDataStreamAction extends ActionType<AcknowledgedResponse> {
MetadataCreateIndexService.validateIndexOrAliasName(request.name,
(s1, s2) -> new IllegalArgumentException("data_stream [" + s1 + "] " + s2));
String firstBackingIndexName = request.name + "-000001";
CreateIndexClusterStateUpdateRequest createIndexRequest =
new CreateIndexClusterStateUpdateRequest("initialize_data_stream", firstBackingIndexName, firstBackingIndexName)
.settings(Settings.builder().put("index.hidden", true).build());
currentState = metadataCreateIndexService.applyCreateIndexRequest(currentState, createIndexRequest, false);
IndexMetadata firstBackingIndex = currentState.metadata().index(firstBackingIndexName);
assert firstBackingIndex != null;
Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(
new DataStream(request.name, request.timestampFieldName, Collections.emptyList()));
new DataStream(request.name, request.timestampFieldName, List.of(firstBackingIndex.getIndex())));
logger.info("adding data stream [{}]", request.name);
return ClusterState.builder(currentState).metadata(builder).build();
}

View File

@ -57,7 +57,7 @@ public interface IndexAbstraction {
/**
* A write index is a dedicated concrete index, that accepts all the new documents that belong to an index abstraction.
*
* <p>
* A write index may also be a regular concrete index of a index abstraction and may therefore also be returned
* by {@link #getIndices()}. An index abstraction may also not have a dedicated write index.
*
@ -88,7 +88,14 @@ public interface IndexAbstraction {
* An alias typically refers to many concrete indices and
* may have a write index.
*/
ALIAS("alias");
ALIAS("alias"),
/**
* An index abstraction that refers to a data stream.
* A data stream typically has multiple backing indices, the latest of which
* is the target for index requests.
*/
DATA_STREAM("data_stream");
private final String displayName;
@ -182,7 +189,7 @@ public interface IndexAbstraction {
/**
* Returns the unique alias metadata per concrete index.
*
* <p>
* (note that although alias can point to the same concrete indices, each alias reference may have its own routing
* and filters)
*/

View File

@ -79,6 +79,7 @@ import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import static org.elasticsearch.common.settings.Settings.readSettingsFromStream;
@ -1443,6 +1444,7 @@ public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, To
});
}
}
aliasAndIndexLookup.values().stream()
.filter(aliasOrIndex -> aliasOrIndex.getType() == IndexAbstraction.Type.ALIAS)
.forEach(alias -> ((IndexAbstraction.Alias) alias).computeAndValidateAliasProperties());
@ -1453,15 +1455,28 @@ public class Metadata implements Iterable<IndexMetadata>, Diffable<Metadata>, To
DataStreamMetadata dsMetadata = (DataStreamMetadata) customs.get(DataStreamMetadata.TYPE);
if (dsMetadata != null) {
for (DataStream ds : dsMetadata.dataStreams().values()) {
if (indicesLookup.containsKey(ds.getName())) {
IndexAbstraction existing = indicesLookup.get(ds.getName());
if (existing != null && existing.getType() != IndexAbstraction.Type.DATA_STREAM) {
throw new IllegalStateException("data stream [" + ds.getName() + "] conflicts with existing index or alias");
}
SortedMap<?, ?> map = indicesLookup.subMap(ds.getName() + "-", ds.getName() + "."); // '.' is the char after '-'
if (map.size() != 0) {
SortedMap<String, IndexAbstraction> potentialConflicts =
indicesLookup.subMap(ds.getName() + "-", ds.getName() + "."); // '.' is the char after '-'
if (potentialConflicts.size() != 0) {
List<String> indexNames = ds.getIndices().stream().map(Index::getName).collect(Collectors.toList());
List<String> conflicts = new ArrayList<>();
for (Map.Entry<String, IndexAbstraction> entry : potentialConflicts.entrySet()) {
if (entry.getValue().getType() != IndexAbstraction.Type.CONCRETE_INDEX ||
indexNames.contains(entry.getKey()) == false) {
conflicts.add(entry.getKey());
}
}
if (conflicts.size() > 0) {
throw new IllegalStateException("data stream [" + ds.getName() +
"] could create backing indices that conflict with " + map.size() + " existing index(s) or alias(s)" +
" including '" + map.firstKey() + "'");
"] could create backing indices that conflict with " + conflicts.size() + " existing index(s) or alias(s)" +
" including '" + conflicts.get(0) + "'");
}
}
}
}

View File

@ -18,19 +18,29 @@
*/
package org.elasticsearch.action.admin.indices.datastream;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction.Request;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import java.util.Collections;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class CreateDataStreamRequestTests extends AbstractWireSerializingTestCase<Request> {
@ -61,16 +71,20 @@ public class CreateDataStreamRequestTests extends AbstractWireSerializingTestCas
assertThat(e.validationErrors().get(0), containsString("timestamp field name is missing"));
}
public void testCreateDataStream() {
public void testCreateDataStream() throws Exception {
final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
final String dataStreamName = "my-data-stream";
ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName);
ClusterState newState = CreateDataStreamAction.TransportAction.createDataStream(cs, req);
ClusterState newState = CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req);
assertThat(newState.metadata().dataStreams().size(), equalTo(1));
assertThat(newState.metadata().dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
assertThat(newState.metadata().index(dataStreamName + "-000001"), notNullValue());
assertThat(newState.metadata().index(dataStreamName + "-000001").getSettings().get("index.hidden"), equalTo("true"));
}
public void testCreateDuplicateDataStream() {
public void testCreateDuplicateDataStream() throws Exception {
final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
final String dataStreamName = "my-data-stream";
DataStream existingDataStream = new DataStream(dataStreamName, "timestamp", Collections.emptyList());
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
@ -78,16 +92,39 @@ public class CreateDataStreamRequestTests extends AbstractWireSerializingTestCas
CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> CreateDataStreamAction.TransportAction.createDataStream(cs, req));
() -> CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req));
assertThat(e.getMessage(), containsString("data_stream [" + dataStreamName + "] already exists"));
}
public void testCreateDataStreamWithInvalidName() {
public void testCreateDataStreamWithInvalidName() throws Exception {
final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
final String dataStreamName = "_My-da#ta- ,stream-";
ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> CreateDataStreamAction.TransportAction.createDataStream(cs, req));
() -> CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req));
assertThat(e.getMessage(), containsString("must not contain the following characters"));
}
private static MetadataCreateIndexService getMetadataCreateIndexService() throws Exception {
MetadataCreateIndexService s = mock(MetadataCreateIndexService.class);
when(s.applyCreateIndexRequest(any(ClusterState.class), any(CreateIndexClusterStateUpdateRequest.class), anyBoolean()))
.thenAnswer(mockInvocation -> {
ClusterState currentState = (ClusterState) mockInvocation.getArguments()[0];
CreateIndexClusterStateUpdateRequest request = (CreateIndexClusterStateUpdateRequest) mockInvocation.getArguments()[1];
Metadata.Builder b = Metadata.builder(currentState.metadata())
.put(IndexMetadata.builder(request.index())
.settings(Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(request.settings())
.build())
.numberOfShards(1)
.numberOfReplicas(1)
.build(), false);
return ClusterState.builder(currentState).metadata(b.build()).build();
});
return s;
}
}

View File

@ -43,11 +43,13 @@ import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
@ -973,7 +975,7 @@ public class MetadataTests extends ESTestCase {
public void testBuilderRejectsDataStreamWithConflictingBackingIndices() {
final String dataStreamName = "my-data-stream";
final String conflictingIndex = dataStreamName + "-00001";
final String conflictingIndex = dataStreamName + "-000001";
Metadata.Builder b = Metadata.builder()
.put(IndexMetadata.builder(conflictingIndex)
.settings(settings(Version.CURRENT))
@ -987,6 +989,29 @@ public class MetadataTests extends ESTestCase {
"] could create backing indices that conflict with 1 existing index(s) or alias(s) including '" + conflictingIndex + "'"));
}
public void testBuilderForDataStreamWithRandomlyNumberedBackingIndices() {
final String dataStreamName = "my-data-stream";
final List<Index> backingIndices = new ArrayList<>();
final int numBackingIndices = randomIntBetween(2, 5);
int lastBackingIndexNum = randomIntBetween(9, 50);
Metadata.Builder b = Metadata.builder();
for (int k = 1; k <= numBackingIndices; k++) {
IndexMetadata im = IndexMetadata.builder(String.format(Locale.ROOT, "%s-%06d", dataStreamName, lastBackingIndexNum))
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1)
.build();
b.put(im, false);
backingIndices.add(im.getIndex());
lastBackingIndexNum = randomIntBetween(lastBackingIndexNum + 1, lastBackingIndexNum + 50);
}
b.put(new DataStream(dataStreamName, "ts", backingIndices));
Metadata metadata = b.build();
assertThat(metadata.dataStreams().size(), equalTo(1));
assertThat(metadata.dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
}
public void testSerialization() throws IOException {
final Metadata orig = randomMetadata();
final BytesStreamOutput out = new BytesStreamOutput();