From 2630c80b5d092292a844c56b7e3ed218ae3b06b7 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 9 Jun 2020 10:49:15 -0600 Subject: [PATCH] Fix IndexRecoveryIT transient error test (#57826) Currently it is possible for a transient network error to disrupt the start recovery request from the remote to source node. This disruption is racy with the recovery occurring on the source node. It is possible for the source node to finish and clear its recovery. When this occurs, the recovery cannot be reestablished and the "no two start" assertion is tripped. This commit fixes this issue by allowing two starts if the finalize request has been received. Fixes #57416. --- .../indices/recovery/IndexRecoveryIT.java | 24 ++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index 3cd975c0598..34551bd8955 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -792,22 +792,26 @@ public class IndexRecoveryIT extends ESIntegTestCase { MockTransportService redTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, redNodeName); - AtomicBoolean recoveryStarted = new AtomicBoolean(false); + final AtomicBoolean recoveryStarted = new AtomicBoolean(false); + final AtomicBoolean finalizeReceived = new AtomicBoolean(false); - final SingleStartEnforcer validator = new SingleStartEnforcer(indexName, recoveryStarted); + final SingleStartEnforcer validator = new SingleStartEnforcer(indexName, recoveryStarted, finalizeReceived); redTransportService.addSendBehavior(blueTransportService, (connection, requestId, action, request, options) -> { validator.accept(action, request); connection.sendRequest(requestId, action, request, options); }); Runnable connectionBreaker = () -> { // Always break connection from source to remote to ensure that actions are retried + logger.info("--> closing connections from source node to target node"); blueTransportService.disconnectFromNode(redTransportService.getLocalDiscoNode()); if (randomBoolean()) { // Sometimes break connection from remote to source to ensure that recovery is re-established + logger.info("--> closing connections from target node to source node"); redTransportService.disconnectFromNode(blueTransportService.getLocalDiscoNode()); } }; - TransientReceiveRejected handlingBehavior = new TransientReceiveRejected(recoveryActionToBlock, recoveryStarted, connectionBreaker); + TransientReceiveRejected handlingBehavior = + new TransientReceiveRejected(recoveryActionToBlock, finalizeReceived, recoveryStarted, connectionBreaker); redTransportService.addRequestHandlingBehavior(recoveryActionToBlock, handlingBehavior); try { @@ -831,12 +835,15 @@ public class IndexRecoveryIT extends ESIntegTestCase { private final String actionName; private final AtomicBoolean recoveryStarted; + private final AtomicBoolean finalizeReceived; private final Runnable connectionBreaker; private final AtomicInteger blocksRemaining; - private TransientReceiveRejected(String actionName, AtomicBoolean recoveryStarted, Runnable connectionBreaker) { + private TransientReceiveRejected(String actionName, AtomicBoolean recoveryStarted, AtomicBoolean finalizeReceived, + Runnable connectionBreaker) { this.actionName = actionName; this.recoveryStarted = recoveryStarted; + this.finalizeReceived = finalizeReceived; this.connectionBreaker = connectionBreaker; this.blocksRemaining = new AtomicInteger(randomIntBetween(1, 3)); } @@ -845,6 +852,9 @@ public class IndexRecoveryIT extends ESIntegTestCase { public void messageReceived(TransportRequestHandler handler, TransportRequest request, TransportChannel channel, Task task) throws Exception { recoveryStarted.set(true); + if (actionName.equals(PeerRecoveryTargetService.Actions.FINALIZE)) { + finalizeReceived.set(true); + } if (blocksRemaining.getAndUpdate(i -> i == 0 ? 0 : i - 1) != 0) { String rejected = "rejected"; String circuit = "circuit"; @@ -870,11 +880,13 @@ public class IndexRecoveryIT extends ESIntegTestCase { private class SingleStartEnforcer implements BiConsumer { private final AtomicBoolean recoveryStarted; + private final AtomicBoolean finalizeReceived; private final String indexName; - private SingleStartEnforcer(String indexName, AtomicBoolean recoveryStarted) { + private SingleStartEnforcer(String indexName, AtomicBoolean recoveryStarted, AtomicBoolean finalizeReceived) { this.indexName = indexName; this.recoveryStarted = recoveryStarted; + this.finalizeReceived = finalizeReceived; } @Override @@ -885,7 +897,7 @@ public class IndexRecoveryIT extends ESIntegTestCase { StartRecoveryRequest startRecoveryRequest = (StartRecoveryRequest) request; ShardId shardId = startRecoveryRequest.shardId(); logger.info("--> attempting to send start_recovery request for shard: " + shardId); - if (indexName.equals(shardId.getIndexName()) && recoveryStarted.get()) { + if (indexName.equals(shardId.getIndexName()) && recoveryStarted.get() && finalizeReceived.get() == false) { throw new IllegalStateException("Recovery cannot be started twice"); } }