Skip global checkpoint sync for closed indices (#41874)

The verify-before-close step ensures that the global checkpoints on all shard copies are in sync; thus, we don't need to sync global checkpoints for closed indices.

Relates #33888

parent 4d55e9e070
commit 3573b1d0ce
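In effect, the primary's maybeSyncGlobalCheckpoint path now bails out when the index is closed. Below is a minimal sketch of the resulting check, pieced together from the hunk that follows; the surrounding method body and the exact declaration of syncNeeded are not shown in the diff and are reconstructed here only for illustration:

    // Illustrative sketch only, not the full method body.
    final boolean syncNeeded = StreamSupport
        .stream(globalCheckpoints.values().spliterator(), false)
        .anyMatch(v -> v.value < globalCheckpoint);   // some copy lags the primary
    // Skip the sync when the index is closed: the verify-before-close step has
    // already brought the global checkpoints on all copies in sync.
    if (syncNeeded && indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN) {
        logger.trace("syncing global checkpoint for [{}]", reason);
        globalCheckpointSyncer.run();
    }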
@@ -2137,8 +2137,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
                 StreamSupport
                     .stream(globalCheckpoints.values().spliterator(), false)
                     .anyMatch(v -> v.value < globalCheckpoint);
-            // only sync if there is a shard lagging the primary
-            if (syncNeeded) {
+            // only sync if index is not closed and there is a shard lagging the primary
+            if (syncNeeded && indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN) {
                 logger.trace("syncing global checkpoint for [{}]", reason);
                 globalCheckpointSyncer.run();
             }

@@ -1141,6 +1141,33 @@ public class IndexShardTests extends IndexShardTestCase {
         closeShards(replicaShard, primaryShard);
     }

+    public void testClosedIndicesSkipSyncGlobalCheckpoint() throws Exception {
+        ShardId shardId = new ShardId("index", "_na_", 0);
+        IndexMetaData.Builder indexMetadata = IndexMetaData.builder("index")
+            .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
+                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2))
+            .state(IndexMetaData.State.CLOSE).primaryTerm(0, 1);
+        ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, randomAlphaOfLength(8), true,
+            ShardRoutingState.INITIALIZING, RecoverySource.EmptyStoreRecoverySource.INSTANCE);
+        AtomicBoolean synced = new AtomicBoolean();
+        IndexShard primaryShard = newShard(shardRouting, indexMetadata.build(), null, new InternalEngineFactory(),
+            () -> synced.set(true), RetentionLeaseSyncer.EMPTY);
+        recoverShardFromStore(primaryShard);
+        IndexShard replicaShard = newShard(shardId, false);
+        recoverReplica(replicaShard, primaryShard, true);
+        int numDocs = between(1, 10);
+        for (int i = 0; i < numDocs; i++) {
+            indexDoc(primaryShard, "_doc", Integer.toString(i));
+        }
+        assertThat(primaryShard.getLocalCheckpoint(), equalTo(numDocs - 1L));
+        primaryShard.updateLocalCheckpointForShard(replicaShard.shardRouting.allocationId().getId(), primaryShard.getLocalCheckpoint());
+        long globalCheckpointOnReplica = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, primaryShard.getLocalCheckpoint());
+        primaryShard.updateGlobalCheckpointForShard(replicaShard.shardRouting.allocationId().getId(), globalCheckpointOnReplica);
+        primaryShard.maybeSyncGlobalCheckpoint("test");
+        assertFalse("closed indices should skip global checkpoint sync", synced.get());
+        closeShards(primaryShard, replicaShard);
+    }
+
     public void testRestoreLocalHistoryFromTranslogOnPromotion() throws IOException, InterruptedException {
         final IndexShard indexShard = newStartedShard(false);
         final int operations = 1024 - scaledRandomIntBetween(0, 1024);

@@ -36,7 +36,9 @@ import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.indices.IndexClosedException;
+import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.test.BackgroundIndexer;
 import org.elasticsearch.test.ESIntegTestCase;

@@ -424,6 +426,31 @@ public class CloseIndexIT extends ESIntegTestCase {
         }
     }

+    public void testResyncPropagatePrimaryTerm() throws Exception {
+        internalCluster().ensureAtLeastNumDataNodes(3);
+        final String indexName = "closed_indices_promotion";
+        createIndex(indexName, Settings.builder()
+            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
+            .build());
+        indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(0, 50))
+            .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList()));
+        ensureGreen(indexName);
+        assertAcked(client().admin().indices().prepareClose(indexName));
+        assertIndexIsClosed(indexName);
+        ensureGreen(indexName);
+        String nodeWithPrimary = clusterService().state().nodes().get(clusterService().state()
+            .routingTable().index(indexName).shard(0).primaryShard().currentNodeId()).getName();
+        internalCluster().restartNode(nodeWithPrimary, new InternalTestCluster.RestartCallback());
+        ensureGreen(indexName);
+        long primaryTerm = clusterService().state().metaData().index(indexName).primaryTerm(0);
+        for (String nodeName : internalCluster().nodesInclude(indexName)) {
+            IndexShard shard = internalCluster().getInstance(IndicesService.class, nodeName)
+                .indexService(resolveIndex(indexName)).getShard(0);
+            assertThat(shard.routingEntry().toString(), shard.getOperationPrimaryTerm(), equalTo(primaryTerm));
+        }
+    }
+
     static void assertIndexIsClosed(final String... indices) {
         final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
         for (String index : indices) {