Backing indices should use composable template matching with the corresponding data stream name (#57728)
Backport of #57640 to 7.x branch. Composable templates with exact matches, can match with the data stream name, but not with the backing index name. Also if the backing index naming scheme changes, then a composable template may never match with a backing index. In that case mappings and settings may not get applied.
This commit is contained in:
parent
3fe93e24a6
commit
f170b52e64
|
@ -32,6 +32,7 @@ import org.elasticsearch.action.admin.indices.template.delete.DeleteComposableIn
|
|||
import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
|
||||
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryRequestBuilder;
|
||||
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryResponse;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
|
@ -45,8 +46,12 @@ import org.elasticsearch.action.search.SearchResponse;
|
|||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
|
||||
import org.elasticsearch.cluster.metadata.DataStream;
|
||||
import org.elasticsearch.cluster.metadata.Template;
|
||||
import org.elasticsearch.common.compress.CompressedXContent;
|
||||
import org.elasticsearch.common.xcontent.ObjectPath;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.junit.After;
|
||||
|
||||
|
@ -75,6 +80,8 @@ import static org.hamcrest.Matchers.equalTo;
|
|||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.Matchers.startsWith;
|
||||
|
||||
public class DataStreamIT extends ESIntegTestCase {
|
||||
|
||||
|
@ -211,6 +218,76 @@ public class DataStreamIT extends ESIntegTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The composable template that matches with the data stream name should always be used for backing indices.
|
||||
* It is possible that a backing index doesn't match with a template or a different template, but in order
|
||||
* to avoid confusion, the template matching with the corresponding data stream name should be used.
|
||||
*/
|
||||
public void testComposableTemplateOnlyMatchingWithDataStreamName() throws Exception {
|
||||
String dataStreamName = "logs-foobar";
|
||||
|
||||
String mapping = "{\n" +
|
||||
" \"properties\": {\n" +
|
||||
" \"baz_field\": {\n" +
|
||||
" \"type\": \"keyword\"\n" +
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
" }";
|
||||
PutComposableIndexTemplateAction.Request request = new PutComposableIndexTemplateAction.Request("id_1");
|
||||
request.indexTemplate(
|
||||
new ComposableIndexTemplate(
|
||||
Collections.singletonList(dataStreamName), // use no wildcard, so that backing indices don't match just by name
|
||||
new Template(null,
|
||||
new CompressedXContent(mapping), null),
|
||||
null, null, null, null,
|
||||
new ComposableIndexTemplate.DataStreamTemplate("@timestamp"))
|
||||
);
|
||||
client().execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet();
|
||||
|
||||
int numDocs = randomIntBetween(2, 16);
|
||||
indexDocs(dataStreamName, numDocs);
|
||||
verifyDocs(dataStreamName, numDocs, 1, 1);
|
||||
|
||||
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request("*");
|
||||
GetDataStreamAction.Response getDataStreamResponse = client().admin().indices().getDataStreams(getDataStreamRequest).actionGet();
|
||||
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1));
|
||||
assertThat(getDataStreamResponse.getDataStreams().get(0).getName(), equalTo(dataStreamName));
|
||||
assertThat(getDataStreamResponse.getDataStreams().get(0).getTimeStampField(), equalTo("@timestamp"));
|
||||
assertThat(getDataStreamResponse.getDataStreams().get(0).getIndices().size(), equalTo(1));
|
||||
assertThat(getDataStreamResponse.getDataStreams().get(0).getIndices().get(0).getName(), equalTo(dataStreamName + "-000001"));
|
||||
|
||||
GetIndexResponse getIndexResponse =
|
||||
client().admin().indices().getIndex(new GetIndexRequest().indices(dataStreamName)).actionGet();
|
||||
assertThat(getIndexResponse.getSettings().get(dataStreamName + "-000001"), notNullValue());
|
||||
assertThat(getIndexResponse.getSettings().get(dataStreamName + "-000001").getAsBoolean("index.hidden", null), is(true));
|
||||
assertThat(ObjectPath.eval("properties.baz_field.type",
|
||||
getIndexResponse.mappings().get(dataStreamName + "-000001").get("_doc").getSourceAsMap()), equalTo("keyword"));
|
||||
|
||||
RolloverResponse rolloverResponse = client().admin().indices().rolloverIndex(new RolloverRequest(dataStreamName, null)).get();
|
||||
assertThat(rolloverResponse.getNewIndex(), equalTo(dataStreamName + "-000002"));
|
||||
assertTrue(rolloverResponse.isRolledOver());
|
||||
|
||||
getIndexResponse = client().admin().indices().getIndex(new GetIndexRequest().indices(dataStreamName + "-000002")).actionGet();
|
||||
assertThat(getIndexResponse.getSettings().get(dataStreamName + "-000002"), notNullValue());
|
||||
assertThat(getIndexResponse.getSettings().get(dataStreamName + "-000002").getAsBoolean("index.hidden", null), is(true));
|
||||
assertThat(ObjectPath.eval("properties.baz_field.type",
|
||||
getIndexResponse.mappings().get(dataStreamName + "-000002").get("_doc").getSourceAsMap()), equalTo("keyword"));
|
||||
|
||||
int numDocs2 = randomIntBetween(2, 16);
|
||||
indexDocs(dataStreamName, numDocs2);
|
||||
verifyDocs(dataStreamName, numDocs + numDocs2, 1, 2);
|
||||
|
||||
DeleteDataStreamAction.Request deleteDataStreamRequest = new DeleteDataStreamAction.Request(dataStreamName);
|
||||
client().admin().indices().deleteDataStream(deleteDataStreamRequest).actionGet();
|
||||
getDataStreamResponse = client().admin().indices().getDataStreams(getDataStreamRequest).actionGet();
|
||||
assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(0));
|
||||
|
||||
expectThrows(IndexNotFoundException.class,
|
||||
() -> client().admin().indices().getIndex(new GetIndexRequest().indices(dataStreamName + "-000001")).actionGet());
|
||||
expectThrows(IndexNotFoundException.class,
|
||||
() -> client().admin().indices().getIndex(new GetIndexRequest().indices(dataStreamName + "-000002")).actionGet());
|
||||
}
|
||||
|
||||
public void testDataStreamsResolvability() throws Exception {
|
||||
createIndexTemplate("id", "logs-*", "ts");
|
||||
String dataStreamName = "logs-foobar";
|
||||
|
@ -313,7 +390,13 @@ public class DataStreamIT extends ESIntegTestCase {
|
|||
.opType(DocWriteRequest.OpType.CREATE)
|
||||
.source("{}", XContentType.JSON));
|
||||
}
|
||||
client().bulk(bulkRequest).actionGet();
|
||||
BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
|
||||
assertThat(bulkResponse.getItems().length, equalTo(numDocs));
|
||||
for (BulkItemResponse itemResponse : bulkResponse) {
|
||||
assertThat(itemResponse.getFailureMessage(), nullValue());
|
||||
assertThat(itemResponse.status(), equalTo(RestStatus.CREATED));
|
||||
assertThat(itemResponse.getIndex(), startsWith(dataStream + "-00000"));
|
||||
}
|
||||
client().admin().indices().refresh(new RefreshRequest(dataStream)).actionGet();
|
||||
}
|
||||
|
||||
|
|
|
@ -40,6 +40,7 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ
|
|||
|
||||
private final String cause;
|
||||
private final String index;
|
||||
private String dataStreamName;
|
||||
private final String providedName;
|
||||
private Index recoverFrom;
|
||||
private ResizeType resizeType;
|
||||
|
@ -146,4 +147,34 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ
|
|||
public boolean copySettings() {
|
||||
return copySettings;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the name of the data stream this new index will be part of.
|
||||
* If this new index will not be part of a data stream then this returns <code>null</code>.
|
||||
*/
|
||||
public String dataStreamName() {
|
||||
return dataStreamName;
|
||||
}
|
||||
|
||||
public CreateIndexClusterStateUpdateRequest dataStreamName(String dataStreamName) {
|
||||
this.dataStreamName = dataStreamName;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "CreateIndexClusterStateUpdateRequest{" +
|
||||
"cause='" + cause + '\'' +
|
||||
", index='" + index + '\'' +
|
||||
", dataStreamName='" + dataStreamName + '\'' +
|
||||
", providedName='" + providedName + '\'' +
|
||||
", recoverFrom=" + recoverFrom +
|
||||
", resizeType=" + resizeType +
|
||||
", copySettings=" + copySettings +
|
||||
", settings=" + settings +
|
||||
", aliases=" + aliases +
|
||||
", blocks=" + blocks +
|
||||
", waitForActiveShards=" + waitForActiveShards +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -150,7 +150,7 @@ public class MetadataRolloverService {
|
|||
final String newWriteIndexName = DataStream.getBackingIndexName(ds.getName(), ds.getGeneration() + 1);
|
||||
|
||||
CreateIndexClusterStateUpdateRequest createIndexClusterStateRequest =
|
||||
prepareDataStreamCreateIndexRequest(newWriteIndexName, createIndexRequest);
|
||||
prepareDataStreamCreateIndexRequest(dataStreamName, newWriteIndexName, createIndexRequest);
|
||||
ClusterState newState = createIndexService.applyCreateIndexRequest(currentState, createIndexClusterStateRequest, silent,
|
||||
(builder, indexMetadata) -> builder.put(ds.rollover(indexMetadata.getIndex())));
|
||||
|
||||
|
@ -180,10 +180,12 @@ public class MetadataRolloverService {
|
|||
}
|
||||
}
|
||||
|
||||
static CreateIndexClusterStateUpdateRequest prepareDataStreamCreateIndexRequest(final String targetIndexName,
|
||||
static CreateIndexClusterStateUpdateRequest prepareDataStreamCreateIndexRequest(final String dataStreamName,
|
||||
final String targetIndexName,
|
||||
CreateIndexRequest createIndexRequest) {
|
||||
Settings settings = Settings.builder().put("index.hidden", true).build();
|
||||
return prepareCreateIndexRequest(targetIndexName, targetIndexName, "rollover_data_stream", createIndexRequest, settings);
|
||||
return prepareCreateIndexRequest(targetIndexName, targetIndexName, "rollover_data_stream", createIndexRequest, settings)
|
||||
.dataStreamName(dataStreamName);
|
||||
}
|
||||
|
||||
static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(
|
||||
|
|
|
@ -133,6 +133,7 @@ public class MetadataCreateDataStreamService {
|
|||
String firstBackingIndexName = DataStream.getBackingIndexName(request.name, 1);
|
||||
CreateIndexClusterStateUpdateRequest createIndexRequest =
|
||||
new CreateIndexClusterStateUpdateRequest("initialize_data_stream", firstBackingIndexName, firstBackingIndexName)
|
||||
.dataStreamName(request.name)
|
||||
.settings(Settings.builder().put("index.hidden", true).build());
|
||||
currentState = metadataCreateIndexService.applyCreateIndexRequest(currentState, createIndexRequest, false);
|
||||
IndexMetadata firstBackingIndex = currentState.metadata().index(firstBackingIndexName);
|
||||
|
|
|
@ -329,9 +329,11 @@ public class MetadataCreateIndexService {
|
|||
final Boolean isHiddenFromRequest = IndexMetadata.INDEX_HIDDEN_SETTING.exists(request.settings()) ?
|
||||
IndexMetadata.INDEX_HIDDEN_SETTING.get(request.settings()) : null;
|
||||
|
||||
// The backing index may have a different name or prefix than the data stream name.
|
||||
final String name = request.dataStreamName() != null ? request.dataStreamName() : request.index();
|
||||
// Check to see if a v2 template matched
|
||||
final String v2Template = MetadataIndexTemplateService.findV2Template(currentState.metadata(),
|
||||
request.index(), isHiddenFromRequest == null ? false : isHiddenFromRequest);
|
||||
name, isHiddenFromRequest == null ? false : isHiddenFromRequest);
|
||||
|
||||
if (v2Template != null) {
|
||||
// If a v2 template was found, it takes precedence over all v1 templates, so create
|
||||
|
|
|
@ -326,8 +326,8 @@ public class MetadataRolloverServiceTests extends ESTestCase {
|
|||
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.build();
|
||||
rolloverRequest.getCreateIndexRequest().settings(settings);
|
||||
final CreateIndexClusterStateUpdateRequest createIndexRequest =
|
||||
MetadataRolloverService.prepareDataStreamCreateIndexRequest(newWriteIndexName, rolloverRequest.getCreateIndexRequest());
|
||||
final CreateIndexClusterStateUpdateRequest createIndexRequest = MetadataRolloverService.prepareDataStreamCreateIndexRequest(
|
||||
dataStream.getName(), newWriteIndexName, rolloverRequest.getCreateIndexRequest());
|
||||
for (String settingKey : settings.keySet()) {
|
||||
assertThat(settings.get(settingKey), equalTo(createIndexRequest.settings().get(settingKey)));
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue