From 107534c06236cef36e3292a573e9f2ae4b1853e1 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Thu, 10 Jul 2014 15:06:53 +0200 Subject: [PATCH] Do not ignore ConnectTransportException for shard replication operations A ConnectTransportException should fail the replica shard Closes #6183 --- ...nsportShardReplicationOperationAction.java | 5 +- .../index/TransportIndexFailuresTest.java | 162 ++++++++++++++++++ .../test/transport/MockTransportService.java | 28 +++ 3 files changed, 191 insertions(+), 4 deletions(-) create mode 100644 src/test/java/org/elasticsearch/index/TransportIndexFailuresTest.java diff --git a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java index 14c0c6bdf0c..41e842dfd83 100644 --- a/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java +++ b/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java @@ -170,9 +170,6 @@ public abstract class TransportShardReplicationOperationAction nodes = internalCluster().startNodesAsync(2, nodeSettings).get(); + + // Create index test with 1 shard, 1 replica and ensure it is green + createIndex(INDEX); + ensureGreen(INDEX); + + // Disable allocation so the replica cannot be reallocated when it fails + Settings s = ImmutableSettings.builder().put("cluster.routing.allocation.enable", "none").build(); + client().admin().cluster().prepareUpdateSettings().setTransientSettings(s).get(); + + // Determine which node holds the primary shard + ClusterState state = getNodeClusterState(nodes.get(0)); + IndexShardRoutingTable shard = state.getRoutingTable().index(INDEX).shard(0); + String primaryNode; + String replicaNode; + if (shard.getShards().get(0).primary()) { + primaryNode = nodes.get(0); + replicaNode = nodes.get(1); + } else { + primaryNode = nodes.get(1); + replicaNode = nodes.get(0); + } + logger.info("--> primary shard is on {}", primaryNode); + + // Index a document to make sure everything works well + IndexResponse resp = internalCluster().client(primaryNode).prepareIndex(INDEX, "doc").setSource("foo", "bar").get(); + assertThat("document exists on primary node", + internalCluster().client(primaryNode).prepareGet(INDEX, "doc", resp.getId()).setPreference("_only_local").get().isExists(), + equalTo(true)); + assertThat("document exists on replica node", + internalCluster().client(replicaNode).prepareGet(INDEX, "doc", resp.getId()).setPreference("_only_local").get().isExists(), + equalTo(true)); + + // Disrupt the network so indexing requests fail to replicate + logger.info("--> preventing index/replica operations"); + TransportService mockTransportService = internalCluster().getInstance(TransportService.class, primaryNode); + ((MockTransportService) mockTransportService).addFailToSendNoConnectRule( + internalCluster().getInstance(Discovery.class, replicaNode).localNode(), + ImmutableSet.of("index/replica") + ); + mockTransportService = internalCluster().getInstance(TransportService.class, replicaNode); + ((MockTransportService) mockTransportService).addFailToSendNoConnectRule( + internalCluster().getInstance(Discovery.class, primaryNode).localNode(), + ImmutableSet.of("index/replica") + ); + + logger.info("--> indexing into primary"); + // the replica shard should now be marked as failed because the replication operation will fail + resp = internalCluster().client(primaryNode).prepareIndex(INDEX, "doc").setSource("foo", "baz").get(); + // wait until the cluster reaches an exact yellow state, meaning replica has failed + assertBusy(new Runnable() { + @Override + public void run() { + assertThat(client().admin().cluster().prepareHealth().get().getStatus(), equalTo(ClusterHealthStatus.YELLOW)); + } + }); + assertThat("document should still be indexed and available", + client().prepareGet(INDEX, "doc", resp.getId()).get().isExists(), equalTo(true)); + + state = getNodeClusterState(randomFrom(nodes.toArray(Strings.EMPTY_ARRAY))); + RoutingNodes rn = state.routingNodes(); + logger.info("--> counts: total: {}, unassigned: {}, initializing: {}, relocating: {}, started: {}", + rn.shards(new Predicate() { + @Override + public boolean apply(org.elasticsearch.cluster.routing.MutableShardRouting input) { + return true; + } + }).size(), + rn.shardsWithState(UNASSIGNED).size(), + rn.shardsWithState(INITIALIZING).size(), + rn.shardsWithState(RELOCATING).size(), + rn.shardsWithState(STARTED).size()); + logger.info("--> unassigned: {}, initializing: {}, relocating: {}, started: {}", + rn.shardsWithState(UNASSIGNED), + rn.shardsWithState(INITIALIZING), + rn.shardsWithState(RELOCATING), + rn.shardsWithState(STARTED)); + + assertThat("only a single shard is now active (replica should be failed and not reallocated)", + rn.shardsWithState(STARTED).size(), equalTo(1)); + } + + private ClusterState getNodeClusterState(String node) { + return internalCluster().client(node).admin().cluster().prepareState().setLocal(true).get().getState(); + } +} diff --git a/src/test/java/org/elasticsearch/test/transport/MockTransportService.java b/src/test/java/org/elasticsearch/test/transport/MockTransportService.java index e41af988596..e6ba6f05213 100644 --- a/src/test/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/src/test/java/org/elasticsearch/test/transport/MockTransportService.java @@ -32,6 +32,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; import java.io.IOException; +import java.util.Set; import java.util.concurrent.ConcurrentMap; /** @@ -91,6 +92,33 @@ public class MockTransportService extends TransportService { }); } + /** + * Adds a rule that will cause matching operations to throw ConnectTransportExceptions + */ + public void addFailToSendNoConnectRule(DiscoveryNode node, final Set blockedActions) { + + ((LookupTestTransport) transport).transports.put(node, new DelegateTransport(original) { + @Override + public void connectToNode(DiscoveryNode node) throws ConnectTransportException { + original.connectToNode(node); + } + + @Override + public void connectToNodeLight(DiscoveryNode node) throws ConnectTransportException { + original.connectToNodeLight(node); + } + + @Override + public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { + if (blockedActions.contains(action)) { + logger.info("--> preventing {} request", action); + throw new ConnectTransportException(node, "DISCONNECT: prevented " + action + " request"); + } + original.sendRequest(node, requestId, action, request, options); + } + }); + } + /** * Adds a rule that will cause ignores each send request, simulating an unresponsive node * and failing to connect once the rule was added.