[TEST] Added test that exposes a shard consistency problem when isolated node(s) rejoin the cluster after network segmentation and when the elected master node ended up on the lesser side of the network segmentation.

This commit is contained in:
Martijn van Groningen 2014-06-11 10:06:43 +02:00 committed by Boaz Leskes
parent e7d24ecdd0
commit 4828e78637
1 changed file with 88 additions and 26 deletions

View File

@@ -24,12 +24,14 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
@@ -45,6 +47,7 @@ import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportModule;
import org.elasticsearch.transport.TransportService;
import org.junit.Ignore;
import org.junit.Test;
import java.util.List;
@@ -52,7 +55,8 @@ import java.util.concurrent.TimeUnit;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.*;
/**
@@ -167,7 +171,8 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
}
@Test
@TestLogging("discovery.zen:TRACE,action:TRACE,cluster.service:TRACE")
@Ignore
@TestLogging("discovery.zen:TRACE,action:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE")
public void testDataConsistency() throws Exception {
List<String> nodes = internalCluster().startNodesAsync(3, nodeSettings).get();
@@ -205,25 +210,15 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
assertThat((long) searchHit.sortValues()[0], equalTo((long) i));
}
// Figure out what is the elected master node
DiscoveryNode masterDiscoNode = findMasterNode(nodes);
logger.info("---> legit elected master node=" + masterDiscoNode);
final Client masterClient = internalCluster().masterClient();
// Everything is stable now, it is now time to simulate evil...
// but first make sure we have no initializing shards and all is green
// (waiting for green here, because indexing / search in a yellow index is fine as long as no other nodes go down)
ensureGreen("test");
// Pick a node that isn't the elected master.
String isolatedNode = null;
for (String node : nodes) {
if (!node.equals(masterDiscoNode.getName())) {
isolatedNode = node;
}
}
assert isolatedNode != null;
String isolatedNode = nodes.get(0);
String nonIsolatedNode = nodes.get(1);
final Client nonIsolatedNodeClient = internalCluster().client(nonIsolatedNode);
// Simulate a network issue between the unlucky node and the rest of the cluster.
for (String nodeId : nodes) {
@@ -237,7 +232,7 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
boolean applied = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
return masterClient.admin().cluster().prepareState().setLocal(true).get().getState().nodes().size() == 2;
return nonIsolatedNodeClient.admin().cluster().prepareState().setLocal(true).get().getState().nodes().size() == 2;
}
}, 1, TimeUnit.MINUTES);
assertThat(applied, is(true));
@@ -257,13 +252,13 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
}, 10, TimeUnit.SECONDS);
assertThat(applied, is(true));
ClusterHealthResponse healthResponse = masterClient.admin().cluster().prepareHealth("test")
ClusterHealthResponse healthResponse = nonIsolatedNodeClient.admin().cluster().prepareHealth("test")
.setWaitForYellowStatus().get();
assertThat(healthResponse.isTimedOut(), is(false));
assertThat(healthResponse.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
// Reads on the right side of the split must work
searchResponse = masterClient.prepareSearch("test").setTypes("type")
searchResponse = nonIsolatedNodeClient.prepareSearch("test").setTypes("type")
.addSort("field", SortOrder.ASC)
.get();
assertHitCount(searchResponse, indexRequests.length);
@@ -281,7 +276,7 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
assertThat(searchResponse.getHits().totalHits(), lessThan((long) indexRequests.length));
// Writes on the right side of the split must work
UpdateResponse updateResponse = masterClient.prepareUpdate("test", "type", "0").setDoc("field2", 2).get();
UpdateResponse updateResponse = nonIsolatedNodeClient.prepareUpdate("test", "type", "0").setDoc("field2", 2).get();
assertThat(updateResponse.getVersion(), equalTo(2l));
// Writes on the wrong side of the split fail
@@ -289,6 +284,7 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
isolatedNodeClient.prepareUpdate("test", "type", "0").setDoc("field2", 2)
.setTimeout(TimeValue.timeValueSeconds(5)) // Fail quick, otherwise we wait 60 seconds.
.get();
fail();
} catch (ClusterBlockException exception) {
assertThat(exception.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
assertThat(exception.blocks().size(), equalTo(1));
@@ -307,15 +303,15 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
}
// Wait until the master node sees all 3 nodes again.
clusterHealthResponse = masterClient.admin().cluster().prepareHealth()
clusterHealthResponse = nonIsolatedNodeClient.admin().cluster().prepareHealth()
.setWaitForGreenStatus()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes("3")
.get();
assertThat(clusterHealthResponse.getStatus(), equalTo(ClusterHealthStatus.GREEN));
assertThat(clusterHealthResponse.isTimedOut(), is(false));
for (String node : nodes) {
Client client = internalCluster().client(node);
for (Client client : clients()) {
searchResponse = client.prepareSearch("test").setTypes("type")
.addSort("field", SortOrder.ASC)
.get();
@@ -325,13 +321,12 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
assertThat((long) searchHit.sortValues()[0], equalTo((long) i));
}
GetResponse getResponse = client.prepareGet("test", "type", "0").get();
GetResponse getResponse = client.prepareGet("test", "type", "0").setPreference("_local").get();
assertThat(getResponse.isExists(), is(true));
assertThat(getResponse.getVersion(), equalTo(2l));
assertThat(getResponse.getId(), equalTo("0"));
assertThat(getResponse.getVersion(), equalTo(2l));
for (int i = 1; i < indexRequests.length; i++) {
getResponse = client.prepareGet("test", "type", String.valueOf(i)).get();
getResponse = client.prepareGet("test", "type", String.valueOf(i)).setPreference("_local").get();
assertThat(getResponse.isExists(), is(true));
assertThat(getResponse.getVersion(), equalTo(1l));
assertThat(getResponse.getId(), equalTo(String.valueOf(i)));
@@ -339,6 +334,73 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
}
}
@Test
// NOTE(review): @Ignore'd — per the commit message this test exposes a
// not-yet-fixed shard consistency problem, so it is expected to fail until
// rejoin/recovery handles this case.
@Ignore
@TestLogging("discovery.zen:TRACE,action:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE")
// Scenario: isolate the elected master from the other two nodes, index a
// document on the majority side while the cluster is partitioned, then heal
// the partition and verify every node's local shard copy contains the document.
public void testRejoinDocumentExistsInAllShardCopies() throws Exception {
List<String> nodes = internalCluster().startNodesAsync(3, nodeSettings).get();
// Wait until all 3 nodes have joined before creating the index.
ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes("3")
.get();
assertThat(clusterHealthResponse.isTimedOut(), is(false));
// 1 primary + 2 replicas => every one of the 3 nodes holds a shard copy.
assertAcked(prepareCreate("test")
.setSettings(ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
)
.get());
ensureGreen("test");
// Isolate the elected master specifically — the commit targets the case
// where the master ends up on the lesser (minority) side of the partition.
String isolatedNode = findMasterNode(nodes).getName();
String notIsolatedNode = null;
for (String node : nodes) {
if (!node.equals(isolatedNode)) {
notIsolatedNode = node;
break;
}
}
logger.info("Isolating node[" + isolatedNode + "]");
// Cut the transport link in both directions between the isolated node and
// every other node.
for (String nodeId : nodes) {
if (!nodeId.equals(isolatedNode)) {
addFailToSendNoConnectRule(nodeId, isolatedNode);
addFailToSendNoConnectRule(isolatedNode, nodeId);
}
}
// Yellow: one replica (on the isolated node) is unassigned on the majority side.
ensureYellow("test");
// Index a document via the majority side while the partition is in effect.
IndexResponse indexResponse = internalCluster().client(notIsolatedNode).prepareIndex("test", "type").setSource("field", "value").get();
assertThat(indexResponse.getVersion(), equalTo(1l));
logger.info("Verifying if document exists via node[" + notIsolatedNode + "]");
// _local preference forces the read from this node's own shard copy.
GetResponse getResponse = internalCluster().client(notIsolatedNode).prepareGet("test", "type", indexResponse.getId())
.setPreference("_local")
.get();
assertThat(getResponse.isExists(), is(true));
assertThat(getResponse.getVersion(), equalTo(1l));
assertThat(getResponse.getId(), equalTo(indexResponse.getId()));
// Heal the partition: remove the no-connect rules in both directions.
for (String nodeId : nodes) {
if (!nodeId.equals(isolatedNode)) {
clearNoConnectRule(nodeId, isolatedNode);
clearNoConnectRule(isolatedNode, nodeId);
}
}
ensureGreen("test");
// After rejoin + recovery, each node's LOCAL copy must contain the document
// indexed during the partition — this is the consistency check that exposes
// the bug when the rejoined copy was not brought up to date.
for (String node : nodes) {
logger.info("Verifying if document exists after isolating node[" + isolatedNode + "] via node[" + node + "]");
getResponse = internalCluster().client(node).prepareGet("test", "type", indexResponse.getId())
.setPreference("_local")
.get();
assertThat(getResponse.isExists(), is(true));
assertThat(getResponse.getVersion(), equalTo(1l));
assertThat(getResponse.getId(), equalTo(indexResponse.getId()));
}
}
private DiscoveryNode findMasterNode(List<String> nodes) {
DiscoveryNode masterDiscoNode = null;
for (String node : nodes) {