Improve start_recovery check in IndexRecoveryIT (#55867)

Currently, the testTransientErrorsDuringRecoveryAreRetried test validates
that the expected peer recovery starts only once. This check is coarse and
is executed on all nodes and indexes. This commit modifies the check so
that it is performed only on the expected index. Additionally, this commit
removes the disruption behavior from the "blue" node, where it is not
relevant. Finally, this commit improves the logging for this test.
This commit is contained in:
Tim Brooks 2020-04-28 12:06:15 -06:00
parent 4315a55a1c
commit 8d1595698b
No known key found for this signature in database
GPG Key ID: C2AA3BB91A889E77

View File

@@ -130,6 +130,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -783,24 +784,22 @@ public class IndexRecoveryIT extends ESIntegTestCase {
PeerRecoveryTargetService.Actions.FINALIZE
};
final String recoveryActionToBlock = randomFrom(recoveryActions);
logger.info("--> will break connection between blue & red on [{}]", recoveryActionToBlock);
logger.info("--> will temporarily interrupt recovery action between blue & red on [{}]", recoveryActionToBlock);
MockTransportService blueTransportService =
(MockTransportService) internalCluster().getInstance(TransportService.class, blueNodeName);
MockTransportService redTransportService =
(MockTransportService) internalCluster().getInstance(TransportService.class, redNodeName);
final SingleStartEnforcer validator = new SingleStartEnforcer();
blueTransportService.addSendBehavior(redTransportService, (connection, requestId, action, request, options) -> {
validator.accept(action);
connection.sendRequest(requestId, action, request, options);
});
AtomicBoolean recoveryStarted = new AtomicBoolean(false);
final SingleStartEnforcer validator = new SingleStartEnforcer(indexName, recoveryStarted);
redTransportService.addSendBehavior(blueTransportService, (connection, requestId, action, request, options) -> {
validator.accept(action);
validator.accept(action, request);
connection.sendRequest(requestId, action, request, options);
});
blueTransportService.addRequestHandlingBehavior(recoveryActionToBlock, new TransientReceiveRejected(recoveryActionToBlock));
redTransportService.addRequestHandlingBehavior(recoveryActionToBlock, new TransientReceiveRejected(recoveryActionToBlock));
TransientReceiveRejected handlingBehavior = new TransientReceiveRejected(recoveryActionToBlock, recoveryStarted);
redTransportService.addRequestHandlingBehavior(recoveryActionToBlock, handlingBehavior);
try {
logger.info("--> starting recovery from blue to red");
@@ -822,17 +821,20 @@ public class IndexRecoveryIT extends ESIntegTestCase {
private class TransientReceiveRejected implements StubbableTransport.RequestHandlingBehavior<TransportRequest> {
private final String actionName;
private final AtomicBoolean recoveryStarted;
private final AtomicInteger blocksRemaining;
private TransientReceiveRejected(String actionName) {
private TransientReceiveRejected(String actionName, AtomicBoolean recoveryStarted) {
this.actionName = actionName;
this.recoveryStarted = recoveryStarted;
this.blocksRemaining = new AtomicInteger(randomIntBetween(1, 3));
}
@Override
public void messageReceived(TransportRequestHandler<TransportRequest> handler, TransportRequest request, TransportChannel channel,
Task task) throws Exception {
if (blocksRemaining.updateAndGet(i -> i == 0 ? 0 : i - 1) != 0) {
recoveryStarted.set(true);
if (blocksRemaining.getAndUpdate(i -> i == 0 ? 0 : i - 1) != 0) {
logger.info("--> preventing {} response by throwing exception", actionName);
if (randomBoolean()) {
throw new EsRejectedExecutionException();
@@ -844,16 +846,25 @@ public class IndexRecoveryIT extends ESIntegTestCase {
}
}
private static class SingleStartEnforcer implements Consumer<String> {
private class SingleStartEnforcer implements BiConsumer<String, TransportRequest> {
private final AtomicBoolean recoveryStarted = new AtomicBoolean(false);
private final AtomicBoolean recoveryStarted;
private final String indexName;
private SingleStartEnforcer(String indexName, AtomicBoolean recoveryStarted) {
this.indexName = indexName;
this.recoveryStarted = recoveryStarted;
}
@Override
public void accept(String action) {
public void accept(String action, TransportRequest request) {
// The cluster state applier will immediately attempt to retry the recovery on a cluster state
// update. We want to assert that the first and only recovery attempt succeeds
if (PeerRecoverySourceService.Actions.START_RECOVERY.equals(action)) {
if (recoveryStarted.compareAndSet(false, true) == false) {
StartRecoveryRequest startRecoveryRequest = (StartRecoveryRequest) request;
ShardId shardId = startRecoveryRequest.shardId();
logger.info("--> attempting to send start_recovery request for shard: " + shardId);
if (indexName.equals(shardId.getIndexName()) && recoveryStarted.get()) {
throw new IllegalStateException("Recovery cannot be started twice");
}
}