[TEST] Added test that verifies data integrity during and after a simulated network split.

This commit is contained in:
Martijn van Groningen 2014-06-06 12:09:11 +02:00 committed by Boaz Leskes
parent 2c9ef63676
commit fc8ae4d30d

View File

@ -21,13 +21,25 @@ package org.elasticsearch.discovery;
import com.google.common.base.Predicate;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
@ -40,26 +52,46 @@ import java.util.concurrent.TimeUnit;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
import static org.hamcrest.Matchers.*;
/**
*/
@ClusterScope(scope= Scope.TEST, numDataNodes =0)
public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationTest {
private static final Settings nodeSettings = ImmutableSettings.settingsBuilder()
.put("discovery.type", "zen") // <-- To override the local setting if set externally
.put("discovery.zen.fd.ping_timeout", "1s") // <-- for hitting simulated network failures quickly
.put("discovery.zen.fd.ping_retries", "1") // <-- for hitting simulated network failures quickly
.put("discovery.zen.minimum_master_nodes", 2)
.put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName())
.build();
@Override
protected int numberOfShards() {
return 3;
}
@Override
protected int numberOfReplicas() {
return 1;
}
@Override
public Settings indexSettings() {
Settings settings = super.indexSettings();
return ImmutableSettings.builder()
.put(settings)
.put(ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE, 2)
.build();
}
@Test
@TestLogging("discovery.zen:TRACE")
public void failWithMinimumMasterNodesConfigured() throws Exception {
final Settings settings = ImmutableSettings.settingsBuilder()
.put("discovery.type", "zen") // <-- To override the local setting if set externally
.put("discovery.zen.fd.ping_timeout", "1s") // <-- for hitting simulated network failures quickly
.put("discovery.zen.fd.ping_retries", "1") // <-- for hitting simulated network failures quickly
.put("discovery.zen.minimum_master_nodes", 2)
.put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName())
.build();
List<String> nodes = internalCluster().startNodesAsync(3, settings).get();
List<String> nodes = internalCluster().startNodesAsync(3, nodeSettings).get();
// Wait until a green status has been reaches and 3 nodes are part of the cluster
ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth()
@ -145,6 +177,185 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
}
}
@Test
@TestLogging("discovery.zen:TRACE,action:TRACE,cluster.service:TRACE")
public void testDataConsistency() throws Exception {
List<String> nodes = internalCluster().startNodesAsync(3, nodeSettings).get();
// Wait until a green status has been reaches and 3 nodes are part of the cluster
ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes("3")
.get();
assertThat(clusterHealthResponse.isTimedOut(), is(false));
assertAcked(prepareCreate("test")
.addMapping("type", "field", "type=long")
.get());
IndexRequestBuilder[] indexRequests = new IndexRequestBuilder[1 + randomInt(1000)];
for (int i = 0; i < indexRequests.length; i++) {
indexRequests[i] = client().prepareIndex("test", "type", String.valueOf(i)).setSource("field", i);
}
indexRandom(true, indexRequests);
for (int i = 0; i < indexRequests.length; i++) {
GetResponse getResponse = client().prepareGet("test", "type", String.valueOf(i)).get();
assertThat(getResponse.isExists(), is(true));
assertThat(getResponse.getVersion(), equalTo(1l));
assertThat(getResponse.getId(), equalTo(String.valueOf(i)));
}
SearchResponse searchResponse = client().prepareSearch("test").setTypes("type")
.addSort("field", SortOrder.ASC)
.get();
assertHitCount(searchResponse, indexRequests.length);
for (int i = 0; i < searchResponse.getHits().getHits().length; i++) {
SearchHit searchHit = searchResponse.getHits().getAt(i);
assertThat(searchHit.id(), equalTo(String.valueOf(i)));
assertThat((long) searchHit.sortValues()[0], equalTo((long) i));
}
// Figure out what is the elected master node
DiscoveryNode masterDiscoNode = null;
for (String node : nodes) {
ClusterState state = internalCluster().client(node).admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.nodes().size(), equalTo(3));
if (masterDiscoNode == null) {
masterDiscoNode = state.nodes().masterNode();
} else {
assertThat(state.nodes().masterNode(), equalTo(masterDiscoNode));
}
}
assert masterDiscoNode != null;
logger.info("---> legit elected master node=" + masterDiscoNode);
final Client masterClient = internalCluster().masterClient();
// Everything is stable now, it is now time to simulate evil...
// Pick a node that isn't the elected master.
String unluckyNode = null;
for (String node : nodes) {
if (!node.equals(masterDiscoNode.getName())) {
unluckyNode = node;
}
}
assert unluckyNode != null;
// Simulate a network issue between the unlucky node and the rest of the cluster.
for (String nodeId : nodes) {
if (!nodeId.equals(unluckyNode)) {
addFailToSendNoConnectRule(nodeId, unluckyNode);
addFailToSendNoConnectRule(unluckyNode, nodeId);
}
}
try {
// Wait until elected master has removed that the unlucky node...
boolean applied = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
return masterClient.admin().cluster().prepareState().setLocal(true).get().getState().nodes().size() == 2;
}
}, 1, TimeUnit.MINUTES);
assertThat(applied, is(true));
// The unlucky node must report *no* master node, since it can't connect to master and in fact it should
// continuously ping until network failures have been resolved. However
final Client isolatedNodeClient = internalCluster().client(unluckyNode);
// It may a take a bit before the node detects it has been cut off from the elected master
applied = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
ClusterState localClusterState = isolatedNodeClient.admin().cluster().prepareState().setLocal(true).get().getState();
DiscoveryNodes localDiscoveryNodes = localClusterState.nodes();
logger.info("localDiscoveryNodes=" + localDiscoveryNodes.prettyPrint());
return localDiscoveryNodes.masterNode() == null;
}
}, 10, TimeUnit.SECONDS);
assertThat(applied, is(true));
ClusterHealthResponse healthResponse = masterClient.admin().cluster().prepareHealth("test")
.setWaitForYellowStatus().get();
assertThat(healthResponse.isTimedOut(), is(false));
assertThat(healthResponse.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
// Reads on the right side of the split must work
searchResponse = masterClient.prepareSearch("test").setTypes("type")
.addSort("field", SortOrder.ASC)
.get();
assertHitCount(searchResponse, indexRequests.length);
for (int i = 0; i < searchResponse.getHits().getHits().length; i++) {
SearchHit searchHit = searchResponse.getHits().getAt(i);
assertThat(searchHit.id(), equalTo(String.valueOf(i)));
assertThat((long) searchHit.sortValues()[0], equalTo((long) i));
}
// Reads on the wrong side of the split are partial
searchResponse = isolatedNodeClient.prepareSearch("test").setTypes("type")
.addSort("field", SortOrder.ASC)
.get();
assertThat(searchResponse.getSuccessfulShards(), lessThan(searchResponse.getTotalShards()));
assertThat(searchResponse.getHits().totalHits(), lessThan((long) indexRequests.length));
// Writes on the right side of the split must work
UpdateResponse updateResponse = masterClient.prepareUpdate("test", "type", "0").setDoc("field2", 2).get();
assertThat(updateResponse.getVersion(), equalTo(2l));
// Writes on the wrong side of the split fail
try {
isolatedNodeClient.prepareUpdate("test", "type", "0").setDoc("field2", 2)
.setTimeout(TimeValue.timeValueSeconds(5)) // Fail quick, otherwise we wait 60 seconds.
.get();
} catch (ClusterBlockException exception) {
assertThat(exception.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
assertThat(exception.blocks().size(), equalTo(1));
ClusterBlock clusterBlock = exception.blocks().iterator().next();
assertThat(clusterBlock.id(), equalTo(DiscoverySettings.NO_MASTER_BLOCK_ID));
}
} finally {
// stop simulating network failures, from this point on the unlucky node is able to rejoin
// We also need to do this even if assertions fail, since otherwise the test framework can't work properly
for (String nodeId : nodes) {
if (!nodeId.equals(unluckyNode)) {
clearNoConnectRule(nodeId, unluckyNode);
clearNoConnectRule(unluckyNode, nodeId);
}
}
}
// Wait until the master node sees all 3 nodes again.
clusterHealthResponse = masterClient.admin().cluster().prepareHealth()
.setWaitForGreenStatus()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes("3")
.get();
assertThat(clusterHealthResponse.isTimedOut(), is(false));
for (String node : nodes) {
Client client = internalCluster().client(node);
searchResponse = client.prepareSearch("test").setTypes("type")
.addSort("field", SortOrder.ASC)
.get();
for (int i = 0; i < searchResponse.getHits().getHits().length; i++) {
SearchHit searchHit = searchResponse.getHits().getAt(i);
assertThat(searchHit.id(), equalTo(String.valueOf(i)));
assertThat((long) searchHit.sortValues()[0], equalTo((long) i));
}
GetResponse getResponse = client().prepareGet("test", "type", "0").get();
assertThat(getResponse.isExists(), is(true));
assertThat(getResponse.getVersion(), equalTo(2l));
assertThat(getResponse.getId(), equalTo("0"));
for (int i = 1; i < indexRequests.length; i++) {
getResponse = client().prepareGet("test", "type", String.valueOf(i)).get();
assertThat(getResponse.isExists(), is(true));
assertThat(getResponse.getVersion(), equalTo(1l));
assertThat(getResponse.getId(), equalTo(String.valueOf(i)));
}
}
}
private void addFailToSendNoConnectRule(String fromNode, String toNode) {
TransportService mockTransportService = internalCluster().getInstance(TransportService.class, fromNode);
((MockTransportService) mockTransportService).addFailToSendNoConnectRule(internalCluster().getInstance(Discovery.class, toNode).localNode());