From 49f6dbf8e8139d06d288b3d5b0069cf1048ad908 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 15 Apr 2020 08:28:04 -0400 Subject: [PATCH] Fix testRecoverLocallyUpToGlobalCheckpoint (#55189) Peer recovery fails if the primary does not see the recovering replica in the replication group (when the cluster state update on the primary is delayed). To verify the local recovery stats, we have to remember this value in the first try because the local recovery happens once, and its stats is reset when the recovery fails. Closes #54829 --- .../indices/recovery/IndexRecoveryIT.java | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index df6dbce5852..049e5c6fd03 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -20,6 +20,7 @@ package org.elasticsearch.indices.recovery; import org.apache.lucene.analysis.TokenStream; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionRunnable; @@ -1174,12 +1175,25 @@ public class IndexRecoveryIT extends ESIntegTestCase { client().admin().indices().prepareRefresh(indexName).get(); // avoid refresh when we are failing a shard String failingNode = randomFrom(nodes); PlainActionFuture startRecoveryRequestFuture = new PlainActionFuture<>(); + // Peer recovery fails if the primary does not see the recovering replica in the replication group (when the cluster state + // update on the primary is delayed). To verify the local recovery stats, we have to manually remember this value in the + // first try because the local recovery happens once and its stats is reset when the recovery fails. + SetOnce localRecoveredOps = new SetOnce<>(); for (String node : nodes) { MockTransportService transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, node); transportService.addSendBehavior((connection, requestId, action, request, options) -> { if (action.equals(PeerRecoverySourceService.Actions.START_RECOVERY)) { - assertFalse("recovery request was set already", startRecoveryRequestFuture.isDone()); - startRecoveryRequestFuture.onResponse((StartRecoveryRequest) request); + final RecoveryState recoveryState = internalCluster().getInstance(IndicesService.class, failingNode) + .getShardOrNull(new ShardId(resolveIndex(indexName), 0)).recoveryState(); + assertThat(recoveryState.getTranslog().recoveredOperations(), equalTo(recoveryState.getTranslog().totalLocal())); + if (startRecoveryRequestFuture.isDone()) { + assertThat(recoveryState.getTranslog().totalLocal(), equalTo(0)); + recoveryState.getTranslog().totalLocal(localRecoveredOps.get()); + recoveryState.getTranslog().incrementRecoveredOperations(localRecoveredOps.get()); + } else { + localRecoveredOps.set(recoveryState.getTranslog().totalLocal()); + startRecoveryRequestFuture.onResponse((StartRecoveryRequest) request); + } } if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) { RetentionLeases retentionLeases = internalCluster().getInstance(IndicesService.class, node) @@ -1210,6 +1224,7 @@ public class IndexRecoveryIT extends ESIntegTestCase { assertThat(commitInfoAfterLocalRecovery.maxSeqNo, equalTo(lastSyncedGlobalCheckpoint)); assertThat(startRecoveryRequest.startingSeqNo(), equalTo(lastSyncedGlobalCheckpoint + 1)); ensureGreen(indexName); + assertThat((long) localRecoveredOps.get(), equalTo(lastSyncedGlobalCheckpoint - localCheckpointOfSafeCommit)); for (RecoveryState recoveryState : client().admin().indices().prepareRecoveries().get().shardRecoveryStates().get(indexName)) { if (startRecoveryRequest.targetNode().equals(recoveryState.getTargetNode())) { assertThat("expect an operation-based recovery", recoveryState.getIndex().fileDetails(), empty());