[7.x] Allow transactional metadata update with index creation (#55308)
This commit is contained in:
parent
643ecf68b5
commit
e89c5d6850
|
@ -91,6 +91,7 @@ import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.function.BiConsumer;
|
||||||
import java.util.function.BiFunction;
|
import java.util.function.BiFunction;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
|
@ -301,8 +302,8 @@ public class MetadataCreateIndexService {
|
||||||
* Handles the cluster state transition to a version that reflects the {@link CreateIndexClusterStateUpdateRequest}.
|
* Handles the cluster state transition to a version that reflects the {@link CreateIndexClusterStateUpdateRequest}.
|
||||||
* All the requested changes are firstly validated before mutating the {@link ClusterState}.
|
* All the requested changes are firstly validated before mutating the {@link ClusterState}.
|
||||||
*/
|
*/
|
||||||
public ClusterState applyCreateIndexRequest(ClusterState currentState, CreateIndexClusterStateUpdateRequest request,
|
public ClusterState applyCreateIndexRequest(ClusterState currentState, CreateIndexClusterStateUpdateRequest request, boolean silent,
|
||||||
boolean silent) throws Exception {
|
BiConsumer<Metadata.Builder, IndexMetadata> metadataTransformer) throws Exception {
|
||||||
logger.trace("executing IndexCreationTask for [{}] against cluster state version [{}]", request, currentState.version());
|
logger.trace("executing IndexCreationTask for [{}] against cluster state version [{}]", request, currentState.version());
|
||||||
|
|
||||||
validate(request, currentState);
|
validate(request, currentState);
|
||||||
|
@ -313,7 +314,7 @@ public class MetadataCreateIndexService {
|
||||||
if (sourceMetadata != null) {
|
if (sourceMetadata != null) {
|
||||||
// If source metadata was provided, it means we're recovering from an existing index,
|
// If source metadata was provided, it means we're recovering from an existing index,
|
||||||
// in which case templates don't apply, so create the index from the source metadata
|
// in which case templates don't apply, so create the index from the source metadata
|
||||||
return applyCreateIndexRequestWithExistingMetadata(currentState, request, silent, sourceMetadata);
|
return applyCreateIndexRequestWithExistingMetadata(currentState, request, silent, sourceMetadata, metadataTransformer);
|
||||||
} else {
|
} else {
|
||||||
// Hidden indices apply templates slightly differently (ignoring wildcard '*'
|
// Hidden indices apply templates slightly differently (ignoring wildcard '*'
|
||||||
// templates), so we need to check to see if the request is creating a hidden index
|
// templates), so we need to check to see if the request is creating a hidden index
|
||||||
|
@ -328,18 +329,23 @@ public class MetadataCreateIndexService {
|
||||||
if (v2Template != null) {
|
if (v2Template != null) {
|
||||||
// If a v2 template was found, it takes precedence over all v1 templates, so create
|
// If a v2 template was found, it takes precedence over all v1 templates, so create
|
||||||
// the index using that template and the request's specified settings
|
// the index using that template and the request's specified settings
|
||||||
return applyCreateIndexRequestWithV2Template(currentState, request, silent, v2Template);
|
return applyCreateIndexRequestWithV2Template(currentState, request, silent, v2Template, metadataTransformer);
|
||||||
} else {
|
} else {
|
||||||
// A v2 template wasn't found, check the v1 templates, in the event no templates are
|
// A v2 template wasn't found, check the v1 templates, in the event no templates are
|
||||||
// found creation still works using the request's specified index settings
|
// found creation still works using the request's specified index settings
|
||||||
final List<IndexTemplateMetadata> v1Templates = MetadataIndexTemplateService.findV1Templates(currentState.metadata(),
|
final List<IndexTemplateMetadata> v1Templates = MetadataIndexTemplateService.findV1Templates(currentState.metadata(),
|
||||||
request.index(), isHiddenFromRequest);
|
request.index(), isHiddenFromRequest);
|
||||||
|
|
||||||
return applyCreateIndexRequestWithV1Templates(currentState, request, silent, v1Templates);
|
return applyCreateIndexRequestWithV1Templates(currentState, request, silent, v1Templates, metadataTransformer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ClusterState applyCreateIndexRequest(ClusterState currentState, CreateIndexClusterStateUpdateRequest request,
|
||||||
|
boolean silent) throws Exception {
|
||||||
|
return applyCreateIndexRequest(currentState, request, silent, null);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Given the state and a request as well as the metadata necessary to build a new index,
|
* Given the state and a request as well as the metadata necessary to build a new index,
|
||||||
* validate the configuration with an actual index service as return a new cluster state with
|
* validate the configuration with an actual index service as return a new cluster state with
|
||||||
|
@ -352,6 +358,8 @@ public class MetadataCreateIndexService {
|
||||||
* @param mappings a map of mappings for the new index
|
* @param mappings a map of mappings for the new index
|
||||||
* @param aliasSupplier a function that takes the real {@link IndexService} and returns a list of {@link AliasMetadata} aliases
|
* @param aliasSupplier a function that takes the real {@link IndexService} and returns a list of {@link AliasMetadata} aliases
|
||||||
* @param templatesApplied a list of the names of the templates applied, for logging
|
* @param templatesApplied a list of the names of the templates applied, for logging
|
||||||
|
* @param metadataTransformer if provided, a function that may alter cluster metadata in the same cluster state update that
|
||||||
|
* creates the index
|
||||||
* @return a new cluster state with the index added
|
* @return a new cluster state with the index added
|
||||||
*/
|
*/
|
||||||
private ClusterState applyCreateIndexWithTemporaryService(final ClusterState currentState,
|
private ClusterState applyCreateIndexWithTemporaryService(final ClusterState currentState,
|
||||||
|
@ -361,7 +369,9 @@ public class MetadataCreateIndexService {
|
||||||
final IndexMetadata temporaryIndexMeta,
|
final IndexMetadata temporaryIndexMeta,
|
||||||
final Map<String, Map<String, Object>> mappings,
|
final Map<String, Map<String, Object>> mappings,
|
||||||
final Function<IndexService, List<AliasMetadata>> aliasSupplier,
|
final Function<IndexService, List<AliasMetadata>> aliasSupplier,
|
||||||
final List<String> templatesApplied) throws Exception {
|
final List<String> templatesApplied,
|
||||||
|
final BiConsumer<Metadata.Builder, IndexMetadata> metadataTransformer)
|
||||||
|
throws Exception {
|
||||||
// create the index here (on the master) to validate it can be created, as well as adding the mapping
|
// create the index here (on the master) to validate it can be created, as well as adding the mapping
|
||||||
return indicesService.<ClusterState, Exception>withTempIndexService(temporaryIndexMeta, indexService -> {
|
return indicesService.<ClusterState, Exception>withTempIndexService(temporaryIndexMeta, indexService -> {
|
||||||
try {
|
try {
|
||||||
|
@ -389,7 +399,7 @@ public class MetadataCreateIndexService {
|
||||||
|
|
||||||
indexService.getIndexEventListener().beforeIndexAddedToCluster(indexMetadata.getIndex(),
|
indexService.getIndexEventListener().beforeIndexAddedToCluster(indexMetadata.getIndex(),
|
||||||
indexMetadata.getSettings());
|
indexMetadata.getSettings());
|
||||||
return clusterStateCreateIndex(currentState, request.blocks(), indexMetadata, allocationService::reroute);
|
return clusterStateCreateIndex(currentState, request.blocks(), indexMetadata, allocationService::reroute, metadataTransformer);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -424,7 +434,9 @@ public class MetadataCreateIndexService {
|
||||||
private ClusterState applyCreateIndexRequestWithV1Templates(final ClusterState currentState,
|
private ClusterState applyCreateIndexRequestWithV1Templates(final ClusterState currentState,
|
||||||
final CreateIndexClusterStateUpdateRequest request,
|
final CreateIndexClusterStateUpdateRequest request,
|
||||||
final boolean silent,
|
final boolean silent,
|
||||||
final List<IndexTemplateMetadata> templates) throws Exception {
|
final List<IndexTemplateMetadata> templates,
|
||||||
|
final BiConsumer<Metadata.Builder, IndexMetadata> metadataTransformer)
|
||||||
|
throws Exception {
|
||||||
logger.info("applying create index request using v1 templates {}", templates);
|
logger.info("applying create index request using v1 templates {}", templates);
|
||||||
|
|
||||||
final Map<String, Map<String, Object>> mappings = Collections.unmodifiableMap(parseMappings(request.mappings(),
|
final Map<String, Map<String, Object>> mappings = Collections.unmodifiableMap(parseMappings(request.mappings(),
|
||||||
|
@ -452,13 +464,15 @@ public class MetadataCreateIndexService {
|
||||||
// the context is only used for validation so it's fine to pass fake values for the
|
// the context is only used for validation so it's fine to pass fake values for the
|
||||||
// shard id and the current timestamp
|
// shard id and the current timestamp
|
||||||
xContentRegistry, indexService.newQueryShardContext(0, null, () -> 0L, null)),
|
xContentRegistry, indexService.newQueryShardContext(0, null, () -> 0L, null)),
|
||||||
templates.stream().map(IndexTemplateMetadata::getName).collect(toList()));
|
templates.stream().map(IndexTemplateMetadata::getName).collect(toList()), metadataTransformer);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ClusterState applyCreateIndexRequestWithV2Template(final ClusterState currentState,
|
private ClusterState applyCreateIndexRequestWithV2Template(final ClusterState currentState,
|
||||||
final CreateIndexClusterStateUpdateRequest request,
|
final CreateIndexClusterStateUpdateRequest request,
|
||||||
final boolean silent,
|
final boolean silent,
|
||||||
final String templateName) throws Exception {
|
final String templateName,
|
||||||
|
final BiConsumer<Metadata.Builder, IndexMetadata> metadataTransformer)
|
||||||
|
throws Exception {
|
||||||
logger.info("applying create index request using v2 template [{}]", templateName);
|
logger.info("applying create index request using v2 template [{}]", templateName);
|
||||||
|
|
||||||
final Map<String, Map<String, Object>> mappings = Collections.unmodifiableMap(parseMappings(request.mappings(),
|
final Map<String, Map<String, Object>> mappings = Collections.unmodifiableMap(parseMappings(request.mappings(),
|
||||||
|
@ -480,13 +494,15 @@ public class MetadataCreateIndexService {
|
||||||
// the context is only used for validation so it's fine to pass fake values for the
|
// the context is only used for validation so it's fine to pass fake values for the
|
||||||
// shard id and the current timestamp
|
// shard id and the current timestamp
|
||||||
xContentRegistry, indexService.newQueryShardContext(0, null, () -> 0L, null)),
|
xContentRegistry, indexService.newQueryShardContext(0, null, () -> 0L, null)),
|
||||||
Collections.singletonList(templateName));
|
Collections.singletonList(templateName), metadataTransformer);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ClusterState applyCreateIndexRequestWithExistingMetadata(final ClusterState currentState,
|
private ClusterState applyCreateIndexRequestWithExistingMetadata(final ClusterState currentState,
|
||||||
final CreateIndexClusterStateUpdateRequest request,
|
final CreateIndexClusterStateUpdateRequest request,
|
||||||
final boolean silent,
|
final boolean silent,
|
||||||
final IndexMetadata sourceMetadata) throws Exception {
|
final IndexMetadata sourceMetadata,
|
||||||
|
final BiConsumer<Metadata.Builder, IndexMetadata> metadataTransformer)
|
||||||
|
throws Exception {
|
||||||
logger.info("applying create index request using existing index [{}] metadata", sourceMetadata.getIndex().getName());
|
logger.info("applying create index request using existing index [{}] metadata", sourceMetadata.getIndex().getName());
|
||||||
|
|
||||||
final Map<String, Map<String, Object>> mappings;
|
final Map<String, Map<String, Object>> mappings;
|
||||||
|
@ -510,7 +526,7 @@ public class MetadataCreateIndexService {
|
||||||
// the context is only used for validation so it's fine to pass fake values for the
|
// the context is only used for validation so it's fine to pass fake values for the
|
||||||
// shard id and the current timestamp
|
// shard id and the current timestamp
|
||||||
indexService.newQueryShardContext(0, null, () -> 0L, null)),
|
indexService.newQueryShardContext(0, null, () -> 0L, null)),
|
||||||
Collections.emptyList());
|
org.elasticsearch.common.collect.List.of(), metadataTransformer);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -746,10 +762,14 @@ public class MetadataCreateIndexService {
|
||||||
* table based on the live nodes.
|
* table based on the live nodes.
|
||||||
*/
|
*/
|
||||||
static ClusterState clusterStateCreateIndex(ClusterState currentState, Set<ClusterBlock> clusterBlocks, IndexMetadata indexMetadata,
|
static ClusterState clusterStateCreateIndex(ClusterState currentState, Set<ClusterBlock> clusterBlocks, IndexMetadata indexMetadata,
|
||||||
BiFunction<ClusterState, String, ClusterState> rerouteRoutingTable) {
|
BiFunction<ClusterState, String, ClusterState> rerouteRoutingTable,
|
||||||
Metadata newMetadata = Metadata.builder(currentState.metadata())
|
BiConsumer<Metadata.Builder, IndexMetadata> metadataTransformer) {
|
||||||
.put(indexMetadata, false)
|
Metadata.Builder builder = Metadata.builder(currentState.metadata())
|
||||||
.build();
|
.put(indexMetadata, false);
|
||||||
|
if (metadataTransformer != null) {
|
||||||
|
metadataTransformer.accept(builder, indexMetadata);
|
||||||
|
}
|
||||||
|
Metadata newMetadata = builder.build();
|
||||||
|
|
||||||
String indexName = indexMetadata.getIndex().getName();
|
String indexName = indexMetadata.getIndex().getName();
|
||||||
ClusterBlocks.Builder blocks = createClusterBlocksBuilder(currentState, indexName, clusterBlocks);
|
ClusterBlocks.Builder blocks = createClusterBlocksBuilder(currentState, indexName, clusterBlocks);
|
||||||
|
|
|
@ -82,6 +82,7 @@ import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.function.BiConsumer;
|
||||||
import java.util.function.BiFunction;
|
import java.util.function.BiFunction;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
@ -89,7 +90,6 @@ import java.util.stream.Stream;
|
||||||
|
|
||||||
import static java.util.Collections.emptyList;
|
import static java.util.Collections.emptyList;
|
||||||
import static java.util.Collections.emptyMap;
|
import static java.util.Collections.emptyMap;
|
||||||
import static java.util.Collections.emptySet;
|
|
||||||
import static java.util.Collections.singleton;
|
import static java.util.Collections.singleton;
|
||||||
import static java.util.Collections.singletonList;
|
import static java.util.Collections.singletonList;
|
||||||
import static java.util.Collections.singletonMap;
|
import static java.util.Collections.singletonMap;
|
||||||
|
@ -807,7 +807,8 @@ public class MetadataCreateIndexServiceTests extends ESTestCase {
|
||||||
|
|
||||||
assertThat(
|
assertThat(
|
||||||
expectThrows(IllegalStateException.class,
|
expectThrows(IllegalStateException.class,
|
||||||
() -> clusterStateCreateIndex(currentClusterState, emptySet(), newIndex, (state, reason) -> state)).getMessage(),
|
() -> clusterStateCreateIndex(currentClusterState, org.elasticsearch.common.collect.Set.of(), newIndex,
|
||||||
|
(state, reason) -> state, null)).getMessage(),
|
||||||
startsWith("alias [alias1] has more than one write index [")
|
startsWith("alias [alias1] has more than one write index [")
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -830,13 +831,44 @@ public class MetadataCreateIndexServiceTests extends ESTestCase {
|
||||||
return clusterState;
|
return clusterState;
|
||||||
};
|
};
|
||||||
|
|
||||||
ClusterState updatedClusterState = clusterStateCreateIndex(currentClusterState, singleton(INDEX_READ_ONLY_BLOCK), newIndexMetadata,
|
ClusterState updatedClusterState = clusterStateCreateIndex(currentClusterState,
|
||||||
rerouteRoutingTable);
|
org.elasticsearch.common.collect.Set.of(INDEX_READ_ONLY_BLOCK), newIndexMetadata, rerouteRoutingTable, null);
|
||||||
assertThat(updatedClusterState.blocks().getIndexBlockWithId("test", INDEX_READ_ONLY_BLOCK.id()), is(INDEX_READ_ONLY_BLOCK));
|
assertThat(updatedClusterState.blocks().getIndexBlockWithId("test", INDEX_READ_ONLY_BLOCK.id()), is(INDEX_READ_ONLY_BLOCK));
|
||||||
assertThat(updatedClusterState.routingTable().index("test"), is(notNullValue()));
|
assertThat(updatedClusterState.routingTable().index("test"), is(notNullValue()));
|
||||||
assertThat(allocationRerouted.get(), is(true));
|
assertThat(allocationRerouted.get(), is(true));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testClusterStateCreateIndexWithMetadataTransaction() {
|
||||||
|
ClusterState currentClusterState = ClusterState.builder(ClusterState.EMPTY_STATE)
|
||||||
|
.metadata(Metadata.builder()
|
||||||
|
.put(IndexMetadata.builder("my-index")
|
||||||
|
.settings(settings(Version.CURRENT).put(SETTING_READ_ONLY, true))
|
||||||
|
.numberOfShards(1)
|
||||||
|
.numberOfReplicas(0)))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
IndexMetadata newIndexMetadata = IndexMetadata.builder("test")
|
||||||
|
.settings(settings(Version.CURRENT).put(SETTING_READ_ONLY, true))
|
||||||
|
.numberOfShards(1)
|
||||||
|
.numberOfReplicas(0)
|
||||||
|
.putAlias(AliasMetadata.builder("alias1").writeIndex(true).build())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// adds alias from new index to existing index
|
||||||
|
BiConsumer<Metadata.Builder, IndexMetadata> metadataTransformer = (builder, indexMetadata) -> {
|
||||||
|
AliasMetadata newAlias = indexMetadata.getAliases().iterator().next().value;
|
||||||
|
IndexMetadata myIndex = builder.get("my-index");
|
||||||
|
builder.put(IndexMetadata.builder(myIndex).putAlias(AliasMetadata.builder(newAlias.getAlias()).build()));
|
||||||
|
};
|
||||||
|
|
||||||
|
ClusterState updatedClusterState = clusterStateCreateIndex(currentClusterState, org.elasticsearch.common.collect.Set.of(
|
||||||
|
INDEX_READ_ONLY_BLOCK), newIndexMetadata, (clusterState, y) -> clusterState, metadataTransformer);
|
||||||
|
assertTrue(updatedClusterState.metadata().findAllAliases(new String[]{"my-index"}).containsKey("my-index"));
|
||||||
|
assertNotNull(updatedClusterState.metadata().findAllAliases(new String[]{"my-index"}).get("my-index"));
|
||||||
|
assertNotNull(updatedClusterState.metadata().findAllAliases(new String[]{"my-index"}).get("my-index").get(0).alias(),
|
||||||
|
equalTo("alias1"));
|
||||||
|
}
|
||||||
|
|
||||||
public void testParseMappingsWithTypedTemplateAndTypelessIndexMapping() throws Exception {
|
public void testParseMappingsWithTypedTemplateAndTypelessIndexMapping() throws Exception {
|
||||||
IndexTemplateMetadata templateMetadata = addMatchingTemplate(builder -> {
|
IndexTemplateMetadata templateMetadata = addMatchingTemplate(builder -> {
|
||||||
try {
|
try {
|
||||||
|
|
Loading…
Reference in New Issue