Rollover avoid heavy lifting in dry-run/validation (#57894)

Fixed two newly introduced issues with rollover:
1. Using auto-expand replicas, rollover could result in unexpected log
messages on future indexes.
2. It did a reroute and other heavy work on the network thread.

Closes #57706
Supersedes #57865
Relates #53965
This commit is contained in:
Henning Andersen 2020-06-09 21:53:15 +02:00 committed by Henning Andersen
parent 439205d1ea
commit 1e8e115ae1
4 changed files with 119 additions and 10 deletions

View File

@ -19,11 +19,18 @@
package org.elasticsearch.action.admin.indices.rollover;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AutoExpandReplicas;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.unit.ByteSizeUnit;
@ -32,6 +39,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.MockLogAppender;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
@ -181,10 +189,34 @@ public class RolloverIT extends ESIntegTestCase {
}
public void testRolloverDryRun() throws Exception {
if (randomBoolean()) {
PutIndexTemplateRequestBuilder putTemplate = client().admin().indices()
.preparePutTemplate("test_index")
.setPatterns(Collections.singletonList("test_index-*"))
.setOrder(-1)
.setSettings(Settings.builder().put(AutoExpandReplicas.SETTING.getKey(), "0-all"));
assertAcked(putTemplate.get());
}
assertAcked(prepareCreate("test_index-1").addAlias(new Alias("test_alias")).get());
index("test_index-1", "type1", "1", "field", "value");
flush("test_index-1");
ensureGreen();
Logger allocationServiceLogger = LogManager.getLogger(AllocationService.class);
MockLogAppender appender = new MockLogAppender();
appender.start();
appender.addExpectation(
new MockLogAppender.UnseenEventExpectation("no related message logged on dry run",
AllocationService.class.getName(), Level.INFO, "*test_index*")
);
Loggers.addAppender(allocationServiceLogger, appender);
final RolloverResponse response = client().admin().indices().prepareRolloverIndex("test_alias").dryRun(true).get();
appender.assertAllExpectationsMatched();
appender.stop();
Loggers.removeAppender(allocationServiceLogger, appender);
assertThat(response.getOldIndex(), equalTo("test_index-1"));
assertThat(response.getNewIndex(), equalTo("test_index-000002"));
assertThat(response.isDryRun(), equalTo(true));

View File

@ -90,16 +90,16 @@ public class MetadataRolloverService {
public RolloverResult rolloverClusterState(ClusterState currentState, String rolloverTarget, String newIndexName,
CreateIndexRequest createIndexRequest, List<Condition<?>> metConditions,
boolean silent) throws Exception {
boolean silent, boolean onlyValidate) throws Exception {
validate(currentState.metadata(), rolloverTarget, newIndexName, createIndexRequest);
final IndexAbstraction indexAbstraction = currentState.metadata().getIndicesLookup().get(rolloverTarget);
switch (indexAbstraction.getType()) {
case ALIAS:
return rolloverAlias(currentState, (IndexAbstraction.Alias) indexAbstraction, rolloverTarget, newIndexName,
createIndexRequest, metConditions, silent);
createIndexRequest, metConditions, silent, onlyValidate);
case DATA_STREAM:
return rolloverDataStream(currentState, (IndexAbstraction.DataStream) indexAbstraction, rolloverTarget,
createIndexRequest, metConditions, silent);
createIndexRequest, metConditions, silent, onlyValidate);
default:
// the validate method above prevents this case
throw new IllegalStateException("unable to roll over type [" + indexAbstraction.getType().getDisplayName() + "]");
@ -108,7 +108,7 @@ public class MetadataRolloverService {
private RolloverResult rolloverAlias(ClusterState currentState, IndexAbstraction.Alias alias, String aliasName,
String newIndexName, CreateIndexRequest createIndexRequest, List<Condition<?>> metConditions,
boolean silent) throws Exception {
boolean silent, boolean onlyValidate) throws Exception {
final Metadata metadata = currentState.metadata();
final IndexMetadata writeIndex = alias.getWriteIndex();
final AliasMetadata aliasMetadata = writeIndex.getAliases().get(alias.getName());
@ -124,6 +124,9 @@ public class MetadataRolloverService {
IndexMetadata.INDEX_HIDDEN_SETTING.get(createIndexRequest.settings()) : null;
createIndexService.validateIndexName(rolloverIndexName, currentState); // fails if the index already exists
checkNoDuplicatedAliasInIndexTemplate(metadata, rolloverIndexName, aliasName, isHidden);
if (onlyValidate) {
return new RolloverResult(rolloverIndexName, sourceIndexName, currentState);
}
CreateIndexClusterStateUpdateRequest createIndexClusterStateRequest =
prepareCreateIndexRequest(unresolvedName, rolloverIndexName, createIndexRequest);
@ -142,12 +145,16 @@ public class MetadataRolloverService {
private RolloverResult rolloverDataStream(ClusterState currentState, IndexAbstraction.DataStream dataStream, String dataStreamName,
CreateIndexRequest createIndexRequest, List<Condition<?>> metConditions,
boolean silent) throws Exception {
boolean silent, boolean onlyValidate) throws Exception {
lookupTemplateForDataStream(dataStreamName, currentState.metadata());
final DataStream ds = dataStream.getDataStream();
final IndexMetadata originalWriteIndex = dataStream.getWriteIndex();
final String newWriteIndexName = DataStream.getDefaultBackingIndexName(ds.getName(), ds.getGeneration() + 1);
createIndexService.validateIndexName(newWriteIndexName, currentState); // fails if the index already exists
if (onlyValidate) {
return new RolloverResult(newWriteIndexName, originalWriteIndex.getIndex().getName(), currentState);
}
CreateIndexClusterStateUpdateRequest createIndexClusterStateRequest =
prepareDataStreamCreateIndexRequest(dataStreamName, newWriteIndexName, createIndexRequest);

View File

@ -103,10 +103,11 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
@Override
protected void masterOperation(Task task, final RolloverRequest rolloverRequest, final ClusterState state,
final ActionListener<RolloverResponse> listener) throws Exception {
MetadataRolloverService.RolloverResult preResult =
rolloverService.rolloverClusterState(state,
rolloverRequest.getRolloverTarget(), rolloverRequest.getNewIndexName(), rolloverRequest.getCreateIndexRequest(),
Collections.emptyList(), true);
Collections.emptyList(), true, true);
Metadata metadata = state.metadata();
String sourceIndexName = preResult.sourceIndexName;
String rolloverIndexName = preResult.rolloverIndexName;
@ -136,7 +137,7 @@ public class TransportRolloverAction extends TransportMasterNodeAction<RolloverR
public ClusterState execute(ClusterState currentState) throws Exception {
MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState(currentState,
rolloverRequest.getRolloverTarget(), rolloverRequest.getNewIndexName(),
rolloverRequest.getCreateIndexRequest(), metConditions, false);
rolloverRequest.getCreateIndexRequest(), metConditions, false, false);
if (rolloverResult.sourceIndexName.equals(sourceIndexName) == false) {
throw new ElasticsearchException("Concurrent modification of alias [{}] during rollover",
rolloverRequest.getRolloverTarget());

View File

@ -56,6 +56,7 @@ import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.indices.ShardLimitValidator;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
@ -80,7 +81,12 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.AdditionalAnswers.returnsFirstArg;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
public class MetadataRolloverServiceTests extends ESTestCase {
@ -490,7 +496,7 @@ public class MetadataRolloverServiceTests extends ESTestCase {
long before = testThreadPool.absoluteTimeInMillis();
MetadataRolloverService.RolloverResult rolloverResult =
rolloverService.rolloverClusterState(clusterState, aliasName, newIndexName, createIndexRequest, metConditions,
randomBoolean());
randomBoolean(), false);
long after = testThreadPool.absoluteTimeInMillis();
newIndexName = newIndexName == null ? indexPrefix + "2" : newIndexName;
@ -557,7 +563,7 @@ public class MetadataRolloverServiceTests extends ESTestCase {
long before = testThreadPool.absoluteTimeInMillis();
MetadataRolloverService.RolloverResult rolloverResult =
rolloverService.rolloverClusterState(clusterState, dataStream.getName(), null, createIndexRequest, metConditions,
randomBoolean());
randomBoolean(), false);
long after = testThreadPool.absoluteTimeInMillis();
String sourceIndexName = DataStream.getDefaultBackingIndexName(dataStream.getName(), dataStream.getGeneration());
@ -585,6 +591,69 @@ public class MetadataRolloverServiceTests extends ESTestCase {
}
}
public void testValidation() throws Exception {
final String rolloverTarget;
final String sourceIndexName;
final String defaultRolloverIndexName;
final boolean useDataStream = randomBoolean();
final Metadata.Builder builder = Metadata.builder();
if (useDataStream) {
DataStream dataStream = DataStreamTests.randomInstance();
rolloverTarget = dataStream.getName();
sourceIndexName = dataStream.getIndices().get(dataStream.getIndices().size() - 1).getName();
defaultRolloverIndexName = DataStream.getDefaultBackingIndexName(dataStream.getName(), dataStream.getGeneration() + 1);
ComposableIndexTemplate template = new ComposableIndexTemplate(Collections.singletonList(dataStream.getName() + "*"),
null, null, null, null, null, new ComposableIndexTemplate.DataStreamTemplate("@timestamp"));
builder.put("template", template);
for (Index index : dataStream.getIndices()) {
builder.put(DataStreamTestHelper.getIndexMetadataBuilderForIndex(index));
}
builder.put(dataStream);
} else {
String indexPrefix = "logs-index-00000";
rolloverTarget = "logs-alias";
sourceIndexName = indexPrefix + "1";
defaultRolloverIndexName = indexPrefix + "2";
final IndexMetadata.Builder indexMetadata = IndexMetadata.builder(sourceIndexName)
.putAlias(AliasMetadata.builder(rolloverTarget).writeIndex(true).build()).settings(settings(Version.CURRENT))
.numberOfShards(1).numberOfReplicas(1);
builder.put(indexMetadata);
}
final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).metadata(builder).build();
MetadataCreateIndexService createIndexService = mock(MetadataCreateIndexService.class);
MetadataIndexAliasesService metadataIndexAliasesService = mock(MetadataIndexAliasesService.class);
IndexNameExpressionResolver mockIndexNameExpressionResolver = mock(IndexNameExpressionResolver.class);
when(mockIndexNameExpressionResolver.resolveDateMathExpression(any())).then(returnsFirstArg());
MetadataRolloverService rolloverService = new MetadataRolloverService(null, createIndexService, metadataIndexAliasesService,
mockIndexNameExpressionResolver);
String newIndexName = useDataStream == false && randomBoolean() ? "logs-index-9" : null;
MetadataRolloverService.RolloverResult rolloverResult = rolloverService.rolloverClusterState(clusterState, rolloverTarget,
newIndexName, new CreateIndexRequest("_na_"), null, randomBoolean(), true);
newIndexName = newIndexName == null ? defaultRolloverIndexName : newIndexName;
assertEquals(sourceIndexName, rolloverResult.sourceIndexName);
assertEquals(newIndexName, rolloverResult.rolloverIndexName);
assertSame(rolloverResult.clusterState, clusterState);
verify(createIndexService).validateIndexName(any(), same(clusterState));
verifyZeroInteractions(createIndexService);
verifyZeroInteractions(metadataIndexAliasesService);
reset(createIndexService);
doThrow(new InvalidIndexNameException("test", "invalid")).when(createIndexService).validateIndexName(any(), any());
expectThrows(InvalidIndexNameException.class,
() -> rolloverService.rolloverClusterState(clusterState, rolloverTarget, null, new CreateIndexRequest("_na_"), null,
randomBoolean(), randomBoolean()));
verify(createIndexService).validateIndexName(any(), same(clusterState));
verifyZeroInteractions(createIndexService);
verifyZeroInteractions(metadataIndexAliasesService);
}
public void testRolloverClusterStateForDataStreamNoTemplate() throws Exception {
final DataStream dataStream = DataStreamTests.randomInstance();
Metadata.Builder builder = Metadata.builder();
@ -614,7 +683,7 @@ public class MetadataRolloverServiceTests extends ESTestCase {
CreateIndexRequest createIndexRequest = new CreateIndexRequest("_na_");
Exception e = expectThrows(IllegalArgumentException.class, () -> rolloverService.rolloverClusterState(clusterState,
dataStream.getName(), null, createIndexRequest, metConditions, false));
dataStream.getName(), null, createIndexRequest, metConditions, false, randomBoolean()));
assertThat(e.getMessage(), equalTo("no matching index template found for data stream [" + dataStream.getName() + "]"));
}