Ignore Lucene index in peer recovery if translog corrupted (#49114)

If the translog on a replica is corrupt, we should not perform an
operation-based recovery or use the sync_id, because we won't be able to
open an engine in the next step. This change adds an extra validation that
ensures the translog is intact when preparing a peer recovery request.
This commit is contained in:
Nhat Nguyen 2019-11-18 11:29:18 -05:00
parent fec22130c2
commit 37a9cd677b
2 changed files with 43 additions and 0 deletions

View File

@ -50,6 +50,8 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogCorruptedException;
import org.elasticsearch.indices.recovery.RecoveriesCollection.RecoveryRef; import org.elasticsearch.indices.recovery.RecoveriesCollection.RecoveryRef;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -327,6 +329,17 @@ public class PeerRecoveryTargetService implements IndexEventListener {
Store.MetadataSnapshot metadataSnapshot; Store.MetadataSnapshot metadataSnapshot;
try { try {
metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata(); metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata();
// Make sure that the current translog is consistent with the Lucene index; otherwise, we have to throw away the Lucene index.
try {
final String expectedTranslogUUID = metadataSnapshot.getCommitUserData().get(Translog.TRANSLOG_UUID_KEY);
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), expectedTranslogUUID);
assert globalCheckpoint + 1 >= startingSeqNo : "invalid startingSeqNo " + startingSeqNo + " >= " + globalCheckpoint;
} catch (IOException | TranslogCorruptedException e) {
logger.warn(new ParameterizedMessage("error while reading global checkpoint from translog, " +
"resetting the starting sequence number from {} to unassigned and recovering as if there are none", startingSeqNo), e);
metadataSnapshot = Store.MetadataSnapshot.EMPTY;
startingSeqNo = UNASSIGNED_SEQ_NO;
}
} catch (final org.apache.lucene.index.IndexNotFoundException e) { } catch (final org.apache.lucene.index.IndexNotFoundException e) {
// happens on an empty folder. no need to log // happens on an empty folder. no need to log
assert startingSeqNo == UNASSIGNED_SEQ_NO : startingSeqNo; assert startingSeqNo == UNASSIGNED_SEQ_NO : startingSeqNo;

View File

@ -35,6 +35,7 @@ import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.NoOpEngine; import org.elasticsearch.index.engine.NoOpEngine;
import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.mapper.SourceToParse;
@ -60,6 +61,7 @@ import java.util.stream.LongStream;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.sameInstance;
public class PeerRecoveryTargetServiceTests extends IndexShardTestCase { public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
@ -286,4 +288,32 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
recoveryTarget.decRef(); recoveryTarget.decRef();
closeShards(shard); closeShards(shard);
} }
/**
 * Checks that the start-recovery request is reset when the shard's translog has been tampered with
 * after the shard was closed: the request is expected to carry {@code UNASSIGNED_SEQ_NO} as its
 * starting sequence number and the {@code Store.MetadataSnapshot.EMPTY} instance as its snapshot.
 */
public void testResetStartRequestIfTranslogIsCorrupted() throws Exception {
    final DiscoveryNode sourceNode = new DiscoveryNode(
        "foo", buildNewFakeTransportAddress(), Collections.emptyMap(), Collections.emptySet(), Version.CURRENT);
    final DiscoveryNode targetNode = new DiscoveryNode(
        "foo", buildNewFakeTransportAddress(), Collections.emptyMap(), Collections.emptySet(), Version.CURRENT);

    // Build a started non-primary shard with some data, then close it so its files can be altered.
    final IndexShard originalShard = newStartedShard(false);
    final SeqNoStats stats = populateRandomData(originalShard);
    originalShard.close("test", false);

    // Randomly pick one of three ways of damaging the translog.
    if (randomBoolean()) {
        // Associate the index with a brand-new translog UUID
        // (presumably this no longer matches the translog on disk).
        originalShard.store().associateIndexWithNewTranslog(UUIDs.randomBase64UUID());
    } else if (randomBoolean()) {
        // Create an empty translog in the shard's translog directory.
        Translog.createEmptyTranslog(
            originalShard.shardPath().resolveTranslog(),
            stats.getGlobalCheckpoint(),
            originalShard.shardId(),
            originalShard.getOperationPrimaryTerm());
    } else {
        // Remove the translog directory altogether.
        IOUtils.rm(originalShard.shardPath().resolveTranslog());
    }

    // Re-create the shard with a peer recovery source and move it into the index-recovery stage.
    final IndexShard recoveringShard = reinitShard(
        originalShard,
        ShardRoutingHelper.initWithSameId(originalShard.routingEntry(), RecoverySource.PeerRecoverySource.INSTANCE));
    recoveringShard.markAsRecovering(
        "peer recovery", new RecoveryState(recoveringShard.routingEntry(), sourceNode, targetNode));
    recoveringShard.prepareForIndexRecovery();

    final RecoveryTarget target = new RecoveryTarget(recoveringShard, null, null);
    final StartRecoveryRequest startRequest =
        PeerRecoveryTargetService.getStartRecoveryRequest(logger, targetNode, target, randomNonNegativeLong());

    // Whatever starting sequence number was passed in, the request must be reset.
    assertThat(startRequest.startingSeqNo(), equalTo(UNASSIGNED_SEQ_NO));
    assertThat(startRequest.metadataSnapshot(), sameInstance(Store.MetadataSnapshot.EMPTY));

    target.decRef();
    closeShards(recoveringShard);
}
} }