Fix recovery stage transition with sync_id (#57754)

If the recovery source is on an old node (before 7.2), then the recovery
target won't have the safe commit after phase1, because the recovery
source does not send the global checkpoint in the clean_files step. If
the recovery then fails and retries, the recovery stage won't
transition properly. In addition, if a sync_id is used in peer recovery,
the clean_files step is not executed at all, so the stage is never
moved to TRANSLOG.

Relates #7187
Closes #57708
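
For context, here is a minimal, self-contained sketch of the stage checks this commit touches in RecoveryState. The method bodies and stage names mirror the diff further down; the standalone class, its name, and the main method are illustrative only and not part of the change.

// Illustrative sketch only: a trimmed-down stand-in for
// org.elasticsearch.indices.recovery.RecoveryState, showing the pre-existing
// validateAndSetStage check and the validateCurrentStage method added here.
public class RecoveryStageSketch {

    // Stage values as in RecoveryState.Stage; each transition must come from
    // the expected previous stage.
    public enum Stage { INIT, INDEX, VERIFY_INDEX, TRANSLOG, FINALIZE, DONE }

    private Stage stage = Stage.INIT;

    // Pre-existing behavior: advancing to the next stage requires the expected
    // current stage, otherwise recovery is in an illegal state.
    private synchronized void validateAndSetStage(Stage expected, Stage next) {
        if (stage != expected) {
            assert false : "can't move recovery to stage [" + next + "]. current stage: ["
                + stage + "] (expected [" + expected + "])";
            throw new IllegalStateException("can't move recovery to stage [" + next
                + "]. current stage: [" + stage + "] (expected [" + expected + "])");
        }
        stage = next;
    }

    // Added by this commit: callers such as IndexShard can verify the current
    // stage without advancing it, failing hard instead of relying on a bare assert.
    public synchronized void validateCurrentStage(Stage expected) {
        if (stage != expected) {
            assert false : "expected stage [" + expected + "]; but current stage is [" + stage + "]";
            throw new IllegalStateException("expected stage [" + expected
                + "] but current stage is [" + stage + "]");
        }
    }

    public static void main(String[] args) {
        RecoveryStageSketch state = new RecoveryStageSketch();
        state.validateCurrentStage(Stage.INIT);             // passes
        state.validateAndSetStage(Stage.INIT, Stage.INDEX); // INIT -> INDEX
        state.validateCurrentStage(Stage.INDEX);            // passes
        // state.validateCurrentStage(Stage.TRANSLOG);      // would throw (or trip the assert with -ea)
    }
}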
Nhat Nguyen 2020-06-15 13:06:31 -04:00
parent b99b2f1a08
commit de6ac6aea6
7 changed files with 84 additions and 24 deletions

@@ -304,6 +304,9 @@ public class RecoveryIT extends AbstractRollingTestCase {
default:
throw new IllegalStateException("unknown type " + CLUSTER_TYPE);
}
if (randomBoolean()) {
syncedFlush(index);
}
}
public void testRecovery() throws Exception {

@@ -34,9 +34,11 @@ import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoveryCleanFilesRequest;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
@@ -49,6 +51,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -357,6 +360,57 @@ public class ReplicaShardAllocatorIT extends ESIntegTestCase {
assertNoOpRecoveries(indexName);
}
/**
* If the recovery source is on an old node (before <pre>{@link org.elasticsearch.Version#V_7_2_0}</pre>) then the recovery target
* won't have the safe commit after phase1 because the recovery source does not send the global checkpoint in the clean_files
* step. And if the recovery fails and retries, then the recovery stage might not transition properly. This test simulates
* this behavior by changing the global checkpoint in phase1 to unassigned.
*/
public void testSimulateRecoverySourceOnOldNode() throws Exception {
internalCluster().startMasterOnlyNode();
String source = internalCluster().startDataOnlyNode();
String indexName = "test";
assertAcked(client().admin().indices().prepareCreate(indexName).setSettings(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)));
ensureGreen(indexName);
if (randomBoolean()) {
indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(200, 500))
.mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("f", "v")).collect(Collectors.toList()));
}
if (randomBoolean()) {
client().admin().indices().prepareFlush(indexName).get();
}
if (randomBoolean()) {
assertBusy(() -> {
SyncedFlushResponse syncedFlushResponse = client().admin().indices().prepareSyncedFlush(indexName).get();
assertThat(syncedFlushResponse.successfulShards(), equalTo(1));
});
}
internalCluster().startDataOnlyNode();
MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, source);
Semaphore failRecovery = new Semaphore(1);
transportService.addSendBehavior((connection, requestId, action, request, options) -> {
if (action.equals(PeerRecoveryTargetService.Actions.CLEAN_FILES)) {
RecoveryCleanFilesRequest cleanFilesRequest = (RecoveryCleanFilesRequest) request;
request = new RecoveryCleanFilesRequest(cleanFilesRequest.recoveryId(), cleanFilesRequest.requestSeqNo(),
cleanFilesRequest.shardId(), cleanFilesRequest.sourceMetaSnapshot(),
cleanFilesRequest.totalTranslogOps(), SequenceNumbers.UNASSIGNED_SEQ_NO);
}
if (action.equals(PeerRecoveryTargetService.Actions.FINALIZE)) {
if (failRecovery.tryAcquire()) {
throw new IllegalStateException("simulated");
}
}
connection.sendRequest(requestId, action, request, options);
});
assertAcked(client().admin().indices().prepareUpdateSettings()
.setIndices(indexName).setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()));
ensureGreen(indexName);
transportService.clearAllRules();
}
private void ensureActivePeerRecoveryRetentionLeasesAdvanced(String indexName) throws Exception {
assertBusy(() -> {
Index index = resolveIndex(indexName);

@@ -1414,7 +1414,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
assert recoveryState.getStage() == RecoveryState.Stage.INDEX : "unexpected recovery stage [" + recoveryState.getStage() + "]";
recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX);
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
final Optional<SequenceNumbers.CommitInfo> safeCommit;
final long globalCheckpoint;
@@ -1595,7 +1595,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* Operations from the translog will be replayed to bring lucene up to date.
**/
public void openEngineAndRecoverFromTranslog() throws IOException {
assert recoveryState.getStage() == RecoveryState.Stage.INDEX : "unexpected recovery stage [" + recoveryState.getStage() + "]";
recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX);
maybeCheckIndex();
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
final RecoveryState.Translog translogRecoveryStats = recoveryState.getTranslog();
@@ -1616,7 +1616,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
*/
public void openEngineAndSkipTranslogRecovery() throws IOException {
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "unexpected recovery stage [" + recoveryState.getStage() + "]";
recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
loadGlobalCheckpointToReplicationTracker();
innerOpenEngineAndTranslog(replicationTracker);
getEngine().skipTranslogRecovery();
@@ -1651,7 +1651,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
// which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
onSettingsChanged();
assert assertSequenceNumbersInCommit();
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
}
private boolean assertSequenceNumbersInCommit() throws IOException {

@@ -36,7 +36,7 @@ public class RecoveryCleanFilesRequest extends RecoveryTransportRequest {
private final int totalTranslogOps;
private final long globalCheckpoint;
RecoveryCleanFilesRequest(long recoveryId, long requestSeqNo, ShardId shardId, Store.MetadataSnapshot snapshotFiles,
public RecoveryCleanFilesRequest(long recoveryId, long requestSeqNo, ShardId shardId, Store.MetadataSnapshot snapshotFiles,
int totalTranslogOps, long globalCheckpoint) {
super(requestSeqNo);
this.recoveryId = recoveryId;

@@ -174,12 +174,20 @@ public class RecoveryState implements ToXContentFragment, Writeable {
private void validateAndSetStage(Stage expected, Stage next) {
if (stage != expected) {
assert false : "can't move recovery to stage [" + next + "]. current stage: [" + stage + "] (expected [" + expected + "])";
throw new IllegalStateException("can't move recovery to stage [" + next + "]. current stage: ["
+ stage + "] (expected [" + expected + "])");
}
stage = next;
}
public synchronized void validateCurrentStage(Stage expected) {
if (stage != expected) {
assert false : "expected stage [" + expected + "]; but current stage is [" + stage + "]";
throw new IllegalStateException("expected stage [" + expected + "] but current stage is [" + stage + "]");
}
}
// synchronized is strictly speaking not needed (this is called by a single thread), but just to be safe
public synchronized RecoveryState setStage(Stage stage) {
switch (stage) {

@@ -217,9 +217,9 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
// copy with truncated translog
shard = newStartedShard(false);
SeqNoStats seqNoStats = populateRandomData(shard);
globalCheckpoint = randomFrom(UNASSIGNED_SEQ_NO, seqNoStats.getMaxSeqNo());
replica = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(),
RecoverySource.PeerRecoverySource.INSTANCE));
globalCheckpoint = randomFrom(UNASSIGNED_SEQ_NO, seqNoStats.getMaxSeqNo());
String translogUUID = Translog.createEmptyTranslog(replica.shardPath().resolveTranslog(), globalCheckpoint,
replica.shardId(), replica.getPendingPrimaryTerm());
replica.store().associateIndexWithNewTranslog(translogUUID);
@@ -233,6 +233,7 @@ public class PeerRecoveryTargetServiceTests extends IndexShardTestCase {
assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(RecoveryState.Translog.UNKNOWN));
}
assertThat(replica.recoveryState().getStage(), equalTo(RecoveryState.Stage.TRANSLOG));
assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(0));
assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
assertThat(replica.recoveryState().getStage(), equalTo(RecoveryState.Stage.TRANSLOG));

@@ -24,7 +24,6 @@ import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
@@ -55,6 +54,7 @@ import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.startsWith;
public class RecoveryTargetTests extends ESTestCase {
abstract class Streamer<T extends Writeable> extends Thread {
@@ -343,16 +343,13 @@
public void testStageSequenceEnforcement() {
final DiscoveryNode discoveryNode = new DiscoveryNode("1", buildNewFakeTransportAddress(), emptyMap(), emptySet(),
Version.CURRENT);
Stage[] stages = Stage.values();
int i = randomIntBetween(0, stages.length - 1);
int j;
do {
j = randomIntBetween(0, stages.length - 1);
} while (j == i);
Stage t = stages[i];
stages[i] = stages[j];
stages[j] = t;
try {
final AssertionError error = expectThrows(AssertionError.class, () -> {
Stage[] stages = Stage.values();
int i = randomIntBetween(0, stages.length - 1);
int j = randomValueOtherThan(i, () -> randomIntBetween(0, stages.length - 1));
Stage t = stages[i];
stages[i] = stages[j];
stages[j] = t;
ShardRouting shardRouting = TestShardRouting.newShardRouting(new ShardId("bla", "_na_", 0), discoveryNode.getId(),
randomBoolean(), ShardRoutingState.INITIALIZING);
RecoveryState state = new RecoveryState(shardRouting, discoveryNode,
@@ -363,14 +360,11 @@
}
state.setStage(stage);
}
fail("succeeded in performing the illegal sequence [" + Strings.arrayToCommaDelimitedString(stages) + "]");
} catch (IllegalStateException e) {
// cool
}
});
assertThat(error.getMessage(), startsWith("can't move recovery to stage"));
// but reset should be always possible.
stages = Stage.values();
i = randomIntBetween(1, stages.length - 1);
Stage[] stages = Stage.values();
int i = randomIntBetween(1, stages.length - 1);
ArrayList<Stage> list = new ArrayList<>(Arrays.asList(Arrays.copyOfRange(stages, 0, i)));
list.addAll(Arrays.asList(stages));
ShardRouting shardRouting = TestShardRouting.newShardRouting(new ShardId("bla", "_na_", 0), discoveryNode.getId(),