Added Javadocs to all tests in DiscoveryWithNetworkFailuresTests

Moved testVerifyApiBlocksDuringPartition to test blocks rather than relying on specific API rejections.
Did some cleaning while at it.
This commit is contained in:
Boaz Leskes 2014-07-02 12:00:49 +02:00
parent 77dae631e1
commit 5e5f8a9daf
5 changed files with 119 additions and 158 deletions

View File

@ -117,6 +117,10 @@ public class ClusterBlocks {
return false;
}
/**
 * Is there at least one global block registered for the provided level?
 *
 * @param level the block level to look up
 * @return {@code true} when {@code global(level)} is not empty, {@code false} otherwise
 */
public boolean hasGlobalBlock(ClusterBlockLevel level) {
    if (global(level).isEmpty()) {
        return false;
    }
    return true;
}
/**
* Is there a global block with the provided status?
*/

View File

@ -43,8 +43,8 @@ public class DiscoverySettings extends AbstractComponent {
public static final String DEFAULT_NO_MASTER_BLOCK = "write";
public final static int NO_MASTER_BLOCK_ID = 2;
private final static ClusterBlock ALL = new ClusterBlock(NO_MASTER_BLOCK_ID, "no master", true, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL);
private final static ClusterBlock WRITE = new ClusterBlock(NO_MASTER_BLOCK_ID, "no master", true, false, RestStatus.SERVICE_UNAVAILABLE, EnumSet.of(ClusterBlockLevel.WRITE, ClusterBlockLevel.METADATA));
public final static ClusterBlock NO_MASTER_BLOCK_ALL = new ClusterBlock(NO_MASTER_BLOCK_ID, "no master", true, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL);
public final static ClusterBlock NO_MASTER_BLOCK_WRITES = new ClusterBlock(NO_MASTER_BLOCK_ID, "no master", true, false, RestStatus.SERVICE_UNAVAILABLE, EnumSet.of(ClusterBlockLevel.WRITE, ClusterBlockLevel.METADATA));
private volatile ClusterBlock noMasterBlock;
private volatile TimeValue publishTimeout = DEFAULT_PUBLISH_TIMEOUT;
@ -90,9 +90,9 @@ public class DiscoverySettings extends AbstractComponent {
private ClusterBlock parseNoMasterBlock(String value) {
if ("all".equals(value)) {
return ALL;
return NO_MASTER_BLOCK_ALL;
} else if ("write".equals(value)) {
return WRITE;
return NO_MASTER_BLOCK_WRITES;
} else {
throw new ElasticsearchIllegalArgumentException("invalid master block [" + value + "]");
}

View File

@ -22,24 +22,21 @@ package org.elasticsearch.discovery;
import com.google.common.base.Predicate;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.operation.hash.djb.DjbHashFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.disruption.*;
@ -66,6 +63,7 @@ import static org.hamcrest.Matchers.is;
/**
*/
@ClusterScope(scope = Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
@TestLogging("discovery.zen:TRACE")
public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationTest {
private static final Settings nodeSettings = ImmutableSettings.settingsBuilder()
@ -87,8 +85,12 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
return 1;
}
/**
* Test that no split brain occurs under partial network partition. See https://github.com/elasticsearch/elasticsearch/issues/2488
*
* @throws Exception
*/
@Test
@TestLogging("discovery.zen:TRACE")
public void failWithMinimumMasterNodesConfigured() throws Exception {
List<String> nodes = internalCluster().startNodesAsync(3, nodeSettings).get();
@ -97,50 +99,37 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
ensureStableCluster(3);
// Figure out what is the elected master node
DiscoveryNode masterDiscoNode = findMasterNode(nodes);
logger.info("---> legit elected master node=" + masterDiscoNode);
final Client masterClient = internalCluster().masterClient();
// Everything is stable now, it is now time to simulate evil...
final String masterNode = internalCluster().getMasterName();
logger.info("---> legit elected master node=" + masterNode);
// Pick a node that isn't the elected master.
String unluckyNode = null;
for (String node : nodes) {
if (!node.equals(masterDiscoNode.getName())) {
unluckyNode = node;
}
}
assert unluckyNode != null;
Set<String> nonMasters = new HashSet<>(nodes);
nonMasters.remove(masterNode);
final String unluckyNode = randomFrom(nonMasters.toArray(Strings.EMPTY_ARRAY));
// Simulate a network issue between the unlucky node and elected master node in both directions.
NetworkDisconnectPartition networkDisconnect = new NetworkDisconnectPartition(masterDiscoNode.name(), unluckyNode, getRandom());
NetworkDisconnectPartition networkDisconnect = new NetworkDisconnectPartition(masterNode, unluckyNode, getRandom());
setDisruptionScheme(networkDisconnect);
networkDisconnect.startDisrupting();
// Wait until the elected master has removed the unlucky node...
boolean applied = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
return masterClient.admin().cluster().prepareState().setLocal(true).get().getState().nodes().size() == 2;
}
}, 1, TimeUnit.MINUTES);
assertThat(applied, is(true));
ensureStableCluster(2, masterNode);
// The unlucky node must report *no* master node, since it can't connect to master and in fact it should
// continuously ping until network failures have been resolved. However
final Client isolatedNodeClient = internalCluster().client(unluckyNode);
// It may take a bit before the node detects it has been cut off from the elected master
applied = awaitBusy(new Predicate<Object>() {
boolean success = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
ClusterState localClusterState = isolatedNodeClient.admin().cluster().prepareState().setLocal(true).get().getState();
ClusterState localClusterState = getNodeClusterState(unluckyNode);
DiscoveryNodes localDiscoveryNodes = localClusterState.nodes();
logger.info("localDiscoveryNodes=" + localDiscoveryNodes.prettyPrint());
return localDiscoveryNodes.masterNode() == null;
}
}, 10, TimeUnit.SECONDS);
assertThat(applied, is(true));
assertThat(success, is(true));
networkDisconnect.stopDisrupting();
@ -148,14 +137,17 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
ensureStableCluster(3);
for (String node : nodes) {
ClusterState state = internalCluster().client(node).admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
ClusterState state = getNodeClusterState(node);
assertThat(state.nodes().size(), equalTo(3));
// The elected master shouldn't have changed, since the unlucky node never could have elected himself as
// master since m_m_n of 2 could never be satisfied.
assertThat(state.nodes().masterNode(), equalTo(masterDiscoNode));
assertThat(state.nodes().masterNode().name(), equalTo(masterNode));
}
}
/**
* Verify that the proper block is applied when nodes lose their master
*/
@Test
public void testVerifyApiBlocksDuringPartition() throws Exception {
internalCluster().startNodesAsync(3, nodeSettings).get();
@ -164,8 +156,8 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
// Makes sure that the get request can be executed on each node locally:
assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
));
// Everything is stable now, it is now time to simulate evil...
@ -176,56 +168,58 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
NetworkPartition networkPartition = addRandomPartition();
final String isolatedNode = networkPartition.getMinoritySide().get(0);
final String nonIsolatedNode = networkPartition.getMjaoritySide().get(0);
final String nonIsolatedNode = networkPartition.getMajoritySide().get(0);
// Simulate a network issue between the unlucky node and the rest of the cluster.
networkPartition.startDisrupting();
logger.info("wait until elected master has removed [{}]", isolatedNode);
boolean applied = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
return client(nonIsolatedNode).admin().cluster().prepareState().setLocal(true).get().getState().nodes().size() == 2;
}
}, 1, TimeUnit.MINUTES);
assertThat(applied, is(true));
// The unlucky node must report *no* master node, since it can't connect to master and in fact it should
// continuously ping until network failures have been resolved. However
// It may a take a bit before the node detects it has been cut off from the elected master
logger.info("waiting for isolated node [{}] to have no master", isolatedNode);
applied = awaitBusy(new Predicate<Object>() {
final ClusterState[] lastState = new ClusterState[1];
boolean success = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
ClusterState localClusterState = client(isolatedNode).admin().cluster().prepareState().setLocal(true).get().getState();
DiscoveryNodes localDiscoveryNodes = localClusterState.nodes();
lastState[0] = getNodeClusterState(isolatedNode);
DiscoveryNodes localDiscoveryNodes = lastState[0].nodes();
logger.info("localDiscoveryNodes=" + localDiscoveryNodes.prettyPrint());
return localDiscoveryNodes.masterNode() == null;
// Keep waiting while the isolated node still believes it has a master.
if (localDiscoveryNodes.masterNode() != null) {
    return false;
}
// Once the master is gone, every level covered by the "write" no-master block
// must show up as a global block in the node's local cluster state.
for (ClusterBlockLevel level : DiscoverySettings.NO_MASTER_BLOCK_WRITES.levels()) {
    if (!lastState[0].getBlocks().hasGlobalBlock(level)) {
        return false;
    }
}
// No master and all expected block levels present: the condition we are waiting for.
return true;
}
}, 10, TimeUnit.SECONDS);
assertThat(applied, is(true));
if (!success) {
fail("isolated node still has a master or the wrong blocks. Cluster state:\n" + lastState[0].prettyPrint());
}
logger.info("wait until elected master has been removed and a new 2 node cluster was from (via [{}])", isolatedNode);
ensureStableCluster(2, nonIsolatedNode);
// Reads on the wrong side of the split are allowed
client(isolatedNode).prepareSearch("test").setTypes("type")
.setPreference("_only_local")
.get();
client(isolatedNode).preparePercolate().setDocumentType("type").setIndices("test")
.setPreference("_only_local").setSource("{\"doc\" : {}}")
.get();
client(isolatedNode).prepareCount("test").setTypes("type")
.setPreference("_only_local")
.get();
client(isolatedNode).prepareGet("test", "type", "0").setPreference("_only_local")
.get();
for (String node : networkPartition.getMajoritySide()) {
ClusterState nodeState = getNodeClusterState(node);
success = true;
if (nodeState.nodes().getMasterNode() == null) {
success = false;
}
if (!nodeState.blocks().global().isEmpty()) {
success = false;
}
if (!success) {
fail("node [" + node + "] has no master or has blocks, despite of being on the right side of the partition. State dump:\n"
+ nodeState.prettyPrint());
}
}
// Writes on the wrong side of the split are *not* allowed
executeBlockedApi(
client(isolatedNode).prepareIndex("test", "type", "0").setSource("{}").setTimeout("1s") // Fail quick, otherwise we wait 60 seconds.
);
executeBlockedApi(
client(isolatedNode).prepareUpdate("test", "type", "0").setDoc("{}").setTimeout("1s") // Fail quick, otherwise we wait 60 seconds.
);
networkPartition.stopDisrupting();
// Wait until the master node sees all 3 nodes again.
@ -237,69 +231,37 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
.get();
networkPartition.startDisrupting();
logger.info("wait until elected master has removed [{}]", isolatedNode);
applied = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
return client(nonIsolatedNode).admin().cluster().prepareState().setLocal(true).get().getState().nodes().size() == 2;
}
}, 1, TimeUnit.MINUTES);
assertThat(applied, is(true));
// The unlucky node must report *no* master node, since it can't connect to master and in fact it should
// continuously ping until network failures have been resolved. However
// It may a take a bit before the node detects it has been cut off from the elected master
logger.info("waiting for isolated node [{}] to have no master", isolatedNode);
applied = awaitBusy(new Predicate<Object>() {
success = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
ClusterState localClusterState = client(isolatedNode).admin().cluster().prepareState().setLocal(true).get().getState();
DiscoveryNodes localDiscoveryNodes = localClusterState.nodes();
lastState[0] = getNodeClusterState(isolatedNode);
DiscoveryNodes localDiscoveryNodes = lastState[0].nodes();
logger.info("localDiscoveryNodes=" + localDiscoveryNodes.prettyPrint());
return localDiscoveryNodes.masterNode() == null;
// Keep waiting while the isolated node still believes it has a master.
if (localDiscoveryNodes.masterNode() != null) {
    return false;
}
// Once the master is gone, every level covered by the "all" no-master block
// must show up as a global block in the node's local cluster state.
for (ClusterBlockLevel level : DiscoverySettings.NO_MASTER_BLOCK_ALL.levels()) {
    if (!lastState[0].getBlocks().hasGlobalBlock(level)) {
        return false;
    }
}
// No master and all expected block levels present: the condition we are waiting for.
return true;
}
}, 10, TimeUnit.SECONDS);
assertThat(applied, is(true));
ensureStableCluster(2, nonIsolatedNode);
// Now reads and writes on the wrong side of the split are allowed
executeBlockedApi(
client(isolatedNode).prepareSearch("test").setTypes("type").setPreference("_only_local")
);
executeBlockedApi(
client(isolatedNode).preparePercolate().setDocumentType("type").setIndices("test").setPreference("_only_local").setSource("{\"doc\" : {}}")
);
executeBlockedApi(
client(isolatedNode).prepareCount("test").setTypes("type").setPreference("_only_local")
);
executeBlockedApi(
client(isolatedNode).prepareGet("test", "type", "0").setPreference("_only_local")
);
executeBlockedApi(
client(isolatedNode).prepareIndex("test", "type", "0").setSource("{}").setTimeout("1s") // Fail quick, otherwise we wait 60 seconds.
);
executeBlockedApi(
client(isolatedNode).prepareUpdate("test", "type", "0").setDoc("{}").setTimeout("1s") // Fail quick, otherwise we wait 60 seconds.
);
networkPartition.stopDisrupting();
// Wait until the master node sees all 3 nodes again.
ensureStableCluster(3, new TimeValue(30000 + networkPartition.expectedTimeToHeal().millis()));
}
private void executeBlockedApi(ActionRequestBuilder builder) {
try {
logger.info("verifying request[{}] on isolated [{}] and fail", builder.getClass().getSimpleName());
builder.get();
fail();
} catch (ClusterBlockException exception) {
assertThat(exception.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
assertThat(exception.blocks().size(), equalTo(1));
ClusterBlock clusterBlock = exception.blocks().iterator().next();
assertThat(clusterBlock.id(), equalTo(DiscoverySettings.NO_MASTER_BLOCK_ID));
if (!success) {
fail("isolated node still has a master or the wrong blocks (expected 'all' block). Cluster state:\n" + lastState[0].prettyPrint());
}
}
/**
* This test isolates the master from rest of the cluster, waits for a new master to be elected, restores the partition
* and verifies that all node agree on the new cluster state
*/
@Test
@TestLogging("discovery.zen:TRACE,action:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE")
public void testIsolateMasterAndVerifyClusterStateConsensus() throws Exception {
@ -313,24 +275,19 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
));
ensureGreen();
String isolatedNode = findMasterNode(nodes).name();
String nonIsolatedNode = null;
for (String node : nodes) {
if (!node.equals(isolatedNode)) {
nonIsolatedNode = node;
break;
}
}
ServiceDisruptionScheme scheme = addRandomIsolation(isolatedNode);
scheme.startDisrupting();
String isolatedNode = internalCluster().getMasterName();
NetworkPartition networkPartition = addRandomIsolation(isolatedNode);
networkPartition.startDisrupting();
String nonIsolatedNode = networkPartition.getMajoritySide().get(0);
// make sure cluster reforms
ensureStableCluster(2, nonIsolatedNode);
// restore isolation
scheme.stopDisrupting();
networkPartition.stopDisrupting();
ensureStableCluster(3, new TimeValue(30000 + scheme.expectedTimeToHeal().millis()));
ensureStableCluster(3, new TimeValue(30000 + networkPartition.expectedTimeToHeal().millis()));
logger.info("issue a reroute");
// trigger a reroute now, instead of waiting for the background reroute of RerouteService
@ -341,7 +298,7 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
// verify all cluster states are the same
ClusterState state = null;
for (String node : nodes) {
ClusterState nodeState = client(node).admin().cluster().prepareState().setLocal(true).get().getState();
ClusterState nodeState = getNodeClusterState(node);
if (state == null) {
state = nodeState;
continue;
@ -364,6 +321,10 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
}
}
/**
* Test that we do not lose documents whose indexing request was successful, under a randomly selected disruption scheme.
* We also collect & report the type of indexing failures that occur.
*/
@Test
@LuceneTestCase.AwaitsFix(bugUrl = "needs some more work to stabilize")
@TestLogging("action.index:TRACE,action.get:TRACE,discovery:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE")
@ -497,6 +458,12 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
}
}
/**
* Test that a document which is indexed on the majority side of a partition, is available from the minority side,
* once the partition is healed
*
* @throws Exception
*/
@Test
@TestLogging("discovery.zen:TRACE,action:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE")
public void testRejoinDocumentExistsInAllShardCopies() throws Exception {
@ -592,22 +559,6 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
return list.get(0);
}
private DiscoveryNode findMasterNode(List<String> nodes) {
DiscoveryNode masterDiscoNode = null;
for (String node : nodes) {
ClusterState state = internalCluster().client(node).admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.nodes().size(), equalTo(3));
if (masterDiscoNode == null) {
masterDiscoNode = state.nodes().masterNode();
} else {
assertThat(state.nodes().masterNode(), equalTo(masterDiscoNode));
}
}
assert masterDiscoNode != null;
return masterDiscoNode;
}
private void ensureStableCluster(int nodeCount) {
ensureStableCluster(nodeCount, TimeValue.timeValueSeconds(30), null);
}
@ -631,4 +582,8 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
assertThat(clusterHealthResponse.isTimedOut(), is(false));
}
/**
 * Fetches the cluster state through a client for the given node.
 * <p>
 * The state request is issued with {@code setLocal(true)}; presumably this makes the node answer from its
 * own copy of the cluster state instead of forwarding to the master (TODO confirm against the cluster state API).
 *
 * @param node name of the node whose client is used for the request
 * @return the cluster state contained in the response
 */
private ClusterState getNodeClusterState(String node) {
return client(node).admin().cluster().prepareState().setLocal(true).get().getState();
}
}

View File

@ -222,7 +222,7 @@ public final class InternalTestCluster extends TestCluster {
this.numSharedClientNodes = numClientNodes;
}
}
assert this.numSharedClientNodes >=0;
assert this.numSharedClientNodes >= 0;
this.enableRandomBenchNodes = enableRandomBenchNodes;
@ -247,7 +247,7 @@ public final class InternalTestCluster extends TestCluster {
if (numOfDataPaths > 0) {
StringBuilder dataPath = new StringBuilder();
for (int i = 0; i < numOfDataPaths; i++) {
dataPath.append(new File("data/d"+i).getAbsolutePath()).append(',');
dataPath.append(new File("data/d" + i).getAbsolutePath()).append(',');
}
builder.put("path.data", dataPath.toString());
}
@ -270,7 +270,7 @@ public final class InternalTestCluster extends TestCluster {
public static String nodeMode() {
Builder builder = ImmutableSettings.builder();
if (Strings.isEmpty(System.getProperty("es.node.mode"))&& Strings.isEmpty(System.getProperty("es.node.local"))) {
if (Strings.isEmpty(System.getProperty("es.node.mode")) && Strings.isEmpty(System.getProperty("es.node.local"))) {
return "local"; // default if nothing is specified
}
if (Strings.hasLength(System.getProperty("es.node.mode"))) {
@ -327,7 +327,7 @@ public final class InternalTestCluster extends TestCluster {
//.put("index.store.type", random.nextInt(10) == 0 ? MockRamIndexStoreModule.class.getName() : MockFSIndexStoreModule.class.getName())
// decrease the routing schedule so new nodes will be added quickly - some random value between 30 and 80 ms
.put("cluster.routing.schedule", (30 + random.nextInt(50)) + "ms")
// default to non gateway
// default to non gateway
.put("gateway.type", "none")
.put(SETTING_CLUSTER_NODE_SEED, seed);
if (ENABLE_MOCK_MODULES && usually(random)) {
@ -351,7 +351,7 @@ public final class InternalTestCluster extends TestCluster {
builder.put(SearchService.KEEPALIVE_INTERVAL_KEY, TimeValue.timeValueSeconds(10 + random.nextInt(5 * 60)));
}
if (random.nextBoolean()) { // sometimes set a
builder.put(SearchService.DEFAUTL_KEEPALIVE_KEY, TimeValue.timeValueSeconds(100 + random.nextInt(5*60)));
builder.put(SearchService.DEFAUTL_KEEPALIVE_KEY, TimeValue.timeValueSeconds(100 + random.nextInt(5 * 60)));
}
if (random.nextBoolean()) {
// change threadpool types to make sure we don't have components that rely on the type of thread pools
@ -782,7 +782,6 @@ public final class InternalTestCluster extends TestCluster {
public static final String TRANSPORT_CLIENT_PREFIX = "transport_client_";
static class TransportClientFactory {
private static TransportClientFactory NO_SNIFF_CLIENT_FACTORY = new TransportClientFactory(false, ImmutableSettings.EMPTY);
private static TransportClientFactory SNIFF_CLIENT_FACTORY = new TransportClientFactory(true, ImmutableSettings.EMPTY);
@ -1229,7 +1228,10 @@ public final class InternalTestCluster extends TestCluster {
}
private String getMasterName() {
/**
* get the name of the current master node
*/
public String getMasterName() {
try {
ClusterState state = client().admin().cluster().prepareState().execute().actionGet().getState();
return state.nodes().masterNode().name();

View File

@ -73,7 +73,7 @@ public abstract class NetworkPartition implements ServiceDisruptionScheme {
return ImmutableList.copyOf(nodesSideTwo);
}
public List<String> getMjaoritySide() {
public List<String> getMajoritySide() {
if (nodesSideOne.size() >= nodesSideTwo.size()) {
return getNodesSideOne();
} else {