Add retry loop in shard state action tests

This commit enhances the master channel exception test in
o.e.c.a.s.ShardStateActionTests to test that a retries loop as expected
when requests to the master repeatedly fail.
This commit is contained in:
Jason Tedor 2016-01-15 11:26:32 -05:00
parent fe39d11c55
commit 7eefcbbeed
1 changed files with 30 additions and 18 deletions

View File

@ -50,7 +50,9 @@ import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongConsumer;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithStartedPrimary; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithStartedPrimary;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
@ -168,10 +170,10 @@ public class ShardStateActionTests extends ESTestCase {
String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); String indexUUID = clusterService.state().metaData().index(index).getIndexUUID();
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean retried = new AtomicBoolean(); AtomicInteger retries = new AtomicInteger();
AtomicBoolean success = new AtomicBoolean(); AtomicBoolean success = new AtomicBoolean();
setUpMasterRetryVerification(retried, latch); setUpMasterRetryVerification(1, retries, latch, requestId -> {});
shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
@Override @Override
@ -183,7 +185,7 @@ public class ShardStateActionTests extends ESTestCase {
latch.await(); latch.await();
assertTrue(retried.get()); assertThat(retries.get(), equalTo(1));
assertTrue(success.get()); assertTrue(success.get());
} }
@ -195,11 +197,20 @@ public class ShardStateActionTests extends ESTestCase {
String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); String indexUUID = clusterService.state().metaData().index(index).getIndexUUID();
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean retried = new AtomicBoolean(); AtomicInteger retries = new AtomicInteger();
AtomicBoolean success = new AtomicBoolean(); AtomicBoolean success = new AtomicBoolean();
AtomicReference<Exception> exception = new AtomicReference<>(); AtomicReference<Exception> exception = new AtomicReference<>();
setUpMasterRetryVerification(retried, latch); LongConsumer retryLoop = requestId -> {
List<Exception> possibleExceptions = new ArrayList<>();
possibleExceptions.add(new NotMasterException("simulated"));
possibleExceptions.add(new NodeDisconnectedException(clusterService.state().nodes().masterNode(), ShardStateAction.SHARD_FAILED_ACTION_NAME));
possibleExceptions.add(new Discovery.FailedToCommitClusterStateException("simulated"));
transport.handleResponse(requestId, randomFrom(possibleExceptions));
};
final int numberOfRetries = randomIntBetween(1, 256);
setUpMasterRetryVerification(numberOfRetries, retries, latch, retryLoop);
shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() {
@Override @Override
@ -219,15 +230,12 @@ public class ShardStateActionTests extends ESTestCase {
final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
assertThat(capturedRequests.length, equalTo(1)); assertThat(capturedRequests.length, equalTo(1));
assertFalse(success.get()); assertFalse(success.get());
assertFalse(retried.get()); assertThat(retries.get(), equalTo(0));
List<Exception> possibleExceptions = new ArrayList<>(); retryLoop.accept(capturedRequests[0].requestId);
possibleExceptions.add(new NotMasterException("simulated"));
possibleExceptions.add(new NodeDisconnectedException(clusterService.state().nodes().masterNode(), ShardStateAction.SHARD_FAILED_ACTION_NAME));
possibleExceptions.add(new Discovery.FailedToCommitClusterStateException("simulated"));
transport.handleResponse(capturedRequests[0].requestId, randomFrom(possibleExceptions));
latch.await(); latch.await();
assertNull(exception.get()); assertNull(exception.get());
assertThat(retries.get(), equalTo(numberOfRetries));
assertTrue(success.get()); assertTrue(success.get());
} }
@ -268,23 +276,27 @@ public class ShardStateActionTests extends ESTestCase {
return shardRouting; return shardRouting;
} }
private void setUpMasterRetryVerification(AtomicBoolean retried, CountDownLatch latch) { private void setUpMasterRetryVerification(int numberOfRetries, AtomicInteger retries, CountDownLatch latch, LongConsumer retryLoop) {
shardStateAction.setOnBeforeWaitForNewMasterAndRetry(() -> { shardStateAction.setOnBeforeWaitForNewMasterAndRetry(() -> {
DiscoveryNodes.Builder masterBuilder = DiscoveryNodes.builder(clusterService.state().nodes()); DiscoveryNodes.Builder masterBuilder = DiscoveryNodes.builder(clusterService.state().nodes());
masterBuilder.masterNodeId(clusterService.state().nodes().masterNodes().iterator().next().value.id()); masterBuilder.masterNodeId(clusterService.state().nodes().masterNodes().iterator().next().value.id());
clusterService.setState(ClusterState.builder(clusterService.state()).nodes(masterBuilder)); clusterService.setState(ClusterState.builder(clusterService.state()).nodes(masterBuilder));
}); });
shardStateAction.setOnAfterWaitForNewMasterAndRetry(() -> verifyRetry(retried, latch)); shardStateAction.setOnAfterWaitForNewMasterAndRetry(() -> verifyRetry(numberOfRetries, retries, latch, retryLoop));
} }
private void verifyRetry(AtomicBoolean retried, CountDownLatch latch) { private void verifyRetry(int numberOfRetries, AtomicInteger retries, CountDownLatch latch, LongConsumer retryLoop) {
// assert a retry request was sent // assert a retry request was sent
final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
retried.set(capturedRequests.length == 1); if (capturedRequests.length == 1) {
if (retried.get()) { retries.incrementAndGet();
if (retries.get() == numberOfRetries) {
// finish the request // finish the request
transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE); transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE);
} else {
retryLoop.accept(capturedRequests[0].requestId);
}
} else { } else {
// there failed to be a retry request // there failed to be a retry request
// release the driver thread to fail the test // release the driver thread to fail the test