[TEST] Verify no master block during partition for read and write apis

This commit is contained in:
Martijn van Groningen 2014-06-30 19:03:24 +02:00 committed by Boaz Leskes
parent 98084c02ce
commit 52f69c64f7
2 changed files with 87 additions and 91 deletions

View File

@ -59,6 +59,7 @@ public class ClusterDynamicSettingsModule extends AbstractModule {
clusterDynamicSettings.addDynamicSetting(DisableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_DISABLE_REPLICA_ALLOCATION);
clusterDynamicSettings.addDynamicSetting(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES, Validator.INTEGER);
clusterDynamicSettings.addDynamicSetting(ZenDiscovery.REJOIN_ON_MASTER_GONE, Validator.BOOLEAN);
clusterDynamicSettings.addDynamicSetting(DiscoverySettings.NO_MASTER_BLOCK);
clusterDynamicSettings.addDynamicSetting(FilterAllocationDecider.CLUSTER_ROUTING_INCLUDE_GROUP + "*");
clusterDynamicSettings.addDynamicSetting(FilterAllocationDecider.CLUSTER_ROUTING_EXCLUDE_GROUP + "*");
clusterDynamicSettings.addDynamicSetting(FilterAllocationDecider.CLUSTER_ROUTING_REQUIRE_GROUP + "*");

View File

@ -22,12 +22,10 @@ package org.elasticsearch.discovery;
import com.google.common.base.Predicate;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlock;
@ -42,8 +40,6 @@ import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.disruption.*;
@ -64,8 +60,8 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
/**
*/
@ -161,39 +157,11 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
}
@Test
@TestLogging("discovery.zen:TRACE,action:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE")
public void testDataConsistency() throws Exception {
List<String> nodes = internalCluster().startNodesAsync(3, nodeSettings).get();
public void testVerifyApiBlocksDuringPartition() throws Exception {
internalCluster().startNodesAsync(3, nodeSettings).get();
// Wait until all 3 nodes are part of the cluster
ensureStableCluster(3);
assertAcked(prepareCreate("test")
.addMapping("type", "field", "type=long")
.get());
IndexRequestBuilder[] indexRequests = new IndexRequestBuilder[scaledRandomIntBetween(1, 1000)];
for (int i = 0; i < indexRequests.length; i++) {
indexRequests[i] = client().prepareIndex("test", "type", String.valueOf(i)).setSource("field", i);
}
indexRandom(true, indexRequests);
for (int i = 0; i < indexRequests.length; i++) {
GetResponse getResponse = client().prepareGet("test", "type", String.valueOf(i)).get();
assertThat(getResponse.isExists(), is(true));
assertThat(getResponse.getVersion(), equalTo(1l));
assertThat(getResponse.getId(), equalTo(String.valueOf(i)));
}
SearchResponse searchResponse = client().prepareSearch("test").setTypes("type")
.addSort("field", SortOrder.ASC)
.get();
assertHitCount(searchResponse, indexRequests.length);
for (int i = 0; i < searchResponse.getHits().getHits().length; i++) {
SearchHit searchHit = searchResponse.getHits().getAt(i);
assertThat(searchHit.id(), equalTo(String.valueOf(i)));
assertThat((long) searchHit.sortValues()[0], equalTo((long) i));
}
createIndex("test");
// Everything is stable now, it is now time to simulate evil...
// but first make sure we have no initializing shards and all is green
@ -233,35 +201,91 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
assertThat(applied, is(true));
ensureStableCluster(2, nonIsolatedNode);
// Reads on the right side of the split must work
logger.info("verifying healthy part of cluster returns data");
searchResponse = client(nonIsolatedNode).prepareSearch("test").setTypes("type")
.addSort("field", SortOrder.ASC)
// Reads on the wrong side of the split are allowed
client(isolatedNode).prepareSearch("test").setTypes("type")
.setPreference("_only_local")
.get();
assertHitCount(searchResponse, indexRequests.length);
for (int i = 0; i < searchResponse.getHits().getHits().length; i++) {
SearchHit searchHit = searchResponse.getHits().getAt(i);
assertThat(searchHit.id(), equalTo(String.valueOf(i)));
assertThat((long) searchHit.sortValues()[0], equalTo((long) i));
}
// Reads on the wrong side of the split are partial
logger.info("verifying isolated node [{}] returns partial data", isolatedNode);
searchResponse = client(isolatedNode).prepareSearch("test").setTypes("type")
.addSort("field", SortOrder.ASC).setPreference("_only_local")
client(isolatedNode).preparePercolate().setDocumentType("type").setIndices("test")
.setPreference("_only_local").setSource("{\"doc\" : {}}")
.get();
client(isolatedNode).prepareCount("test").setTypes("type")
.setPreference("_only_local")
.get();
client(isolatedNode).prepareGet("test", "type", "0").setPreference("_only_local")
.get();
assertThat(searchResponse.getSuccessfulShards(), lessThan(searchResponse.getTotalShards()));
assertThat(searchResponse.getHits().totalHits(), lessThan((long) indexRequests.length));
logger.info("verifying writes on healthy cluster");
UpdateResponse updateResponse = client(nonIsolatedNode).prepareUpdate("test", "type", "0").setDoc("field2", 2).get();
assertThat(updateResponse.getVersion(), equalTo(2l));
// Writes on the wrong side of the split are *not* allowed
executeBlockedApi(
client(isolatedNode).prepareIndex("test", "type", "0").setSource("{}").setTimeout("1s") // Fail quick, otherwise we wait 60 seconds.
);
executeBlockedApi(
client(isolatedNode).prepareUpdate("test", "type", "0").setDoc("{}").setTimeout("1s") // Fail quick, otherwise we wait 60 seconds.
);
networkPartition.stopDisrupting();
// Wait until the master node sees all 3 nodes again.
ensureStableCluster(3, new TimeValue(30000 + networkPartition.expectedTimeToHeal().millis()));
logger.info("Verify no master block with {} set to {}", DiscoverySettings.NO_MASTER_BLOCK, "all");
client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(ImmutableSettings.builder().put(DiscoverySettings.NO_MASTER_BLOCK, "all"))
.get();
networkPartition.startDisrupting();
logger.info("wait until elected master has removed [{}]", isolatedNode);
applied = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
return client(nonIsolatedNode).admin().cluster().prepareState().setLocal(true).get().getState().nodes().size() == 2;
}
}, 1, TimeUnit.MINUTES);
assertThat(applied, is(true));
// The unlucky node must report *no* master node, since it can't connect to master and in fact it should
// continuously ping until network failures have been resolved. However,
// it may take a bit before the node detects it has been cut off from the elected master
logger.info("waiting for isolated node [{}] to have no master", isolatedNode);
applied = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
ClusterState localClusterState = client(isolatedNode).admin().cluster().prepareState().setLocal(true).get().getState();
DiscoveryNodes localDiscoveryNodes = localClusterState.nodes();
logger.info("localDiscoveryNodes=" + localDiscoveryNodes.prettyPrint());
return localDiscoveryNodes.masterNode() == null;
}
}, 10, TimeUnit.SECONDS);
assertThat(applied, is(true));
ensureStableCluster(2, nonIsolatedNode);
// Now both reads and writes on the wrong side of the split are *not* allowed (no master block is set to "all")
executeBlockedApi(
client(isolatedNode).prepareSearch("test").setTypes("type").setPreference("_only_local")
);
executeBlockedApi(
client(isolatedNode).preparePercolate().setDocumentType("type").setIndices("test").setPreference("_only_local").setSource("{\"doc\" : {}}")
);
executeBlockedApi(
client(isolatedNode).prepareCount("test").setTypes("type").setPreference("_only_local")
);
executeBlockedApi(
client(isolatedNode).prepareGet("test", "type", "0").setPreference("_only_local")
);
executeBlockedApi(
client(isolatedNode).prepareIndex("test", "type", "0").setSource("{}").setTimeout("1s") // Fail quick, otherwise we wait 60 seconds.
);
executeBlockedApi(
client(isolatedNode).prepareUpdate("test", "type", "0").setDoc("{}").setTimeout("1s") // Fail quick, otherwise we wait 60 seconds.
);
networkPartition.stopDisrupting();
// Wait until the master node sees all 3 nodes again.
ensureStableCluster(3, new TimeValue(30000 + networkPartition.expectedTimeToHeal().millis()));
}
private void executeBlockedApi(ActionRequestBuilder builder) {
try {
logger.info("verifying writes on isolated [{}] fail", isolatedNode);
client(isolatedNode).prepareUpdate("test", "type", "0").setDoc("field2", 2)
.setTimeout("1s") // Fail quick, otherwise we wait 60 seconds.
.get();
logger.info("verifying request[{}] on isolated [{}] and fail", builder.getClass().getSimpleName());
builder.get();
fail();
} catch (ClusterBlockException exception) {
assertThat(exception.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
@ -269,37 +293,8 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
ClusterBlock clusterBlock = exception.blocks().iterator().next();
assertThat(clusterBlock.id(), equalTo(DiscoverySettings.NO_MASTER_BLOCK_ID));
}
networkPartition.stopDisrupting();
// Wait until the master node sees all 3 nodes again.
ensureStableCluster(3, new TimeValue(30000 + networkPartition.expectedTimeToHeal().millis()));
logger.info("verifying all nodes return all data");
for (Client client : clients()) {
searchResponse = client.prepareSearch("test").setTypes("type")
.addSort("field", SortOrder.ASC)
.get();
for (int i = 0; i < searchResponse.getHits().getHits().length; i++) {
SearchHit searchHit = searchResponse.getHits().getAt(i);
assertThat(searchHit.id(), equalTo(String.valueOf(i)));
assertThat((long) searchHit.sortValues()[0], equalTo((long) i));
}
GetResponse getResponse = client.prepareGet("test", "type", "0").setPreference("_local").get();
assertThat(getResponse.isExists(), is(true));
assertThat(getResponse.getId(), equalTo("0"));
assertThat(getResponse.getVersion(), equalTo(2l));
for (int i = 1; i < indexRequests.length; i++) {
getResponse = client.prepareGet("test", "type", String.valueOf(i)).setPreference("_local").get();
assertThat(getResponse.isExists(), is(true));
assertThat(getResponse.getVersion(), equalTo(1l));
assertThat(getResponse.getId(), equalTo(String.valueOf(i)));
}
}
}
@Test
@TestLogging("discovery.zen:TRACE,action:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE")
public void testIsolateMasterAndVerifyClusterStateConsensus() throws Exception {