Rollover: refactor out cluster state update (#53965) (#54269)

Make it possible to reuse the cluster state update of rollover for
simulation purposes by extracting it. Also now run the full rollover in
the pre-rollover phase and the actual rollover phase, allowing a
dedicated exception in case of concurrent rollovers as well as a more
thorough pre-check.
This commit is contained in:
Henning Andersen 2020-03-26 16:06:13 +01:00 committed by GitHub
parent 27cd5b343c
commit 5bfaa20dd4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 673 additions and 417 deletions

View File

@ -0,0 +1,191 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.rollover;
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasAction;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.metadata.MetaDataIndexAliasesService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.regex.Pattern;
import static org.elasticsearch.cluster.metadata.MetaDataIndexTemplateService.findTemplates;
public class MetaDataRolloverService {
private static final Pattern INDEX_NAME_PATTERN = Pattern.compile("^.*-\\d+$");
private final ThreadPool threadPool;
private final MetaDataCreateIndexService createIndexService;
private final MetaDataIndexAliasesService indexAliasesService;
private final IndexNameExpressionResolver indexNameExpressionResolver;
@Inject
public MetaDataRolloverService(ThreadPool threadPool,
MetaDataCreateIndexService createIndexService, MetaDataIndexAliasesService indexAliasesService,
IndexNameExpressionResolver indexNameExpressionResolver) {
this.threadPool = threadPool;
this.createIndexService = createIndexService;
this.indexAliasesService = indexAliasesService;
this.indexNameExpressionResolver = indexNameExpressionResolver;
}
public static class RolloverResult {
public final String rolloverIndexName;
public final String sourceIndexName;
public final ClusterState clusterState;
private RolloverResult(String rolloverIndexName, String sourceIndexName, ClusterState clusterState) {
this.rolloverIndexName = rolloverIndexName;
this.sourceIndexName = sourceIndexName;
this.clusterState = clusterState;
}
}
public RolloverResult rolloverClusterState(ClusterState currentState, String aliasName, String newIndexName,
CreateIndexRequest createIndexRequest, List<Condition<?>> metConditions,
boolean silent) throws Exception {
final MetaData metaData = currentState.metaData();
validate(metaData, aliasName);
final AliasOrIndex.Alias alias = (AliasOrIndex.Alias) metaData.getAliasAndIndexLookup().get(aliasName);
final IndexMetaData indexMetaData = alias.getWriteIndex();
final AliasMetaData aliasMetaData = indexMetaData.getAliases().get(alias.getAliasName());
final String sourceProvidedName = indexMetaData.getSettings().get(IndexMetaData.SETTING_INDEX_PROVIDED_NAME,
indexMetaData.getIndex().getName());
final String sourceIndexName = indexMetaData.getIndex().getName();
final String unresolvedName = (newIndexName != null)
? newIndexName
: generateRolloverIndexName(sourceProvidedName, indexNameExpressionResolver);
final String rolloverIndexName = indexNameExpressionResolver.resolveDateMathExpression(unresolvedName);
final boolean explicitWriteIndex = Boolean.TRUE.equals(aliasMetaData.writeIndex());
final Boolean isHidden = IndexMetaData.INDEX_HIDDEN_SETTING.exists(createIndexRequest.settings()) ?
IndexMetaData.INDEX_HIDDEN_SETTING.get(createIndexRequest.settings()) : null;
createIndexService.validateIndexName(rolloverIndexName, currentState); // fails if the index already exists
checkNoDuplicatedAliasInIndexTemplate(metaData, rolloverIndexName, aliasName, isHidden);
CreateIndexClusterStateUpdateRequest createIndexClusterStateRequest =
prepareCreateIndexRequest(unresolvedName, rolloverIndexName, createIndexRequest);
ClusterState newState = createIndexService.applyCreateIndexRequest(currentState, createIndexClusterStateRequest, silent);
newState = indexAliasesService.applyAliasActions(newState,
rolloverAliasToNewIndex(sourceIndexName, rolloverIndexName, explicitWriteIndex, aliasMetaData.isHidden(), aliasName));
RolloverInfo rolloverInfo = new RolloverInfo(aliasName, metConditions, threadPool.absoluteTimeInMillis());
newState = ClusterState.builder(newState)
.metaData(MetaData.builder(newState.metaData())
.put(IndexMetaData.builder(newState.metaData().index(sourceIndexName))
.putRolloverInfo(rolloverInfo))).build();
return new RolloverResult(rolloverIndexName, sourceIndexName, newState);
}
static String generateRolloverIndexName(String sourceIndexName, IndexNameExpressionResolver indexNameExpressionResolver) {
String resolvedName = indexNameExpressionResolver.resolveDateMathExpression(sourceIndexName);
final boolean isDateMath = sourceIndexName.equals(resolvedName) == false;
if (INDEX_NAME_PATTERN.matcher(resolvedName).matches()) {
int numberIndex = sourceIndexName.lastIndexOf("-");
assert numberIndex != -1 : "no separator '-' found";
int counter = Integer.parseInt(sourceIndexName.substring(numberIndex + 1,
isDateMath ? sourceIndexName.length()-1 : sourceIndexName.length()));
String newName = sourceIndexName.substring(0, numberIndex) + "-" + String.format(Locale.ROOT, "%06d", ++counter)
+ (isDateMath ? ">" : "");
return newName;
} else {
throw new IllegalArgumentException("index name [" + sourceIndexName + "] does not match pattern '^.*-\\d+$'");
}
}
static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(final String providedIndexName, final String targetIndexName,
CreateIndexRequest createIndexRequest) {
createIndexRequest.cause("rollover_index");
createIndexRequest.index(targetIndexName);
return new CreateIndexClusterStateUpdateRequest(
"rollover_index", targetIndexName, providedIndexName)
.ackTimeout(createIndexRequest.timeout())
.masterNodeTimeout(createIndexRequest.masterNodeTimeout())
.settings(createIndexRequest.settings())
.aliases(createIndexRequest.aliases())
.waitForActiveShards(ActiveShardCount.NONE) // not waiting for shards here, will wait on the alias switch operation
.mappings(createIndexRequest.mappings());
}
/**
* Creates the alias actions to reflect the alias rollover from the old (source) index to the new (target/rolled over) index. An
* alias pointing to multiple indices will have to be an explicit write index (ie. the old index alias has is_write_index set to true)
* in which case, after the rollover, the new index will need to be the explicit write index.
*/
static List<AliasAction> rolloverAliasToNewIndex(String oldIndex, String newIndex, boolean explicitWriteIndex,
@Nullable Boolean isHidden, String alias) {
if (explicitWriteIndex) {
return Collections.unmodifiableList(Arrays.asList(
new AliasAction.Add(newIndex, alias, null, null, null, true, isHidden),
new AliasAction.Add(oldIndex, alias, null, null, null, false, isHidden)));
} else {
return Collections.unmodifiableList(Arrays.asList(
new AliasAction.Add(newIndex, alias, null, null, null, null, isHidden),
new AliasAction.Remove(oldIndex, alias)));
}
}
/**
* If the newly created index matches with an index template whose aliases contains the rollover alias,
* the rollover alias will point to multiple indices. This causes indexing requests to be rejected.
* To avoid this, we make sure that there is no duplicated alias in index templates before creating a new index.
*/
static void checkNoDuplicatedAliasInIndexTemplate(MetaData metaData, String rolloverIndexName, String rolloverRequestAlias,
@Nullable Boolean isHidden) {
final List<IndexTemplateMetaData> matchedTemplates = findTemplates(metaData, rolloverIndexName, isHidden);
for (IndexTemplateMetaData template : matchedTemplates) {
if (template.aliases().containsKey(rolloverRequestAlias)) {
throw new IllegalArgumentException(String.format(Locale.ROOT,
"Rollover alias [%s] can point to multiple indices, found duplicated alias [%s] in index template [%s]",
rolloverRequestAlias, template.aliases().keys(), template.name()));
}
}
}
static void validate(MetaData metaData, String aliasName) {
final AliasOrIndex aliasOrIndex = metaData.getAliasAndIndexLookup().get(aliasName);
if (aliasOrIndex == null) {
throw new IllegalArgumentException("source alias does not exist");
}
if (aliasOrIndex.isAlias() == false) {
throw new IllegalArgumentException("source alias is a concrete index");
}
final AliasOrIndex.Alias alias = (AliasOrIndex.Alias) aliasOrIndex;
if (alias.getWriteIndex() == null) {
throw new IllegalArgumentException("source alias [" + alias.getAliasName() + "] does not point to a write index");
}
}
}

View File

@ -19,14 +19,12 @@
package org.elasticsearch.action.admin.indices.rollover;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.ActiveShardsObserver;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
@ -35,15 +33,9 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.AliasAction;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.metadata.MetaDataIndexAliasesService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
@ -55,38 +47,29 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static java.util.Collections.unmodifiableList;
import static org.elasticsearch.cluster.metadata.MetaDataIndexTemplateService.findTemplates;
/**
* Main class to swap the index pointed to by an alias, given some conditions
*/
public class TransportRolloverAction extends TransportMasterNodeAction<RolloverRequest, RolloverResponse> {
private static final Pattern INDEX_NAME_PATTERN = Pattern.compile("^.*-\\d+$");
private final MetaDataCreateIndexService createIndexService;
private final MetaDataIndexAliasesService indexAliasesService;
private final MetaDataRolloverService rolloverService;
private final ActiveShardsObserver activeShardsObserver;
private final Client client;
@Inject
public TransportRolloverAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, MetaDataCreateIndexService createIndexService,
public TransportRolloverAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
MetaDataIndexAliasesService indexAliasesService, Client client) {
MetaDataRolloverService rolloverService, Client client) {
super(RolloverAction.NAME, transportService, clusterService, threadPool, actionFilters, RolloverRequest::new,
indexNameExpressionResolver);
this.createIndexService = createIndexService;
this.indexAliasesService = indexAliasesService;
this.rolloverService = rolloverService;
this.client = client;
this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool);
}
@ -118,24 +101,14 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
@Override
protected void masterOperation(Task task, final RolloverRequest rolloverRequest, final ClusterState state,
final ActionListener<RolloverResponse> listener) {
final MetaData metaData = state.metaData();
validate(metaData, rolloverRequest);
final AliasOrIndex.Alias alias = (AliasOrIndex.Alias) metaData.getAliasAndIndexLookup().get(rolloverRequest.getAlias());
final IndexMetaData indexMetaData = alias.getWriteIndex();
final AliasMetaData aliasMetaData = indexMetaData.getAliases().get(alias.getAliasName());
final boolean explicitWriteIndex = Boolean.TRUE.equals(aliasMetaData.writeIndex());
final String sourceProvidedName = indexMetaData.getSettings().get(IndexMetaData.SETTING_INDEX_PROVIDED_NAME,
indexMetaData.getIndex().getName());
final String sourceIndexName = indexMetaData.getIndex().getName();
final String unresolvedName = (rolloverRequest.getNewIndexName() != null)
? rolloverRequest.getNewIndexName()
: generateRolloverIndexName(sourceProvidedName, indexNameExpressionResolver);
final String rolloverIndexName = indexNameExpressionResolver.resolveDateMathExpression(unresolvedName);
final Boolean isHidden = IndexMetaData.INDEX_HIDDEN_SETTING.exists(rolloverRequest.getCreateIndexRequest().settings()) ?
IndexMetaData.INDEX_HIDDEN_SETTING.get(rolloverRequest.getCreateIndexRequest().settings()) : null;
createIndexService.validateIndexName(rolloverIndexName, state); // fails if the index already exists
checkNoDuplicatedAliasInIndexTemplate(metaData, rolloverIndexName, rolloverRequest.getAlias(), isHidden);
final ActionListener<RolloverResponse> listener) throws Exception {
MetaDataRolloverService.RolloverResult preResult =
rolloverService.rolloverClusterState(state,
rolloverRequest.getAlias(), rolloverRequest.getNewIndexName(), rolloverRequest.getCreateIndexRequest(),
Collections.emptyList(), true);
MetaData metaData = state.metaData();
String sourceIndexName = preResult.sourceIndexName;
String rolloverIndexName = preResult.rolloverIndexName;
IndicesStatsRequest statsRequest = new IndicesStatsRequest().indices(rolloverRequest.getAlias())
.clear()
.indicesOptions(IndicesOptions.fromOptions(true, false, true, true))
@ -156,22 +129,18 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
List<Condition<?>> metConditions = rolloverRequest.getConditions().values().stream()
.filter(condition -> conditionResults.get(condition.toString())).collect(Collectors.toList());
if (conditionResults.size() == 0 || metConditions.size() > 0) {
CreateIndexClusterStateUpdateRequest createIndexRequest = prepareCreateIndexRequest(unresolvedName,
rolloverIndexName, rolloverRequest);
clusterService.submitStateUpdateTask("rollover_index source [" + sourceIndexName + "] to target ["
+ rolloverIndexName + "]", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
ClusterState newState = createIndexService.applyCreateIndexRequest(currentState, createIndexRequest);
newState = indexAliasesService.applyAliasActions(newState,
rolloverAliasToNewIndex(sourceIndexName, rolloverIndexName, rolloverRequest, explicitWriteIndex,
aliasMetaData.isHidden()));
RolloverInfo rolloverInfo = new RolloverInfo(rolloverRequest.getAlias(), metConditions,
threadPool.absoluteTimeInMillis());
return ClusterState.builder(newState)
.metaData(MetaData.builder(newState.metaData())
.put(IndexMetaData.builder(newState.metaData().index(sourceIndexName))
.putRolloverInfo(rolloverInfo))).build();
MetaDataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState(currentState,
rolloverRequest.getAlias(), rolloverRequest.getNewIndexName(), rolloverRequest.getCreateIndexRequest(),
metConditions, false);
if (rolloverResult.sourceIndexName.equals(sourceIndexName) == false) {
throw new ElasticsearchException("Concurrent modification of alias [{}] during rollover",
rolloverRequest.getAlias());
}
return rolloverResult.clusterState;
}
@Override
@ -208,40 +177,6 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
);
}
/**
* Creates the alias actions to reflect the alias rollover from the old (source) index to the new (target/rolled over) index. An
* alias pointing to multiple indices will have to be an explicit write index (ie. the old index alias has is_write_index set to true)
* in which case, after the rollover, the new index will need to be the explicit write index.
*/
static List<AliasAction> rolloverAliasToNewIndex(String oldIndex, String newIndex, RolloverRequest request, boolean explicitWriteIndex,
@Nullable Boolean isHidden) {
if (explicitWriteIndex) {
return unmodifiableList(Arrays.asList(
new AliasAction.Add(newIndex, request.getAlias(), null, null, null, true, isHidden),
new AliasAction.Add(oldIndex, request.getAlias(), null, null, null, false, isHidden)));
} else {
return unmodifiableList(Arrays.asList(
new AliasAction.Add(newIndex, request.getAlias(), null, null, null, null, isHidden),
new AliasAction.Remove(oldIndex, request.getAlias())));
}
}
static String generateRolloverIndexName(String sourceIndexName, IndexNameExpressionResolver indexNameExpressionResolver) {
String resolvedName = indexNameExpressionResolver.resolveDateMathExpression(sourceIndexName);
final boolean isDateMath = sourceIndexName.equals(resolvedName) == false;
if (INDEX_NAME_PATTERN.matcher(resolvedName).matches()) {
int numberIndex = sourceIndexName.lastIndexOf("-");
assert numberIndex != -1 : "no separator '-' found";
int counter = Integer.parseInt(sourceIndexName.substring(numberIndex + 1, isDateMath ? sourceIndexName.length()-1 :
sourceIndexName.length()));
String newName = sourceIndexName.substring(0, numberIndex) + "-" + String.format(Locale.ROOT, "%06d", ++counter)
+ (isDateMath ? ">" : "");
return newName;
} else {
throw new IllegalArgumentException("index name [" + sourceIndexName + "] does not match pattern '^.*-\\d+$'");
}
}
static Map<String, Boolean> evaluateConditions(final Collection<Condition<?>> conditions,
@Nullable final DocsStats docsStats,
@Nullable final IndexMetaData metaData) {
@ -269,51 +204,4 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
return evaluateConditions(conditions, docsStats, metaData);
}
}
static void validate(MetaData metaData, RolloverRequest request) {
final AliasOrIndex aliasOrIndex = metaData.getAliasAndIndexLookup().get(request.getAlias());
if (aliasOrIndex == null) {
throw new IllegalArgumentException("source alias does not exist");
}
if (aliasOrIndex.isAlias() == false) {
throw new IllegalArgumentException("source alias is a concrete index");
}
final AliasOrIndex.Alias alias = (AliasOrIndex.Alias) aliasOrIndex;
if (alias.getWriteIndex() == null) {
throw new IllegalArgumentException("source alias [" + alias.getAliasName() + "] does not point to a write index");
}
}
static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest(final String providedIndexName, final String targetIndexName,
final RolloverRequest rolloverRequest) {
final CreateIndexRequest createIndexRequest = rolloverRequest.getCreateIndexRequest();
createIndexRequest.cause("rollover_index");
createIndexRequest.index(targetIndexName);
return new CreateIndexClusterStateUpdateRequest(
"rollover_index", targetIndexName, providedIndexName)
.ackTimeout(createIndexRequest.timeout())
.masterNodeTimeout(createIndexRequest.masterNodeTimeout())
.settings(createIndexRequest.settings())
.aliases(createIndexRequest.aliases())
.waitForActiveShards(ActiveShardCount.NONE) // not waiting for shards here, will wait on the alias switch operation
.mappings(createIndexRequest.mappings());
}
/**
* If the newly created index matches with an index template whose aliases contains the rollover alias,
* the rollover alias will point to multiple indices. This causes indexing requests to be rejected.
* To avoid this, we make sure that there is no duplicated alias in index templates before creating a new index.
*/
static void checkNoDuplicatedAliasInIndexTemplate(MetaData metaData, String rolloverIndexName, String rolloverRequestAlias,
@Nullable Boolean isHidden) {
final List<IndexTemplateMetaData> matchedTemplates = findTemplates(metaData, rolloverIndexName, isHidden);
for (IndexTemplateMetaData template : matchedTemplates) {
if (template.aliases().containsKey(rolloverRequestAlias)) {
throw new IllegalArgumentException(String.format(Locale.ROOT,
"Rollover alias [%s] can point to multiple indices, found duplicated alias [%s] in index template [%s]",
rolloverRequestAlias, template.aliases().keys(), template.name()));
}
}
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.cluster.metadata;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
@ -73,7 +74,6 @@ import org.elasticsearch.indices.IndexCreationException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
@ -281,7 +281,7 @@ public class MetaDataCreateIndexService {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return applyCreateIndexRequest(currentState, request);
return applyCreateIndexRequest(currentState, request, false);
}
@Override
@ -300,11 +300,9 @@ public class MetaDataCreateIndexService {
* Handles the cluster state transition to a version that reflects the {@link CreateIndexClusterStateUpdateRequest}.
* All the requested changes are firstly validated before mutating the {@link ClusterState}.
*/
public ClusterState applyCreateIndexRequest(ClusterState currentState, CreateIndexClusterStateUpdateRequest request) throws Exception {
public ClusterState applyCreateIndexRequest(ClusterState currentState, CreateIndexClusterStateUpdateRequest request,
boolean silent) throws Exception {
logger.trace("executing IndexCreationTask for [{}] against cluster state version [{}]", request, currentState.version());
Index createdIndex = null;
String removalExtraInfo = null;
IndexRemovalReason removalReason = IndexRemovalReason.FAILURE;
validate(request, currentState);
@ -336,16 +334,19 @@ public class MetaDataCreateIndexService {
settingsBuilder.remove(IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.getKey());
final Settings indexSettings = settingsBuilder.build();
try {
final IndexService indexService = validateActiveShardCountAndCreateIndexService(request.index(), request.waitForActiveShards(),
indexSettings, routingNumShards, indicesService);
// create the index here (on the master) to validate it can be created, as well as adding the mapping
createdIndex = indexService.index();
final IndexMetaData.Builder tmpImdBuilder = IndexMetaData.builder(request.index());
tmpImdBuilder.setRoutingNumShards(routingNumShards);
tmpImdBuilder.settings(indexSettings);
// Set up everything, now locally create the index to see that things are ok, and apply
IndexMetaData tmpImd = tmpImdBuilder.build();
validateActiveShardCount(request.waitForActiveShards(), tmpImd);
// create the index here (on the master) to validate it can be created, as well as adding the mapping
return indicesService.<ClusterState, Exception>withTempIndexService(tmpImd, indexService -> {
try {
updateIndexMappingsAndBuildSortOrder(indexService, mappings, sourceMetaData);
} catch (Exception e) {
removalExtraInfo = "failed on parsing mappings on index creation";
logger.debug("failed on parsing mappings on index creation [{}]", request.index());
throw e;
}
@ -362,28 +363,18 @@ public class MetaDataCreateIndexService {
indexMetaData = buildIndexMetaData(request.index(), aliases, mapperService::documentMapper,
() -> mapperService.documentMapper(MapperService.DEFAULT_MAPPING), indexSettings, routingNumShards, sourceMetaData);
} catch (Exception e) {
removalExtraInfo = "failed to build index metadata";
logger.info("failed to build index metadata [{}]", request.index());
throw e;
}
logger.info("[{}] creating index, cause [{}], templates {}, shards [{}]/[{}], mappings {}",
logger.log(silent ? Level.DEBUG : Level.INFO, "[{}] creating index, cause [{}], templates {}, shards [{}]/[{}], mappings {}",
request.index(), request.cause(), templates.stream().map(IndexTemplateMetaData::getName).collect(toList()),
indexMetaData.getNumberOfShards(), indexMetaData.getNumberOfReplicas(), mappings.keySet());
indexService.getIndexEventListener().beforeIndexAddedToCluster(indexMetaData.getIndex(),
indexMetaData.getSettings());
final ClusterState updatedState = clusterStateCreateIndex(currentState, request.blocks(), indexMetaData,
allocationService::reroute);
removalExtraInfo = "cleaning up after validating index on master";
removalReason = IndexRemovalReason.NO_LONGER_ASSIGNED;
return updatedState;
} finally {
if (createdIndex != null) {
// Index was already partially created - need to clean up
indicesService.removeIndex(createdIndex, removalReason, removalExtraInfo);
}
}
return clusterStateCreateIndex(currentState, request.blocks(), indexMetaData, allocationService::reroute);
});
}
/**
@ -718,24 +709,15 @@ public class MetaDataCreateIndexService {
}
}
private static IndexService validateActiveShardCountAndCreateIndexService(String indexName, ActiveShardCount waitForActiveShards,
Settings indexSettings, int routingNumShards,
IndicesService indicesService) throws IOException {
final IndexMetaData.Builder tmpImdBuilder = IndexMetaData.builder(indexName);
tmpImdBuilder.setRoutingNumShards(routingNumShards);
tmpImdBuilder.settings(indexSettings);
// Set up everything, now locally create the index to see that things are ok, and apply
IndexMetaData tmpImd = tmpImdBuilder.build();
private static void validateActiveShardCount(ActiveShardCount waitForActiveShards, IndexMetaData indexMetaData) {
if (waitForActiveShards == ActiveShardCount.DEFAULT) {
waitForActiveShards = tmpImd.getWaitForActiveShards();
waitForActiveShards = indexMetaData.getWaitForActiveShards();
}
if (waitForActiveShards.validate(tmpImd.getNumberOfReplicas()) == false) {
if (waitForActiveShards.validate(indexMetaData.getNumberOfReplicas()) == false) {
throw new IllegalArgumentException("invalid wait_for_active_shards[" + waitForActiveShards +
"]: cannot be greater than number of shard copies [" +
(tmpImd.getNumberOfReplicas() + 1) + "]");
(indexMetaData.getNumberOfReplicas() + 1) + "]");
}
return indicesService.createIndex(tmpImd, Collections.emptyList(), false);
}
private void validate(CreateIndexClusterStateUpdateRequest request, ClusterState state) {

View File

@ -139,6 +139,7 @@ import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@ -574,6 +575,41 @@ public class IndicesService extends AbstractLifecycleComponent
}
}
public <T, E extends Exception> T withTempIndexService(final IndexMetaData indexMetaData,
CheckedFunction<IndexService, T, E> indexServiceConsumer) throws IOException, E {
final Index index = indexMetaData.getIndex();
if (hasIndex(index)) {
throw new ResourceAlreadyExistsException(index);
}
List<IndexEventListener> finalListeners = Collections.singletonList(
// double check that shard is not created.
new IndexEventListener() {
@Override
public void beforeIndexShardCreated(ShardId shardId, Settings indexSettings) {
assert false : "temp index should not trigger shard creation";
throw new ElasticsearchException("temp index should not trigger shard creation [{}]", index);
}
@Override
public void onStoreCreated(ShardId shardId) {
assert false : "temp index should not trigger store creation";
throw new ElasticsearchException("temp index should not trigger store creation [{}]", index);
}
}
);
final IndexService indexService =
createIndexService(
CREATE_INDEX,
indexMetaData,
indicesQueryCache,
indicesFieldDataCache,
finalListeners,
indexingMemoryController);
try (Closeable dummy = () -> indexService.close("temp", false)) {
return indexServiceConsumer.apply(indexService);
}
}
/**
* This creates a new IndexService without registering it
*/

View File

@ -0,0 +1,390 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.rollover;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasAction;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.cluster.metadata.AliasValidator;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.metadata.MetaDataIndexAliasesService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.AdditionalAnswers.returnsFirstArg;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class MetaDataRolloverServiceTests extends ESTestCase {
public void testRolloverAliasActions() {
String sourceAlias = randomAlphaOfLength(10);
String sourceIndex = randomAlphaOfLength(10);
String targetIndex = randomAlphaOfLength(10);
List<AliasAction> actions = MetaDataRolloverService.rolloverAliasToNewIndex(sourceIndex, targetIndex, false, null, sourceAlias);
assertThat(actions, hasSize(2));
boolean foundAdd = false;
boolean foundRemove = false;
for (AliasAction action : actions) {
if (action.getIndex().equals(targetIndex)) {
assertEquals(sourceAlias, ((AliasAction.Add) action).getAlias());
foundAdd = true;
} else if (action.getIndex().equals(sourceIndex)) {
assertEquals(sourceAlias, ((AliasAction.Remove) action).getAlias());
foundRemove = true;
} else {
throw new AssertionError("Unknown index [" + action.getIndex() + "]");
}
}
assertTrue(foundAdd);
assertTrue(foundRemove);
}
public void testRolloverAliasActionsWithExplicitWriteIndex() {
String sourceAlias = randomAlphaOfLength(10);
String sourceIndex = randomAlphaOfLength(10);
String targetIndex = randomAlphaOfLength(10);
List<AliasAction> actions = MetaDataRolloverService.rolloverAliasToNewIndex(sourceIndex, targetIndex, true, null, sourceAlias);
assertThat(actions, hasSize(2));
boolean foundAddWrite = false;
boolean foundRemoveWrite = false;
for (AliasAction action : actions) {
AliasAction.Add addAction = (AliasAction.Add) action;
if (action.getIndex().equals(targetIndex)) {
assertEquals(sourceAlias, addAction.getAlias());
assertTrue(addAction.writeIndex());
foundAddWrite = true;
} else if (action.getIndex().equals(sourceIndex)) {
assertEquals(sourceAlias, addAction.getAlias());
assertFalse(addAction.writeIndex());
foundRemoveWrite = true;
} else {
throw new AssertionError("Unknown index [" + action.getIndex() + "]");
}
}
assertTrue(foundAddWrite);
assertTrue(foundRemoveWrite);
}
public void testRolloverAliasActionsWithHiddenAliasAndExplicitWriteIndex() {
String sourceAlias = randomAlphaOfLength(10);
String sourceIndex = randomAlphaOfLength(10);
String targetIndex = randomAlphaOfLength(10);
List<AliasAction> actions = MetaDataRolloverService.rolloverAliasToNewIndex(sourceIndex, targetIndex, true, true, sourceAlias);
assertThat(actions, hasSize(2));
boolean foundAddWrite = false;
boolean foundRemoveWrite = false;
for (AliasAction action : actions) {
assertThat(action, instanceOf(AliasAction.Add.class));
AliasAction.Add addAction = (AliasAction.Add) action;
if (action.getIndex().equals(targetIndex)) {
assertEquals(sourceAlias, addAction.getAlias());
assertTrue(addAction.writeIndex());
assertTrue(addAction.isHidden());
foundAddWrite = true;
} else if (action.getIndex().equals(sourceIndex)) {
assertEquals(sourceAlias, addAction.getAlias());
assertFalse(addAction.writeIndex());
assertTrue(addAction.isHidden());
foundRemoveWrite = true;
} else {
throw new AssertionError("Unknown index [" + action.getIndex() + "]");
}
}
assertTrue(foundAddWrite);
assertTrue(foundRemoveWrite);
}
public void testRolloverAliasActionsWithHiddenAliasAndImplicitWriteIndex() {
String sourceAlias = randomAlphaOfLength(10);
String sourceIndex = randomAlphaOfLength(10);
String targetIndex = randomAlphaOfLength(10);
List<AliasAction> actions = MetaDataRolloverService.rolloverAliasToNewIndex(sourceIndex, targetIndex, false, true, sourceAlias);
assertThat(actions, hasSize(2));
boolean foundAddWrite = false;
boolean foundRemoveWrite = false;
for (AliasAction action : actions) {
if (action.getIndex().equals(targetIndex)) {
assertThat(action, instanceOf(AliasAction.Add.class));
AliasAction.Add addAction = (AliasAction.Add) action;
assertEquals(sourceAlias, addAction.getAlias());
assertThat(addAction.writeIndex(), nullValue());
assertTrue(addAction.isHidden());
foundAddWrite = true;
} else if (action.getIndex().equals(sourceIndex)) {
assertThat(action, instanceOf(AliasAction.Remove.class));
AliasAction.Remove removeAction = (AliasAction.Remove) action;
assertEquals(sourceAlias, removeAction.getAlias());
foundRemoveWrite = true;
} else {
throw new AssertionError("Unknown index [" + action.getIndex() + "]");
}
}
assertTrue(foundAddWrite);
assertTrue(foundRemoveWrite);
}
public void testValidation() {
String index1 = randomAlphaOfLength(10);
String aliasWithWriteIndex = randomAlphaOfLength(10);
String index2 = randomAlphaOfLength(10);
String aliasWithNoWriteIndex = randomAlphaOfLength(10);
Boolean firstIsWriteIndex = randomFrom(false, null);
final Settings settings = Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID())
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build();
MetaData.Builder metaDataBuilder = MetaData.builder()
.put(IndexMetaData.builder(index1)
.settings(settings)
.putAlias(AliasMetaData.builder(aliasWithWriteIndex))
.putAlias(AliasMetaData.builder(aliasWithNoWriteIndex).writeIndex(firstIsWriteIndex))
);
IndexMetaData.Builder indexTwoBuilder = IndexMetaData.builder(index2).settings(settings);
if (firstIsWriteIndex == null) {
indexTwoBuilder.putAlias(AliasMetaData.builder(aliasWithNoWriteIndex).writeIndex(randomFrom(false, null)));
}
metaDataBuilder.put(indexTwoBuilder);
MetaData metaData = metaDataBuilder.build();
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () ->
MetaDataRolloverService.validate(metaData, aliasWithNoWriteIndex));
assertThat(exception.getMessage(), equalTo("source alias [" + aliasWithNoWriteIndex + "] does not point to a write index"));
exception = expectThrows(IllegalArgumentException.class, () ->
MetaDataRolloverService.validate(metaData, randomFrom(index1, index2)));
assertThat(exception.getMessage(), equalTo("source alias is a concrete index"));
exception = expectThrows(IllegalArgumentException.class, () ->
MetaDataRolloverService.validate(metaData, randomAlphaOfLength(5))
);
assertThat(exception.getMessage(), equalTo("source alias does not exist"));
MetaDataRolloverService.validate(metaData, aliasWithWriteIndex);
}
public void testGenerateRolloverIndexName() {
String invalidIndexName = randomAlphaOfLength(10) + "A";
IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver();
expectThrows(IllegalArgumentException.class, () ->
MetaDataRolloverService.generateRolloverIndexName(invalidIndexName, indexNameExpressionResolver));
int num = randomIntBetween(0, 100);
final String indexPrefix = randomAlphaOfLength(10);
String indexEndingInNumbers = indexPrefix + "-" + num;
assertThat(MetaDataRolloverService.generateRolloverIndexName(indexEndingInNumbers, indexNameExpressionResolver),
equalTo(indexPrefix + "-" + String.format(Locale.ROOT, "%06d", num + 1)));
assertThat(MetaDataRolloverService.generateRolloverIndexName("index-name-1", indexNameExpressionResolver),
equalTo("index-name-000002"));
assertThat(MetaDataRolloverService.generateRolloverIndexName("index-name-2", indexNameExpressionResolver),
equalTo("index-name-000003"));
assertEquals( "<index-name-{now/d}-000002>", MetaDataRolloverService.generateRolloverIndexName("<index-name-{now/d}-1>",
indexNameExpressionResolver));
}
public void testCreateIndexRequest() {
String alias = randomAlphaOfLength(10);
String rolloverIndex = randomAlphaOfLength(10);
final RolloverRequest rolloverRequest = new RolloverRequest(alias, randomAlphaOfLength(10));
final ActiveShardCount activeShardCount = randomBoolean() ? ActiveShardCount.ALL : ActiveShardCount.ONE;
rolloverRequest.getCreateIndexRequest().waitForActiveShards(activeShardCount);
final Settings settings = Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID())
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build();
rolloverRequest.getCreateIndexRequest().settings(settings);
final CreateIndexClusterStateUpdateRequest createIndexRequest =
MetaDataRolloverService.prepareCreateIndexRequest(rolloverIndex, rolloverIndex, rolloverRequest.getCreateIndexRequest());
assertThat(createIndexRequest.settings(), equalTo(settings));
assertThat(createIndexRequest.index(), equalTo(rolloverIndex));
assertThat(createIndexRequest.cause(), equalTo("rollover_index"));
}
public void testRejectDuplicateAlias() {
final IndexTemplateMetaData template = IndexTemplateMetaData.builder("test-template")
.patterns(Arrays.asList("foo-*", "bar-*"))
.putAlias(AliasMetaData.builder("foo-write")).putAlias(AliasMetaData.builder("bar-write").writeIndex(randomBoolean()))
.build();
final MetaData metaData = MetaData.builder().put(createMetaData(randomAlphaOfLengthBetween(5, 7)), false).put(template).build();
String indexName = randomFrom("foo-123", "bar-xyz");
String aliasName = randomFrom("foo-write", "bar-write");
final IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
() -> MetaDataRolloverService.checkNoDuplicatedAliasInIndexTemplate(metaData, indexName, aliasName, randomBoolean()));
assertThat(ex.getMessage(), containsString("index template [test-template]"));
}
public void testHiddenAffectsResolvedTemplates() {
final IndexTemplateMetaData template = IndexTemplateMetaData.builder("test-template")
.patterns(Collections.singletonList("*"))
.putAlias(AliasMetaData.builder("foo-write")).putAlias(AliasMetaData.builder("bar-write").writeIndex(randomBoolean()))
.build();
final MetaData metaData = MetaData.builder().put(createMetaData(randomAlphaOfLengthBetween(5, 7)), false).put(template).build();
String indexName = randomFrom("foo-123", "bar-xyz");
String aliasName = randomFrom("foo-write", "bar-write");
// hidden shouldn't throw
MetaDataRolloverService.checkNoDuplicatedAliasInIndexTemplate(metaData, indexName, aliasName, Boolean.TRUE);
// not hidden will throw
final IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () ->
MetaDataRolloverService.checkNoDuplicatedAliasInIndexTemplate(metaData, indexName, aliasName, randomFrom(Boolean.FALSE, null)));
assertThat(ex.getMessage(), containsString("index template [test-template]"));
}
/**
* Test the main rolloverClusterState method. This does not validate every detail to depth, rather focuses on observing that each
* parameter is used for the purpose intended.
*/
public void testRolloverClusterState() throws Exception {
final String aliasName = "logs-alias";
final String indexPrefix = "logs-index-00000";
String sourceIndexName = indexPrefix + "1";
final IndexMetaData.Builder indexMetaData = IndexMetaData.builder(sourceIndexName)
.putAlias(AliasMetaData.builder(aliasName).writeIndex(true).build()).settings(settings(Version.CURRENT))
.numberOfShards(1).numberOfReplicas(1);
final ClusterState clusterState =
ClusterState.builder(new ClusterName("test")).metaData(MetaData.builder().put(indexMetaData)).build();
ThreadPool testThreadPool = new TestThreadPool(getTestName());
try {
ClusterService clusterService = ClusterServiceUtils.createClusterService(testThreadPool);
Environment env = mock(Environment.class);
when(env.sharedDataFile()).thenReturn(null);
AllocationService allocationService = mock(AllocationService.class);
when(allocationService.reroute(any(ClusterState.class), any(String.class))).then(i -> i.getArguments()[0]);
IndicesService indicesService = mockIndicesServices();
IndexNameExpressionResolver mockIndexNameExpressionResolver = mock(IndexNameExpressionResolver.class);
when(mockIndexNameExpressionResolver.resolveDateMathExpression(any())).then(returnsFirstArg());
MetaDataCreateIndexService createIndexService = new MetaDataCreateIndexService(Settings.EMPTY,
clusterService, indicesService, allocationService, null, env, null, testThreadPool, null, Collections.emptyList(), false);
MetaDataIndexAliasesService indexAliasesService = new MetaDataIndexAliasesService(clusterService, indicesService,
new AliasValidator(), null, xContentRegistry());
MetaDataRolloverService rolloverService = new MetaDataRolloverService(testThreadPool, createIndexService, indexAliasesService,
mockIndexNameExpressionResolver);
MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong());
List<Condition<?>> metConditions = Collections.singletonList(condition);
String newIndexName = randomBoolean() ? "logs-index-9" : null;
int numberOfShards = randomIntBetween(1, 5);
CreateIndexRequest createIndexRequest = new CreateIndexRequest("_na_");
createIndexRequest.settings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numberOfShards));
long before = testThreadPool.absoluteTimeInMillis();
MetaDataRolloverService.RolloverResult rolloverResult =
rolloverService.rolloverClusterState(clusterState,aliasName, newIndexName, createIndexRequest, metConditions,
randomBoolean());
long after = testThreadPool.absoluteTimeInMillis();
newIndexName = newIndexName == null ? indexPrefix + "2" : newIndexName;
assertEquals(sourceIndexName, rolloverResult.sourceIndexName);
assertEquals(newIndexName, rolloverResult.rolloverIndexName);
MetaData rolloverMetaData = rolloverResult.clusterState.metaData();
assertEquals(2, rolloverMetaData.indices().size());
IndexMetaData rolloverIndexMetaData = rolloverMetaData.index(newIndexName);
assertThat(rolloverIndexMetaData.getNumberOfShards(), equalTo(numberOfShards));
AliasOrIndex.Alias alias = (AliasOrIndex.Alias) rolloverMetaData.getAliasAndIndexLookup().get(aliasName);
assertThat(alias.getIndices(), hasSize(2));
assertThat(alias.getIndices(), hasItem(rolloverMetaData.index(sourceIndexName)));
assertThat(alias.getIndices(), hasItem(rolloverIndexMetaData));
assertThat(alias.getWriteIndex(), equalTo(rolloverIndexMetaData));
RolloverInfo info = rolloverMetaData.index(sourceIndexName).getRolloverInfos().get(aliasName);
assertThat(info.getTime(), lessThanOrEqualTo(after));
assertThat(info.getTime(), greaterThanOrEqualTo(before));
assertThat(info.getMetConditions(), hasSize(1));
assertThat(info.getMetConditions().get(0).value(), equalTo(condition.value()));
} finally {
testThreadPool.shutdown();
}
}
private IndicesService mockIndicesServices() throws java.io.IOException {
IndicesService indicesService = mock(IndicesService.class);
when(indicesService.withTempIndexService(any(IndexMetaData.class), any(CheckedFunction.class)))
.then(invocationOnMock -> {
IndexService indexService = mock(IndexService.class);
IndexMetaData indexMetaData = (IndexMetaData) invocationOnMock.getArguments()[0];
when(indexService.index()).thenReturn(indexMetaData.getIndex());
MapperService mapperService = mock(MapperService.class);
when(indexService.mapperService()).thenReturn(mapperService);
when(mapperService.documentMapper()).thenReturn(null);
when(indexService.getIndexEventListener()).thenReturn(new IndexEventListener() {});
when(indexService.getIndexSortSupplier()).thenReturn(() -> null);
//noinspection unchecked
return ((CheckedFunction) invocationOnMock.getArguments()[1]).apply(indexService);
});
return indicesService;
}
private static IndexMetaData createMetaData(String indexName) {
final Settings settings = Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID())
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build();
return IndexMetaData.builder(indexName)
.creationDate(System.currentTimeMillis() - TimeValue.timeValueHours(3).getMillis())
.settings(settings)
.build();
}
}

View File

@ -22,7 +22,6 @@ package org.elasticsearch.action.admin.indices.rollover;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
@ -30,16 +29,13 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsTests;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasAction;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.metadata.MetaDataIndexAliasesService;
@ -78,23 +74,17 @@ import org.mockito.ArgumentCaptor;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import static java.util.Collections.emptyList;
import static org.elasticsearch.action.admin.indices.rollover.TransportRolloverAction.evaluateConditions;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
@ -221,229 +211,6 @@ public class TransportRolloverActionTests extends ESTestCase {
results2.forEach((k, v) -> assertFalse(v));
}
public void testRolloverAliasActions() {
String sourceAlias = randomAlphaOfLength(10);
String sourceIndex = randomAlphaOfLength(10);
String targetIndex = randomAlphaOfLength(10);
final RolloverRequest rolloverRequest = new RolloverRequest(sourceAlias, targetIndex);
List<AliasAction> actions = TransportRolloverAction.rolloverAliasToNewIndex(sourceIndex, targetIndex, rolloverRequest, false, null);
assertThat(actions, hasSize(2));
boolean foundAdd = false;
boolean foundRemove = false;
for (AliasAction action : actions) {
if (action.getIndex().equals(targetIndex)) {
assertEquals(sourceAlias, ((AliasAction.Add) action).getAlias());
foundAdd = true;
} else if (action.getIndex().equals(sourceIndex)) {
assertEquals(sourceAlias, ((AliasAction.Remove) action).getAlias());
foundRemove = true;
} else {
throw new AssertionError("Unknown index [" + action.getIndex() + "]");
}
}
assertTrue(foundAdd);
assertTrue(foundRemove);
}
public void testRolloverAliasActionsWithExplicitWriteIndex() {
String sourceAlias = randomAlphaOfLength(10);
String sourceIndex = randomAlphaOfLength(10);
String targetIndex = randomAlphaOfLength(10);
final RolloverRequest rolloverRequest = new RolloverRequest(sourceAlias, targetIndex);
List<AliasAction> actions = TransportRolloverAction.rolloverAliasToNewIndex(sourceIndex, targetIndex, rolloverRequest, true, null);
assertThat(actions, hasSize(2));
boolean foundAddWrite = false;
boolean foundRemoveWrite = false;
for (AliasAction action : actions) {
AliasAction.Add addAction = (AliasAction.Add) action;
if (action.getIndex().equals(targetIndex)) {
assertEquals(sourceAlias, addAction.getAlias());
assertTrue(addAction.writeIndex());
foundAddWrite = true;
} else if (action.getIndex().equals(sourceIndex)) {
assertEquals(sourceAlias, addAction.getAlias());
assertFalse(addAction.writeIndex());
foundRemoveWrite = true;
} else {
throw new AssertionError("Unknown index [" + action.getIndex() + "]");
}
}
assertTrue(foundAddWrite);
assertTrue(foundRemoveWrite);
}
public void testRolloverAliasActionsWithHiddenAliasAndExplicitWriteIndex() {
String sourceAlias = randomAlphaOfLength(10);
String sourceIndex = randomAlphaOfLength(10);
String targetIndex = randomAlphaOfLength(10);
final RolloverRequest rolloverRequest = new RolloverRequest(sourceAlias, targetIndex);
List<AliasAction> actions = TransportRolloverAction.rolloverAliasToNewIndex(sourceIndex, targetIndex, rolloverRequest, true, true);
assertThat(actions, hasSize(2));
boolean foundAddWrite = false;
boolean foundRemoveWrite = false;
for (AliasAction action : actions) {
assertThat(action, instanceOf(AliasAction.Add.class));
AliasAction.Add addAction = (AliasAction.Add) action;
if (action.getIndex().equals(targetIndex)) {
assertEquals(sourceAlias, addAction.getAlias());
assertTrue(addAction.writeIndex());
assertTrue(addAction.isHidden());
foundAddWrite = true;
} else if (action.getIndex().equals(sourceIndex)) {
assertEquals(sourceAlias, addAction.getAlias());
assertFalse(addAction.writeIndex());
assertTrue(addAction.isHidden());
foundRemoveWrite = true;
} else {
throw new AssertionError("Unknown index [" + action.getIndex() + "]");
}
}
assertTrue(foundAddWrite);
assertTrue(foundRemoveWrite);
}
public void testRolloverAliasActionsWithHiddenAliasAndImplicitWriteIndex() {
String sourceAlias = randomAlphaOfLength(10);
String sourceIndex = randomAlphaOfLength(10);
String targetIndex = randomAlphaOfLength(10);
final RolloverRequest rolloverRequest = new RolloverRequest(sourceAlias, targetIndex);
List<AliasAction> actions = TransportRolloverAction.rolloverAliasToNewIndex(sourceIndex, targetIndex, rolloverRequest, false, true);
assertThat(actions, hasSize(2));
boolean foundAddWrite = false;
boolean foundRemoveWrite = false;
for (AliasAction action : actions) {
if (action.getIndex().equals(targetIndex)) {
assertThat(action, instanceOf(AliasAction.Add.class));
AliasAction.Add addAction = (AliasAction.Add) action;
assertEquals(sourceAlias, addAction.getAlias());
assertThat(addAction.writeIndex(), nullValue());
assertTrue(addAction.isHidden());
foundAddWrite = true;
} else if (action.getIndex().equals(sourceIndex)) {
assertThat(action, instanceOf(AliasAction.Remove.class));
AliasAction.Remove removeAction = (AliasAction.Remove) action;
assertEquals(sourceAlias, removeAction.getAlias());
foundRemoveWrite = true;
} else {
throw new AssertionError("Unknown index [" + action.getIndex() + "]");
}
}
assertTrue(foundAddWrite);
assertTrue(foundRemoveWrite);
}
public void testValidation() {
String index1 = randomAlphaOfLength(10);
String aliasWithWriteIndex = randomAlphaOfLength(10);
String index2 = randomAlphaOfLength(10);
String aliasWithNoWriteIndex = randomAlphaOfLength(10);
Boolean firstIsWriteIndex = randomFrom(false, null);
final Settings settings = Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID())
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build();
MetaData.Builder metaDataBuilder = MetaData.builder()
.put(IndexMetaData.builder(index1)
.settings(settings)
.putAlias(AliasMetaData.builder(aliasWithWriteIndex))
.putAlias(AliasMetaData.builder(aliasWithNoWriteIndex).writeIndex(firstIsWriteIndex))
);
IndexMetaData.Builder indexTwoBuilder = IndexMetaData.builder(index2).settings(settings);
if (firstIsWriteIndex == null) {
indexTwoBuilder.putAlias(AliasMetaData.builder(aliasWithNoWriteIndex).writeIndex(randomFrom(false, null)));
}
metaDataBuilder.put(indexTwoBuilder);
MetaData metaData = metaDataBuilder.build();
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () ->
TransportRolloverAction.validate(metaData, new RolloverRequest(aliasWithNoWriteIndex,
randomAlphaOfLength(10))));
assertThat(exception.getMessage(), equalTo("source alias [" + aliasWithNoWriteIndex + "] does not point to a write index"));
exception = expectThrows(IllegalArgumentException.class, () ->
TransportRolloverAction.validate(metaData, new RolloverRequest(randomFrom(index1, index2),
randomAlphaOfLength(10))));
assertThat(exception.getMessage(), equalTo("source alias is a concrete index"));
exception = expectThrows(IllegalArgumentException.class, () ->
TransportRolloverAction.validate(metaData, new RolloverRequest(randomAlphaOfLength(5),
randomAlphaOfLength(10)))
);
assertThat(exception.getMessage(), equalTo("source alias does not exist"));
TransportRolloverAction.validate(metaData, new RolloverRequest(aliasWithWriteIndex, randomAlphaOfLength(10)));
}
public void testGenerateRolloverIndexName() {
String invalidIndexName = randomAlphaOfLength(10) + "A";
IndexNameExpressionResolver indexNameExpressionResolver = new IndexNameExpressionResolver();
expectThrows(IllegalArgumentException.class, () ->
TransportRolloverAction.generateRolloverIndexName(invalidIndexName, indexNameExpressionResolver));
int num = randomIntBetween(0, 100);
final String indexPrefix = randomAlphaOfLength(10);
String indexEndingInNumbers = indexPrefix + "-" + num;
assertThat(TransportRolloverAction.generateRolloverIndexName(indexEndingInNumbers, indexNameExpressionResolver),
equalTo(indexPrefix + "-" + String.format(Locale.ROOT, "%06d", num + 1)));
assertThat(TransportRolloverAction.generateRolloverIndexName("index-name-1", indexNameExpressionResolver),
equalTo("index-name-000002"));
assertThat(TransportRolloverAction.generateRolloverIndexName("index-name-2", indexNameExpressionResolver),
equalTo("index-name-000003"));
assertEquals( "<index-name-{now/d}-000002>", TransportRolloverAction.generateRolloverIndexName("<index-name-{now/d}-1>",
indexNameExpressionResolver));
}
public void testCreateIndexRequest() {
String alias = randomAlphaOfLength(10);
String rolloverIndex = randomAlphaOfLength(10);
final RolloverRequest rolloverRequest = new RolloverRequest(alias, randomAlphaOfLength(10));
final ActiveShardCount activeShardCount = randomBoolean() ? ActiveShardCount.ALL : ActiveShardCount.ONE;
rolloverRequest.getCreateIndexRequest().waitForActiveShards(activeShardCount);
final Settings settings = Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID())
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build();
rolloverRequest.getCreateIndexRequest().settings(settings);
final CreateIndexClusterStateUpdateRequest createIndexRequest =
TransportRolloverAction.prepareCreateIndexRequest(rolloverIndex, rolloverIndex, rolloverRequest);
assertThat(createIndexRequest.settings(), equalTo(settings));
assertThat(createIndexRequest.index(), equalTo(rolloverIndex));
assertThat(createIndexRequest.cause(), equalTo("rollover_index"));
}
public void testRejectDuplicateAlias() {
final IndexTemplateMetaData template = IndexTemplateMetaData.builder("test-template")
.patterns(Arrays.asList("foo-*", "bar-*"))
.putAlias(AliasMetaData.builder("foo-write")).putAlias(AliasMetaData.builder("bar-write").writeIndex(randomBoolean()))
.build();
final MetaData metaData = MetaData.builder().put(createMetaData(randomAlphaOfLengthBetween(5, 7)), false).put(template).build();
String indexName = randomFrom("foo-123", "bar-xyz");
String aliasName = randomFrom("foo-write", "bar-write");
final IllegalArgumentException ex = expectThrows(IllegalArgumentException.class,
() -> TransportRolloverAction.checkNoDuplicatedAliasInIndexTemplate(metaData, indexName, aliasName, randomBoolean()));
assertThat(ex.getMessage(), containsString("index template [test-template]"));
}
public void testHiddenAffectsResolvedTemplates() {
final IndexTemplateMetaData template = IndexTemplateMetaData.builder("test-template")
.patterns(Collections.singletonList("*"))
.putAlias(AliasMetaData.builder("foo-write")).putAlias(AliasMetaData.builder("bar-write").writeIndex(randomBoolean()))
.build();
final MetaData metaData = MetaData.builder().put(createMetaData(randomAlphaOfLengthBetween(5, 7)), false).put(template).build();
String indexName = randomFrom("foo-123", "bar-xyz");
String aliasName = randomFrom("foo-write", "bar-write");
// hidden shouldn't throw
TransportRolloverAction.checkNoDuplicatedAliasInIndexTemplate(metaData, indexName, aliasName, Boolean.TRUE);
// not hidden will throw
final IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () ->
TransportRolloverAction.checkNoDuplicatedAliasInIndexTemplate(metaData, indexName, aliasName, randomFrom(Boolean.FALSE, null)));
assertThat(ex.getMessage(), containsString("index template [test-template]"));
}
public void testConditionEvaluationWhenAliasToWriteAndReadIndicesConsidersOnlyPrimariesFromWriteIndex() throws Exception {
final TransportService mockTransportService = mock(TransportService.class);
@ -486,9 +253,12 @@ public class TransportRolloverActionTests extends ESTestCase {
final ClusterState stateBefore = ClusterState.builder(ClusterName.DEFAULT)
.metaData(MetaData.builder().put(indexMetaData).put(indexMetaData2)).build();
when(mockCreateIndexService.applyCreateIndexRequest(any(), any(), anyBoolean())).thenReturn(stateBefore);
when(mdIndexAliasesService.applyAliasActions(any(), any())).thenReturn(stateBefore);
MetaDataRolloverService rolloverService = new MetaDataRolloverService(mockThreadPool, mockCreateIndexService,
mdIndexAliasesService, mockIndexNameExpressionResolver);
final TransportRolloverAction transportRolloverAction = new TransportRolloverAction(mockTransportService, mockClusterService,
mockThreadPool, mockCreateIndexService, mockActionFilters, mockIndexNameExpressionResolver, mdIndexAliasesService,
mockClient);
mockThreadPool, mockActionFilters, mockIndexNameExpressionResolver, rolloverService, mockClient);
// For given alias, verify that condition evaluation fails when the condition doc count is greater than the primaries doc count
// (primaries from only write index is considered)

View File

@ -75,6 +75,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryA
import org.elasticsearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
@ -108,8 +109,6 @@ import static org.elasticsearch.env.Environment.PATH_HOME_SETTING;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyList;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
@ -159,20 +158,20 @@ public class ClusterStateChanges {
clusterService = mock(ClusterService.class);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
IndicesService indicesService = mock(IndicesService.class);
// MetaDataCreateIndexService creates indices using its IndicesService instance to check mappings -> fake it here
// MetaDataCreateIndexService uses withTempIndexService to check mappings -> fake it here
try {
@SuppressWarnings("unchecked") final List<IndexEventListener> listeners = anyList();
when(indicesService.createIndex(any(IndexMetaData.class), listeners, anyBoolean()))
when(indicesService.withTempIndexService(any(IndexMetaData.class), any(CheckedFunction.class)))
.then(invocationOnMock -> {
IndexService indexService = mock(IndexService.class);
IndexMetaData indexMetaData = (IndexMetaData)invocationOnMock.getArguments()[0];
IndexMetaData indexMetaData = (IndexMetaData) invocationOnMock.getArguments()[0];
when(indexService.index()).thenReturn(indexMetaData.getIndex());
MapperService mapperService = mock(MapperService.class);
when(indexService.mapperService()).thenReturn(mapperService);
when(mapperService.documentMapper()).thenReturn(null);
when(indexService.getIndexEventListener()).thenReturn(new IndexEventListener() {});
when(indexService.getIndexSortSupplier()).thenReturn(() -> null);
return indexService;
//noinspection unchecked
return ((CheckedFunction) invocationOnMock.getArguments()[1]).apply(indexService);
});
} catch (IOException e) {
throw new IllegalStateException(e);