diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index 73ac8a65d30..c693d6b9d80 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -1521,7 +1521,8 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref if (existingCommits.isEmpty()) { throw new IllegalArgumentException("No index found to trim"); } - final String translogUUID = existingCommits.get(existingCommits.size() - 1).getUserData().get(Translog.TRANSLOG_UUID_KEY); + final IndexCommit lastIndexCommitCommit = existingCommits.get(existingCommits.size() - 1); + final String translogUUID = lastIndexCommitCommit.getUserData().get(Translog.TRANSLOG_UUID_KEY); final IndexCommit startingIndexCommit; // We may not have a safe commit if an index was create before v6.2; and if there is a snapshotted commit whose translog // are not retained but max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit. @@ -1546,7 +1547,14 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref + startingIndexCommit.getUserData().get(Translog.TRANSLOG_UUID_KEY) + "] is not equal to last commit's translog uuid [" + translogUUID + "]"); } - if (startingIndexCommit.equals(existingCommits.get(existingCommits.size() - 1)) == false) { + if (startingIndexCommit.equals(lastIndexCommitCommit) == false) { + /* + * Unlike other commit tags, the retention-leases tag is not restored when an engine is + * recovered from translog. We need to manually copy it from the last commit to the safe commit; + * otherwise we might lose the latest committed retention leases when re-opening an engine. + */ + final Map userData = new HashMap<>(startingIndexCommit.getUserData()); + userData.put(Engine.RETENTION_LEASES, lastIndexCommitCommit.getUserData().getOrDefault(Engine.RETENTION_LEASES, "")); try (IndexWriter writer = newAppendingIndexWriter(directory, startingIndexCommit)) { // this achieves two things: // - by committing a new commit based on the starting commit, it make sure the starting commit will be opened @@ -1557,7 +1565,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref // The new commit will use segment files from the starting commit but userData from the last commit by default. // Thus, we need to manually set the userData from the starting commit to the new commit. - writer.setLiveCommitData(startingIndexCommit.getUserData().entrySet()); + writer.setLiveCommitData(userData.entrySet()); writer.commit(); } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java index a353845fe48..5f103d484f8 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java @@ -20,32 +20,42 @@ package org.elasticsearch.index.shard; import org.apache.lucene.index.SegmentInfos; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.ShardRoutingHelper; +import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.RetentionLeaseStats; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; @@ -294,6 +304,76 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase { } } + public void testRecoverFromStoreReserveRetentionLeases() throws Exception { + final AtomicBoolean throwDuringRecoverFromTranslog = new AtomicBoolean(); + final IndexShard shard = newStartedShard(false, Settings.builder().put("index.soft_deletes.enabled", true).build(), + config -> new InternalEngine(config) { + @Override + public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, + long recoverUpToSeqNo) throws IOException { + if (throwDuringRecoverFromTranslog.get()) { + throw new RuntimeException("crashed before recover from translog is completed"); + } + return super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo); + } + }); + final List leases = new ArrayList<>(); + long version = randomLongBetween(0, 100); + long primaryTerm = randomLongBetween(1, 100); + final int iterations = randomIntBetween(1, 10); + for (int i = 0; i < iterations; i++) { + if (randomBoolean()) { + indexDoc(shard, "_doc", Integer.toString(i)); + } else { + leases.add(new RetentionLease(Integer.toString(i), randomNonNegativeLong(), + randomLongBetween(Integer.MAX_VALUE, Long.MAX_VALUE), "test")); + } + if (randomBoolean()) { + if (randomBoolean()) { + version += randomLongBetween(1, 100); + primaryTerm += randomLongBetween(0, 100); + shard.updateRetentionLeasesOnReplica(new RetentionLeases(primaryTerm, version, leases)); + shard.flush(new FlushRequest().force(true).waitIfOngoing(true)); + } + } + if (randomBoolean()) { + shard.updateGlobalCheckpointOnReplica(randomLongBetween(shard.getGlobalCheckpoint(), shard.getLocalCheckpoint()), "test"); + flushShard(shard); + } + } + version += randomLongBetween(1, 100); + primaryTerm += randomLongBetween(0, 100); + shard.updateRetentionLeasesOnReplica(new RetentionLeases(primaryTerm, version, leases)); + shard.flush(new FlushRequest().force(true).waitIfOngoing(true)); + closeShard(shard, false); + + final IndexShard failedShard = reinitShard(shard, newShardRouting(shard.routingEntry().shardId(), + shard.routingEntry().currentNodeId(), true, ShardRoutingState.INITIALIZING, + RecoverySource.ExistingStoreRecoverySource.INSTANCE)); + final DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), + Collections.emptyMap(), Collections.emptySet(), Version.CURRENT); + failedShard.markAsRecovering("store", new RecoveryState(failedShard.routingEntry(), localNode, null)); + throwDuringRecoverFromTranslog.set(true); + expectThrows(IndexShardRecoveryException.class, failedShard::recoverFromStore); + closeShards(failedShard); + + final IndexShard newShard = reinitShard(shard, newShardRouting(shard.routingEntry().shardId(), + shard.routingEntry().currentNodeId(), true, ShardRoutingState.INITIALIZING, + RecoverySource.ExistingStoreRecoverySource.INSTANCE)); + newShard.markAsRecovering("store", new RecoveryState(failedShard.routingEntry(), localNode, null)); + throwDuringRecoverFromTranslog.set(false); + assertTrue(newShard.recoverFromStore()); + final RetentionLeases retentionLeases = newShard.getRetentionLeases(); + assertThat(retentionLeases.version(), equalTo(version)); + assertThat(retentionLeases.primaryTerm(), equalTo(primaryTerm)); + if (leases.isEmpty()) { + assertThat(retentionLeases.leases(), empty()); + } else { + assertThat(retentionLeases.leases(), containsInAnyOrder(leases.toArray(new RetentionLease[0]))); + } + closeShards(newShard); + } + private void assertRetentionLeases( final IndexShard indexShard, final int size,