Fix IndexRecoveryIT transient error test (#57826)
Currently it is possible for a transient network error to disrupt the start recovery request sent from the remote node to the source node. This disruption races with the recovery occurring on the source node. It is possible for the source node to finish and clear its recovery. When this occurs, the recovery cannot be reestablished and the "no two start" assertion is tripped. This commit fixes this issue by allowing two starts if the finalize request has been received. Fixes #57416.
This commit is contained in:
parent
8119b96517
commit
2630c80b5d
|
@ -792,22 +792,26 @@ public class IndexRecoveryIT extends ESIntegTestCase {
|
|||
MockTransportService redTransportService =
|
||||
(MockTransportService) internalCluster().getInstance(TransportService.class, redNodeName);
|
||||
|
||||
AtomicBoolean recoveryStarted = new AtomicBoolean(false);
|
||||
final AtomicBoolean recoveryStarted = new AtomicBoolean(false);
|
||||
final AtomicBoolean finalizeReceived = new AtomicBoolean(false);
|
||||
|
||||
final SingleStartEnforcer validator = new SingleStartEnforcer(indexName, recoveryStarted);
|
||||
final SingleStartEnforcer validator = new SingleStartEnforcer(indexName, recoveryStarted, finalizeReceived);
|
||||
redTransportService.addSendBehavior(blueTransportService, (connection, requestId, action, request, options) -> {
|
||||
validator.accept(action, request);
|
||||
connection.sendRequest(requestId, action, request, options);
|
||||
});
|
||||
Runnable connectionBreaker = () -> {
|
||||
// Always break connection from source to remote to ensure that actions are retried
|
||||
logger.info("--> closing connections from source node to target node");
|
||||
blueTransportService.disconnectFromNode(redTransportService.getLocalDiscoNode());
|
||||
if (randomBoolean()) {
|
||||
// Sometimes break connection from remote to source to ensure that recovery is re-established
|
||||
logger.info("--> closing connections from target node to source node");
|
||||
redTransportService.disconnectFromNode(blueTransportService.getLocalDiscoNode());
|
||||
}
|
||||
};
|
||||
TransientReceiveRejected handlingBehavior = new TransientReceiveRejected(recoveryActionToBlock, recoveryStarted, connectionBreaker);
|
||||
TransientReceiveRejected handlingBehavior =
|
||||
// Pass the flags in the constructor's declared order: recoveryStarted first, then finalizeReceived.
new TransientReceiveRejected(recoveryActionToBlock, recoveryStarted, finalizeReceived, connectionBreaker);
|
||||
redTransportService.addRequestHandlingBehavior(recoveryActionToBlock, handlingBehavior);
|
||||
|
||||
try {
|
||||
|
@ -831,12 +835,15 @@ public class IndexRecoveryIT extends ESIntegTestCase {
|
|||
|
||||
private final String actionName;
|
||||
private final AtomicBoolean recoveryStarted;
|
||||
private final AtomicBoolean finalizeReceived;
|
||||
private final Runnable connectionBreaker;
|
||||
private final AtomicInteger blocksRemaining;
|
||||
|
||||
private TransientReceiveRejected(String actionName, AtomicBoolean recoveryStarted, Runnable connectionBreaker) {
|
||||
private TransientReceiveRejected(String actionName, AtomicBoolean recoveryStarted, AtomicBoolean finalizeReceived,
|
||||
Runnable connectionBreaker) {
|
||||
this.actionName = actionName;
|
||||
this.recoveryStarted = recoveryStarted;
|
||||
this.finalizeReceived = finalizeReceived;
|
||||
this.connectionBreaker = connectionBreaker;
|
||||
this.blocksRemaining = new AtomicInteger(randomIntBetween(1, 3));
|
||||
}
|
||||
|
@ -845,6 +852,9 @@ public class IndexRecoveryIT extends ESIntegTestCase {
|
|||
public void messageReceived(TransportRequestHandler<TransportRequest> handler, TransportRequest request, TransportChannel channel,
|
||||
Task task) throws Exception {
|
||||
recoveryStarted.set(true);
|
||||
if (actionName.equals(PeerRecoveryTargetService.Actions.FINALIZE)) {
|
||||
finalizeReceived.set(true);
|
||||
}
|
||||
if (blocksRemaining.getAndUpdate(i -> i == 0 ? 0 : i - 1) != 0) {
|
||||
String rejected = "rejected";
|
||||
String circuit = "circuit";
|
||||
|
@ -870,11 +880,13 @@ public class IndexRecoveryIT extends ESIntegTestCase {
|
|||
private class SingleStartEnforcer implements BiConsumer<String, TransportRequest> {
|
||||
|
||||
private final AtomicBoolean recoveryStarted;
|
||||
private final AtomicBoolean finalizeReceived;
|
||||
private final String indexName;
|
||||
|
||||
private SingleStartEnforcer(String indexName, AtomicBoolean recoveryStarted) {
|
||||
private SingleStartEnforcer(String indexName, AtomicBoolean recoveryStarted, AtomicBoolean finalizeReceived) {
|
||||
this.indexName = indexName;
|
||||
this.recoveryStarted = recoveryStarted;
|
||||
this.finalizeReceived = finalizeReceived;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -885,7 +897,7 @@ public class IndexRecoveryIT extends ESIntegTestCase {
|
|||
StartRecoveryRequest startRecoveryRequest = (StartRecoveryRequest) request;
|
||||
ShardId shardId = startRecoveryRequest.shardId();
|
||||
logger.info("--> attempting to send start_recovery request for shard: " + shardId);
|
||||
if (indexName.equals(shardId.getIndexName()) && recoveryStarted.get()) {
|
||||
if (indexName.equals(shardId.getIndexName()) && recoveryStarted.get() && finalizeReceived.get() == false) {
|
||||
throw new IllegalStateException("Recovery cannot be started twice");
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue