From f49435c78b8a9574d54997492e5305840b610c57 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 8 Jan 2016 16:39:14 -0500 Subject: [PATCH] Centrally handle channel failures when failing a shard This commit moves the handling of channel failures when failing a shard to o.e.c.a.s.ShardStateAction. This means that shard failure requests that time out or occur when there is no master or the master leaves after the request is sent will now be retried from here. The listener for a shard failed request will now only be notified upon successful completion of the shard failed request, or when a catastrophic non-channel failure occurs. --- .../TransportReplicationAction.java | 123 ++--------- .../action/shard/ShardStateAction.java | 98 ++++++--- .../cluster/IndicesClusterStateService.java | 6 +- .../action/shard/ShardStateActionTests.java | 198 +++++++++++++----- 4 files changed, 234 insertions(+), 191 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index aef455d14b0..b261851abdb 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -31,8 +31,6 @@ import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; -import org.elasticsearch.cluster.MasterNodeChangePredicate; -import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockException; @@ -44,7 +42,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import 
org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; -import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.Tuple; @@ -67,7 +64,6 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.BaseTransportResponseHandler; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.EmptyTransportResponseHandler; -import org.elasticsearch.transport.ReceiveTimeoutTransportException; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportChannelResponseHandler; import org.elasticsearch.transport.TransportException; @@ -886,28 +882,33 @@ public abstract class TransportReplicationAction { @@ -334,10 +381,7 @@ public class ShardStateAction extends AbstractComponent { default void onSuccess() { } - default void onShardFailedNoMaster() { - } - - default void onShardFailedFailure(final DiscoveryNode master, final TransportException e) { + default void onShardFailedFailure(final Exception e) { } } } diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 9357de7b1eb..6cb30789dda 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -458,7 +458,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent {}); + shardStateAction.setOnAfterTimeout(() -> {}); + shardStateAction.setOnBeforeWaitForNewMasterAndRetry(() -> {}); + shardStateAction.setOnAfterWaitForNewMasterAndRetry(() -> {}); } @Override @@ -84,36 +140,72 @@ public class 
ShardStateActionTests extends ESTestCase { THREAD_POOL = null; } - public void testNoMaster() { + public void testNoMaster() throws InterruptedException { final String index = "test"; clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); - DiscoveryNodes.Builder builder = DiscoveryNodes.builder(clusterService.state().nodes()); - builder.masterNodeId(null); - clusterService.setState(ClusterState.builder(clusterService.state()).nodes(builder)); + DiscoveryNodes.Builder noMasterBuilder = DiscoveryNodes.builder(clusterService.state().nodes()); + noMasterBuilder.masterNodeId(null); + clusterService.setState(ClusterState.builder(clusterService.state()).nodes(noMasterBuilder)); String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); + CountDownLatch latch = new CountDownLatch(1); AtomicBoolean noMaster = new AtomicBoolean(); - assert !noMaster.get(); + AtomicBoolean retried = new AtomicBoolean(); + AtomicBoolean success = new AtomicBoolean(); - shardStateAction.shardFailed(clusterService.state(), getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + setUpMasterRetryVerification(noMaster, retried, latch); + + shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { @Override - public void onShardFailedNoMaster() { - noMaster.set(true); - } - - @Override - public void onShardFailedFailure(DiscoveryNode master, TransportException e) { - + public void onSuccess() { + success.set(true); + latch.countDown(); } }); + latch.await(); + assertTrue(noMaster.get()); + assertTrue(retried.get()); + assertTrue(success.get()); } - public void testFailure() { + public void testMasterLeft() throws InterruptedException { + final String index = "test"; + + clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); + + String indexUUID = 
clusterService.state().metaData().index(index).getIndexUUID(); + + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean noMaster = new AtomicBoolean(); + AtomicBoolean retried = new AtomicBoolean(); + AtomicBoolean success = new AtomicBoolean(); + + setUpMasterRetryVerification(noMaster, retried, latch); + + shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + @Override + public void onSuccess() { + success.set(true); + latch.countDown(); + } + }); + + final CapturingTransport.CapturedRequest[] capturedRequests = transport.capturedRequests(); + transport.clear(); + assertThat(capturedRequests.length, equalTo(1)); + assertFalse(success.get()); + transport.handleResponse(capturedRequests[0].requestId, new NotMasterException("simulated")); + + latch.await(); + assertTrue(success.get()); + } + + public void testUnhandledFailure() { final String index = "test"; clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); @@ -121,59 +213,22 @@ public class ShardStateActionTests extends ESTestCase { String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); AtomicBoolean failure = new AtomicBoolean(); - assert !failure.get(); - shardStateAction.shardFailed(clusterService.state(), getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { + shardStateAction.shardFailed(getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), new ShardStateAction.Listener() { @Override - public void onShardFailedNoMaster() { - - } - - @Override - public void onShardFailedFailure(DiscoveryNode master, TransportException e) { + public void onShardFailedFailure(Exception e) { failure.set(true); } }); final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); assertThat(capturedRequests.length, equalTo(1)); - assert !failure.get(); + assertFalse(failure.get()); 
transport.handleResponse(capturedRequests[0].requestId, new TransportException("simulated")); assertTrue(failure.get()); } - public void testTimeout() throws InterruptedException { - final String index = "test"; - - clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); - - String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); - - AtomicBoolean progress = new AtomicBoolean(); - AtomicBoolean timedOut = new AtomicBoolean(); - - TimeValue timeout = new TimeValue(1, TimeUnit.MILLISECONDS); - CountDownLatch latch = new CountDownLatch(1); - shardStateAction.shardFailed(clusterService.state(), getRandomShardRouting(index), indexUUID, "test", getSimulatedFailure(), timeout, new ShardStateAction.Listener() { - @Override - public void onShardFailedFailure(DiscoveryNode master, TransportException e) { - if (e instanceof ReceiveTimeoutTransportException) { - assertFalse(progress.get()); - timedOut.set(true); - } - latch.countDown(); - } - }); - - latch.await(); - progress.set(true); - assertTrue(timedOut.get()); - - final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); - assertThat(capturedRequests.length, equalTo(1)); - } - private ShardRouting getRandomShardRouting(String index) { IndexRoutingTable indexRoutingTable = clusterService.state().routingTable().index(index); ShardsIterator shardsIterator = indexRoutingTable.randomAllActiveShardsIt(); @@ -182,6 +237,33 @@ public class ShardStateActionTests extends ESTestCase { return shardRouting; } + private void setUpMasterRetryVerification(AtomicBoolean noMaster, AtomicBoolean retried, CountDownLatch latch) { + shardStateAction.setOnBeforeWaitForNewMasterAndRetry(() -> { + DiscoveryNodes.Builder masterBuilder = DiscoveryNodes.builder(clusterService.state().nodes()); + masterBuilder.masterNodeId(clusterService.state().nodes().masterNodes().iterator().next().value.id()); + 
clusterService.setState(ClusterState.builder(clusterService.state()).nodes(masterBuilder)); + }); + + shardStateAction.setOnAfterWaitForNewMasterAndRetry(() -> verifyRetry(noMaster, retried, latch)); + } + + private void verifyRetry(AtomicBoolean invoked, AtomicBoolean retried, CountDownLatch latch) { + invoked.set(true); + + // assert a retry request was sent + final CapturingTransport.CapturedRequest[] capturedRequests = transport.capturedRequests(); + transport.clear(); + retried.set(capturedRequests.length == 1); + if (retried.get()) { + // finish the request + transport.handleResponse(capturedRequests[0].requestId, TransportResponse.Empty.INSTANCE); + } else { + // there failed to be a retry request + // release the driver thread to fail the test + latch.countDown(); + } + } + private Throwable getSimulatedFailure() { return new CorruptIndexException("simulated", (String) null); }