Fix testRecoverLocallyUpToGlobalCheckpoint (#55189)

Peer recovery fails if the primary does not see the recovering replica 
in the replication group (when the cluster state update on the primary
is delayed). To verify the local recovery stats, we have to remember 
this value in the first try because the local recovery happens once, and
its stats is reset when the recovery fails.

Closes #54829
This commit is contained in:
Nhat Nguyen 2020-04-15 08:28:04 -04:00
parent 3cc4e0dd09
commit 49f6dbf8e8
1 changed files with 17 additions and 2 deletions

View File

@ -20,6 +20,7 @@
package org.elasticsearch.indices.recovery; package org.elasticsearch.indices.recovery;
import org.apache.lucene.analysis.TokenStream; import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.ActionRunnable;
@ -1174,12 +1175,25 @@ public class IndexRecoveryIT extends ESIntegTestCase {
client().admin().indices().prepareRefresh(indexName).get(); // avoid refresh when we are failing a shard client().admin().indices().prepareRefresh(indexName).get(); // avoid refresh when we are failing a shard
String failingNode = randomFrom(nodes); String failingNode = randomFrom(nodes);
PlainActionFuture<StartRecoveryRequest> startRecoveryRequestFuture = new PlainActionFuture<>(); PlainActionFuture<StartRecoveryRequest> startRecoveryRequestFuture = new PlainActionFuture<>();
// Peer recovery fails if the primary does not see the recovering replica in the replication group (when the cluster state
// update on the primary is delayed). To verify the local recovery stats, we have to manually remember this value in the
// first try because the local recovery happens once and its stats is reset when the recovery fails.
SetOnce<Integer> localRecoveredOps = new SetOnce<>();
for (String node : nodes) { for (String node : nodes) {
MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, node); MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, node);
transportService.addSendBehavior((connection, requestId, action, request, options) -> { transportService.addSendBehavior((connection, requestId, action, request, options) -> {
if (action.equals(PeerRecoverySourceService.Actions.START_RECOVERY)) { if (action.equals(PeerRecoverySourceService.Actions.START_RECOVERY)) {
assertFalse("recovery request was set already", startRecoveryRequestFuture.isDone()); final RecoveryState recoveryState = internalCluster().getInstance(IndicesService.class, failingNode)
startRecoveryRequestFuture.onResponse((StartRecoveryRequest) request); .getShardOrNull(new ShardId(resolveIndex(indexName), 0)).recoveryState();
assertThat(recoveryState.getTranslog().recoveredOperations(), equalTo(recoveryState.getTranslog().totalLocal()));
if (startRecoveryRequestFuture.isDone()) {
assertThat(recoveryState.getTranslog().totalLocal(), equalTo(0));
recoveryState.getTranslog().totalLocal(localRecoveredOps.get());
recoveryState.getTranslog().incrementRecoveredOperations(localRecoveredOps.get());
} else {
localRecoveredOps.set(recoveryState.getTranslog().totalLocal());
startRecoveryRequestFuture.onResponse((StartRecoveryRequest) request);
}
} }
if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) { if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
RetentionLeases retentionLeases = internalCluster().getInstance(IndicesService.class, node) RetentionLeases retentionLeases = internalCluster().getInstance(IndicesService.class, node)
@ -1210,6 +1224,7 @@ public class IndexRecoveryIT extends ESIntegTestCase {
assertThat(commitInfoAfterLocalRecovery.maxSeqNo, equalTo(lastSyncedGlobalCheckpoint)); assertThat(commitInfoAfterLocalRecovery.maxSeqNo, equalTo(lastSyncedGlobalCheckpoint));
assertThat(startRecoveryRequest.startingSeqNo(), equalTo(lastSyncedGlobalCheckpoint + 1)); assertThat(startRecoveryRequest.startingSeqNo(), equalTo(lastSyncedGlobalCheckpoint + 1));
ensureGreen(indexName); ensureGreen(indexName);
assertThat((long) localRecoveredOps.get(), equalTo(lastSyncedGlobalCheckpoint - localCheckpointOfSafeCommit));
for (RecoveryState recoveryState : client().admin().indices().prepareRecoveries().get().shardRecoveryStates().get(indexName)) { for (RecoveryState recoveryState : client().admin().indices().prepareRecoveries().get().shardRecoveryStates().get(indexName)) {
if (startRecoveryRequest.targetNode().equals(recoveryState.getTargetNode())) { if (startRecoveryRequest.targetNode().equals(recoveryState.getTargetNode())) {
assertThat("expect an operation-based recovery", recoveryState.getIndex().fileDetails(), empty()); assertThat("expect an operation-based recovery", recoveryState.getIndex().fileDetails(), empty());