diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index cfda4c8913f..f25a6f0716d 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2137,8 +2137,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl StreamSupport .stream(globalCheckpoints.values().spliterator(), false) .anyMatch(v -> v.value < globalCheckpoint); - // only sync if there is a shard lagging the primary - if (syncNeeded) { + // only sync if index is not closed and there is a shard lagging the primary + if (syncNeeded && indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN) { logger.trace("syncing global checkpoint for [{}]", reason); globalCheckpointSyncer.run(); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 70b726ac01a..1bb5c0f8165 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1141,6 +1141,33 @@ public class IndexShardTests extends IndexShardTestCase { closeShards(replicaShard, primaryShard); } + public void testClosedIndicesSkipSyncGlobalCheckpoint() throws Exception { + ShardId shardId = new ShardId("index", "_na_", 0); + IndexMetaData.Builder indexMetadata = IndexMetaData.builder("index") + .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)) + .state(IndexMetaData.State.CLOSE).primaryTerm(0, 1); + ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, randomAlphaOfLength(8), true, + ShardRoutingState.INITIALIZING, RecoverySource.EmptyStoreRecoverySource.INSTANCE); + AtomicBoolean synced = new AtomicBoolean(); + IndexShard primaryShard = newShard(shardRouting, indexMetadata.build(), null, new InternalEngineFactory(), + () -> synced.set(true), RetentionLeaseSyncer.EMPTY); + recoverShardFromStore(primaryShard); + IndexShard replicaShard = newShard(shardId, false); + recoverReplica(replicaShard, primaryShard, true); + int numDocs = between(1, 10); + for (int i = 0; i < numDocs; i++) { + indexDoc(primaryShard, "_doc", Integer.toString(i)); + } + assertThat(primaryShard.getLocalCheckpoint(), equalTo(numDocs - 1L)); + primaryShard.updateLocalCheckpointForShard(replicaShard.shardRouting.allocationId().getId(), primaryShard.getLocalCheckpoint()); + long globalCheckpointOnReplica = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, primaryShard.getLocalCheckpoint()); + primaryShard.updateGlobalCheckpointForShard(replicaShard.shardRouting.allocationId().getId(), globalCheckpointOnReplica); + primaryShard.maybeSyncGlobalCheckpoint("test"); + assertFalse("closed indices should skip global checkpoint sync", synced.get()); + closeShards(primaryShard, replicaShard); + } + public void testRestoreLocalHistoryFromTranslogOnPromotion() throws IOException, InterruptedException { final IndexShard indexShard = newStartedShard(false); final int operations = 1024 - scaledRandomIntBetween(0, 1024); diff --git a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java index e3808175f76..b735416cf23 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java @@ -36,7 +36,9 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndexClosedException; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.test.BackgroundIndexer; import org.elasticsearch.test.ESIntegTestCase; @@ -424,6 +426,31 @@ public class CloseIndexIT extends ESIntegTestCase { } } + public void testResyncPropagatePrimaryTerm() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(3); + final String indexName = "closed_indices_promotion"; + createIndex(indexName, Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2) + .build()); + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(0, 50)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList())); + ensureGreen(indexName); + assertAcked(client().admin().indices().prepareClose(indexName)); + assertIndexIsClosed(indexName); + ensureGreen(indexName); + String nodeWithPrimary = clusterService().state().nodes().get(clusterService().state() + .routingTable().index(indexName).shard(0).primaryShard().currentNodeId()).getName(); + internalCluster().restartNode(nodeWithPrimary, new InternalTestCluster.RestartCallback()); + ensureGreen(indexName); + long primaryTerm = clusterService().state().metaData().index(indexName).primaryTerm(0); + for (String nodeName : internalCluster().nodesInclude(indexName)) { + IndexShard shard = internalCluster().getInstance(IndicesService.class, nodeName) + .indexService(resolveIndex(indexName)).getShard(0); + assertThat(shard.routingEntry().toString(), shard.getOperationPrimaryTerm(), equalTo(primaryTerm)); + } + } + static void assertIndexIsClosed(final String... indices) { final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); for (String index : indices) {