Copy retention leases when trim unsafe commits (#37995)

When a primary shard is recovered from its store, we trim the last
commit (when it's unsafe). If that primary crashes before the recovery
completes, we will lose the committed retention leases because they are
baked in the last commit. With this change, we copy the retention leases
from the last commit to the safe commit when trimming unsafe commits.

Relates #37165
This commit is contained in:
Nhat Nguyen 2019-02-11 20:36:27 -05:00
parent ec08581319
commit 5d22e45990
2 changed files with 91 additions and 3 deletions

View File

@ -1521,7 +1521,8 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
if (existingCommits.isEmpty()) {
throw new IllegalArgumentException("No index found to trim");
}
final String translogUUID = existingCommits.get(existingCommits.size() - 1).getUserData().get(Translog.TRANSLOG_UUID_KEY);
final IndexCommit lastIndexCommitCommit = existingCommits.get(existingCommits.size() - 1);
final String translogUUID = lastIndexCommitCommit.getUserData().get(Translog.TRANSLOG_UUID_KEY);
final IndexCommit startingIndexCommit;
// We may not have a safe commit if an index was create before v6.2; and if there is a snapshotted commit whose translog
// are not retained but max_seqno is at most the global checkpoint, we may mistakenly select it as a starting commit.
@ -1546,7 +1547,14 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
+ startingIndexCommit.getUserData().get(Translog.TRANSLOG_UUID_KEY) + "] is not equal to last commit's translog uuid ["
+ translogUUID + "]");
}
if (startingIndexCommit.equals(existingCommits.get(existingCommits.size() - 1)) == false) {
if (startingIndexCommit.equals(lastIndexCommitCommit) == false) {
/*
* Unlike other commit tags, the retention-leases tag is not restored when an engine is
* recovered from translog. We need to manually copy it from the last commit to the safe commit;
* otherwise we might lose the latest committed retention leases when re-opening an engine.
*/
final Map<String, String> userData = new HashMap<>(startingIndexCommit.getUserData());
userData.put(Engine.RETENTION_LEASES, lastIndexCommitCommit.getUserData().getOrDefault(Engine.RETENTION_LEASES, ""));
try (IndexWriter writer = newAppendingIndexWriter(directory, startingIndexCommit)) {
// this achieves two things:
// - by committing a new commit based on the starting commit, it make sure the starting commit will be opened
@ -1557,7 +1565,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
// The new commit will use segment files from the starting commit but userData from the last commit by default.
// Thus, we need to manually set the userData from the starting commit to the new commit.
writer.setLiveCommitData(startingIndexCommit.getUserData().entrySet());
writer.setLiveCommitData(userData.entrySet());
writer.commit();
}
}

View File

@ -20,32 +20,42 @@
package org.elasticsearch.index.shard;
import org.apache.lucene.index.SegmentInfos;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.seqno.RetentionLeaseStats;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
@ -294,6 +304,76 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
}
}
public void testRecoverFromStoreReserveRetentionLeases() throws Exception {
final AtomicBoolean throwDuringRecoverFromTranslog = new AtomicBoolean();
final IndexShard shard = newStartedShard(false, Settings.builder().put("index.soft_deletes.enabled", true).build(),
config -> new InternalEngine(config) {
@Override
public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner,
long recoverUpToSeqNo) throws IOException {
if (throwDuringRecoverFromTranslog.get()) {
throw new RuntimeException("crashed before recover from translog is completed");
}
return super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo);
}
});
final List<RetentionLease> leases = new ArrayList<>();
long version = randomLongBetween(0, 100);
long primaryTerm = randomLongBetween(1, 100);
final int iterations = randomIntBetween(1, 10);
for (int i = 0; i < iterations; i++) {
if (randomBoolean()) {
indexDoc(shard, "_doc", Integer.toString(i));
} else {
leases.add(new RetentionLease(Integer.toString(i), randomNonNegativeLong(),
randomLongBetween(Integer.MAX_VALUE, Long.MAX_VALUE), "test"));
}
if (randomBoolean()) {
if (randomBoolean()) {
version += randomLongBetween(1, 100);
primaryTerm += randomLongBetween(0, 100);
shard.updateRetentionLeasesOnReplica(new RetentionLeases(primaryTerm, version, leases));
shard.flush(new FlushRequest().force(true).waitIfOngoing(true));
}
}
if (randomBoolean()) {
shard.updateGlobalCheckpointOnReplica(randomLongBetween(shard.getGlobalCheckpoint(), shard.getLocalCheckpoint()), "test");
flushShard(shard);
}
}
version += randomLongBetween(1, 100);
primaryTerm += randomLongBetween(0, 100);
shard.updateRetentionLeasesOnReplica(new RetentionLeases(primaryTerm, version, leases));
shard.flush(new FlushRequest().force(true).waitIfOngoing(true));
closeShard(shard, false);
final IndexShard failedShard = reinitShard(shard, newShardRouting(shard.routingEntry().shardId(),
shard.routingEntry().currentNodeId(), true, ShardRoutingState.INITIALIZING,
RecoverySource.ExistingStoreRecoverySource.INSTANCE));
final DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(),
Collections.emptyMap(), Collections.emptySet(), Version.CURRENT);
failedShard.markAsRecovering("store", new RecoveryState(failedShard.routingEntry(), localNode, null));
throwDuringRecoverFromTranslog.set(true);
expectThrows(IndexShardRecoveryException.class, failedShard::recoverFromStore);
closeShards(failedShard);
final IndexShard newShard = reinitShard(shard, newShardRouting(shard.routingEntry().shardId(),
shard.routingEntry().currentNodeId(), true, ShardRoutingState.INITIALIZING,
RecoverySource.ExistingStoreRecoverySource.INSTANCE));
newShard.markAsRecovering("store", new RecoveryState(failedShard.routingEntry(), localNode, null));
throwDuringRecoverFromTranslog.set(false);
assertTrue(newShard.recoverFromStore());
final RetentionLeases retentionLeases = newShard.getRetentionLeases();
assertThat(retentionLeases.version(), equalTo(version));
assertThat(retentionLeases.primaryTerm(), equalTo(primaryTerm));
if (leases.isEmpty()) {
assertThat(retentionLeases.leases(), empty());
} else {
assertThat(retentionLeases.leases(), containsInAnyOrder(leases.toArray(new RetentionLease[0])));
}
closeShards(newShard);
}
private void assertRetentionLeases(
final IndexShard indexShard,
final int size,