diff --git a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index e9db5e404c7..6cbaa3d9055 100644 --- a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -54,6 +54,8 @@ import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithStartedPrimary; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.Matchers.is; public class ShardStateActionTests extends ESTestCase { private static ThreadPool THREAD_POOL; @@ -119,6 +121,41 @@ public class ShardStateActionTests extends ESTestCase { THREAD_POOL = null; } + public void testSuccess() throws InterruptedException { + final String index = "test"; + + clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); + + String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); + + AtomicBoolean success = new AtomicBoolean(); + CountDownLatch latch = new CountDownLatch(1); + + ShardRouting shardRouting = getRandomShardRouting(index); + shardStateAction.shardFailed(shardRouting, indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + @Override + public void onSuccess() { + success.set(true); + latch.countDown(); + } + }); + + CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); + assertEquals(1, capturedRequests.length); + // the request is a shard failed request + assertThat(capturedRequests[0].request, is(instanceOf(ShardStateAction.ShardRoutingEntry.class))); + ShardStateAction.ShardRoutingEntry shardRoutingEntry = (ShardStateAction.ShardRoutingEntry)capturedRequests[0].request; + // for the right shard + assertEquals(shardRouting, shardRoutingEntry.getShardRouting()); + // sent to the master + assertEquals(clusterService.state().nodes().masterNode().getId(), capturedRequests[0].node.getId()); + + transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE); + + latch.await(); + assertTrue(success.get()); + } + public void testNoMaster() throws InterruptedException { final String index = "test"; @@ -131,11 +168,10 @@ public class ShardStateActionTests extends ESTestCase { String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); CountDownLatch latch = new CountDownLatch(1); - AtomicBoolean noMaster = new AtomicBoolean(); AtomicBoolean retried = new AtomicBoolean(); AtomicBoolean success = new AtomicBoolean(); - setUpMasterRetryVerification(noMaster, retried, latch); + setUpMasterRetryVerification(retried, latch); shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { @Override @@ -147,7 +183,6 @@ public class ShardStateActionTests extends ESTestCase { latch.await(); - assertTrue(noMaster.get()); assertTrue(retried.get()); assertTrue(success.get()); } @@ -160,12 +195,11 @@ public class ShardStateActionTests extends ESTestCase { String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); CountDownLatch latch = new CountDownLatch(1); - AtomicBoolean noMaster = new AtomicBoolean(); AtomicBoolean retried = new AtomicBoolean(); AtomicBoolean success = new AtomicBoolean(); AtomicReference exception = new AtomicReference<>(); - setUpMasterRetryVerification(noMaster, retried, latch); + setUpMasterRetryVerification(retried, latch); shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { @Override @@ -185,6 +219,7 @@ public class ShardStateActionTests extends ESTestCase { final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); assertThat(capturedRequests.length, equalTo(1)); assertFalse(success.get()); + assertFalse(retried.get()); List possibleExceptions = new ArrayList<>(); possibleExceptions.add(new NotMasterException("simulated")); possibleExceptions.add(new NodeDisconnectedException(clusterService.state().nodes().masterNode(), ShardStateAction.SHARD_FAILED_ACTION_NAME)); @@ -206,6 +241,11 @@ public class ShardStateActionTests extends ESTestCase { AtomicBoolean failure = new AtomicBoolean(); shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + @Override + public void onSuccess() { + assert false; + } + @Override public void onShardFailedFailure(Exception e) { failure.set(true); @@ -228,19 +268,17 @@ public class ShardStateActionTests extends ESTestCase { return shardRouting; } - private void setUpMasterRetryVerification(AtomicBoolean noMaster, AtomicBoolean retried, CountDownLatch latch) { + private void setUpMasterRetryVerification(AtomicBoolean retried, CountDownLatch latch) { shardStateAction.setOnBeforeWaitForNewMasterAndRetry(() -> { DiscoveryNodes.Builder masterBuilder = DiscoveryNodes.builder(clusterService.state().nodes()); masterBuilder.masterNodeId(clusterService.state().nodes().masterNodes().iterator().next().value.id()); clusterService.setState(ClusterState.builder(clusterService.state()).nodes(masterBuilder)); }); - shardStateAction.setOnAfterWaitForNewMasterAndRetry(() -> verifyRetry(noMaster, retried, latch)); + shardStateAction.setOnAfterWaitForNewMasterAndRetry(() -> verifyRetry(retried, latch)); } - private void verifyRetry(AtomicBoolean invoked, AtomicBoolean retried, CountDownLatch latch) { - invoked.set(true); - + private void verifyRetry(AtomicBoolean retried, CountDownLatch latch) { // assert a retry request was sent final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); retried.set(capturedRequests.length == 1);