diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index aef455d14b0..b261851abdb 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -31,8 +31,6 @@ import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; -import org.elasticsearch.cluster.MasterNodeChangePredicate; -import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockException; @@ -44,7 +42,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; -import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.Tuple; @@ -67,7 +64,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.BaseTransportResponseHandler; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.EmptyTransportResponseHandler; -import org.elasticsearch.transport.ReceiveTimeoutTransportException; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportChannelResponseHandler; import org.elasticsearch.transport.TransportException; @@ -886,28 +882,33 @@ public abstract class TransportReplicationAction { @@ -334,10 +381,7 @@ public class ShardStateAction extends AbstractComponent { default void onSuccess() { } - default void onShardFailedNoMaster() { - } - - default void onShardFailedFailure(final DiscoveryNode master, final TransportException e) { + default void onShardFailedFailure(final Exception e) { } } } diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 9357de7b1eb..6cb30789dda 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -458,7 +458,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent {}); + shardStateAction.setOnAfterTimeout(() -> {}); + shardStateAction.setOnBeforeWaitForNewMasterAndRetry(() -> {}); + shardStateAction.setOnAfterWaitForNewMasterAndRetry(() -> {}); } @Override @@ -84,36 +140,72 @@ public class ShardStateActionTests extends ESTestCase { THREAD_POOL = null; } - public void testNoMaster() { + public void testNoMaster() throws InterruptedException { final String index = "test"; clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); - DiscoveryNodes.Builder builder = DiscoveryNodes.builder(clusterService.state().nodes()); - builder.masterNodeId(null); - clusterService.setState(ClusterState.builder(clusterService.state()).nodes(builder)); + DiscoveryNodes.Builder noMasterBuilder = DiscoveryNodes.builder(clusterService.state().nodes()); + noMasterBuilder.masterNodeId(null); + clusterService.setState(ClusterState.builder(clusterService.state()).nodes(noMasterBuilder)); String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); + CountDownLatch latch = new CountDownLatch(1); AtomicBoolean noMaster = new AtomicBoolean(); - assert !noMaster.get(); + AtomicBoolean retried = new AtomicBoolean(); + AtomicBoolean success = new AtomicBoolean(); - shardStateAction.shardFailed(clusterService.state(), getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + setUpMasterRetryVerification(noMaster, retried, latch); + + shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { @Override - public void onShardFailedNoMaster() { - noMaster.set(true); - } - - @Override - public void onShardFailedFailure(DiscoveryNode master, TransportException e) { - + public void onSuccess() { + success.set(true); + latch.countDown(); } }); + latch.await(); + assertTrue(noMaster.get()); + assertTrue(retried.get()); + assertTrue(success.get()); } - public void testFailure() { + public void testMasterLeft() throws InterruptedException { + final String index = "test"; + + clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); + + String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); + + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean noMaster = new AtomicBoolean(); + AtomicBoolean retried = new AtomicBoolean(); + AtomicBoolean success = new AtomicBoolean(); + + setUpMasterRetryVerification(noMaster, retried, latch); + + shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + @Override + public void onSuccess() { + success.set(true); + latch.countDown(); + } + }); + + final CapturingTransport.CapturedRequest[] capturedRequests = transport.capturedRequests(); + transport.clear(); + assertThat(capturedRequests.length, equalTo(1)); + assertFalse(success.get()); + transport.handleResponse(capturedRequests[0].requestId, new NotMasterException("simulated")); + + latch.await(); + assertTrue(success.get()); + } + + public void testUnhandledFailure() { final String index = "test"; clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); @@ -121,59 +213,22 @@ public class ShardStateActionTests extends ESTestCase { String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); AtomicBoolean failure = new AtomicBoolean(); - assert !failure.get(); - shardStateAction.shardFailed(clusterService.state(), getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { @Override - public void onShardFailedNoMaster() { - - } - - @Override - public void onShardFailedFailure(DiscoveryNode master, TransportException e) { + public void onShardFailedFailure(Exception e) { failure.set(true); } }); final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); assertThat(capturedRequests.length, equalTo(1)); - assert !failure.get(); + assertFalse(failure.get()); transport.handleResponse(capturedRequests[0].requestId, new TransportException("simulated")); assertTrue(failure.get()); } - public void testTimeout() throws InterruptedException { - final String index = "test"; - - clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); - - String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); - - AtomicBoolean progress = new AtomicBoolean(); - AtomicBoolean timedOut = new AtomicBoolean(); - - TimeValue timeout = new TimeValue(1, TimeUnit.MILLISECONDS); - CountDownLatch latch = new CountDownLatch(1); - shardStateAction.shardFailed(clusterService.state(), getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), timeout, new ShardStateAction.Listener() { - @Override - public void onShardFailedFailure(DiscoveryNode master, TransportException e) { - if (e instanceof ReceiveTimeoutTransportException) { - assertFalse(progress.get()); - timedOut.set(true); - } - latch.countDown(); - } - }); - - latch.await(); - progress.set(true); - assertTrue(timedOut.get()); - - final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); - assertThat(capturedRequests.length, equalTo(1)); - } - private ShardRouting getRandomShardRouting(String index) { IndexRoutingTable indexRoutingTable = clusterService.state().routingTable().index(index); ShardsIterator shardsIterator = indexRoutingTable.randomAllActiveShardsIt(); @@ -182,6 +237,33 @@ public class ShardStateActionTests extends ESTestCase { return shardRouting; } + private void setUpMasterRetryVerification(AtomicBoolean noMaster, AtomicBoolean retried, CountDownLatch latch) { + shardStateAction.setOnBeforeWaitForNewMasterAndRetry(() -> { + DiscoveryNodes.Builder masterBuilder = DiscoveryNodes.builder(clusterService.state().nodes()); + masterBuilder.masterNodeId(clusterService.state().nodes().masterNodes().iterator().next().value.id()); + clusterService.setState(ClusterState.builder(clusterService.state()).nodes(masterBuilder)); + }); + + shardStateAction.setOnAfterWaitForNewMasterAndRetry(() -> verifyRetry(noMaster, retried, latch)); + } + + private void verifyRetry(AtomicBoolean invoked, AtomicBoolean retried, CountDownLatch latch) { + invoked.set(true); + + // assert a retry request was sent + final CapturingTransport.CapturedRequest[] capturedRequests = transport.capturedRequests(); + transport.clear(); + retried.set(capturedRequests.length == 1); + if (retried.get()) { + // finish the request + transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE); + } else { + // there failed to be a retry request + // release the driver thread to fail the test + latch.countDown(); + } + } + private Throwable getSimulatedFailure() { return new CorruptIndexException("simulated", (String) null); }