[Tests] Introduced ClusterDiscoveryConfiguration

Closes #6890
This commit is contained in:
Boaz Leskes 2014-07-13 09:42:36 +02:00
parent ccabb4aa20
commit ea2783787c
5 changed files with 199 additions and 64 deletions

View File

@ -0,0 +1,136 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery;
import com.carrotsearch.randomizedtesting.RandomizedTest;
import com.google.common.primitives.Ints;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.SettingsSource;
import org.elasticsearch.transport.local.LocalTransport;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
public class ClusterDiscoveryConfiguration extends SettingsSource {
public static Settings DEFAULT_SETTINGS = ImmutableSettings.settingsBuilder()
.put("gateway.type", "local")
.put("discovery.type", "zen")
.build();
final int numOfNodes;
final Settings baseSettings;
public ClusterDiscoveryConfiguration(int numOfNodes) {
this(numOfNodes, ImmutableSettings.EMPTY);
}
public ClusterDiscoveryConfiguration(int numOfNodes, Settings extraSettings) {
this.numOfNodes = numOfNodes;
this.baseSettings = ImmutableSettings.builder().put(DEFAULT_SETTINGS).put(extraSettings).build();
}
@Override
public Settings node(int nodeOrdinal) {
return baseSettings;
}
@Override
public Settings transportClient() {
return baseSettings;
}
public static class UnicastZen extends ClusterDiscoveryConfiguration {
private final static AtomicInteger portRangeCounter = new AtomicInteger();
private final int[] unicastHostOrdinals;
private final int basePort;
public UnicastZen(int numOfNodes) {
this(numOfNodes, numOfNodes);
}
public UnicastZen(int numOfNodes, int numOfUnicastHosts) {
this(numOfNodes, numOfUnicastHosts, ImmutableSettings.EMPTY);
}
public UnicastZen(int numOfNodes, int numOfUnicastHosts, Settings extraSettings) {
super(numOfNodes, extraSettings);
if (numOfUnicastHosts == numOfNodes) {
unicastHostOrdinals = new int[numOfNodes];
for (int i = 0; i < numOfNodes; i++) {
unicastHostOrdinals[i] = i;
}
} else {
Set<Integer> ordinals = new HashSet<>(numOfUnicastHosts);
while (ordinals.size() != numOfUnicastHosts) {
ordinals.add(RandomizedTest.randomInt(numOfNodes - 1));
}
unicastHostOrdinals = Ints.toArray(ordinals);
}
this.basePort = calcBasePort();
}
public UnicastZen(int numOfNodes, int[] unicastHostOrdinals) {
this(numOfNodes, ImmutableSettings.EMPTY, unicastHostOrdinals);
}
public UnicastZen(int numOfNodes, Settings extraSettings, int[] unicastHostOrdinals) {
super(numOfNodes, extraSettings);
this.unicastHostOrdinals = unicastHostOrdinals;
this.basePort = calcBasePort();
}
private final static int calcBasePort() {
return 10000 +
1000 * (ElasticsearchIntegrationTest.CHILD_VM_ID.hashCode() % 60) + // up to 60 jvms
100 * portRangeCounter.incrementAndGet(); // up to 100 nodes
}
@Override
public Settings node(int nodeOrdinal) {
ImmutableSettings.Builder builder = ImmutableSettings.builder()
.put("discovery.zen.ping.multicast.enabled", false);
String[] unicastHosts = new String[unicastHostOrdinals.length];
if (InternalTestCluster.NODE_MODE.equals("local")) {
builder.put(LocalTransport.TRANSPORT_LOCAL_ADDRESS, "node_" + nodeOrdinal);
for (int i = 0; i < unicastHosts.length; i++) {
unicastHosts[i] = "node_" + unicastHostOrdinals[i];
}
} else {
// we need to pin the node port & host so we'd know where to point things
builder.put("transport.tcp.port", basePort + nodeOrdinal);
builder.put("transport.host", "localhost");
for (int i = 0; i < unicastHosts.length; i++) {
unicastHosts[i] = "localhost:" + (basePort + unicastHostOrdinals[i]);
}
}
builder.putArray("discovery.zen.ping.unicast.hosts", unicastHosts);
return builder.put(super.node(nodeOrdinal)).build();
}
}
}

View File

@ -37,19 +37,18 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.disruption.*; import org.elasticsearch.test.disruption.*;
import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportModule; import org.elasticsearch.transport.TransportModule;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -68,15 +67,18 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
private static final TimeValue DISRUPTION_HEALING_OVERHEAD = TimeValue.timeValueSeconds(40); // we use 30s as timeout in many places. private static final TimeValue DISRUPTION_HEALING_OVERHEAD = TimeValue.timeValueSeconds(40); // we use 30s as timeout in many places.
private static final Settings nodeSettings = ImmutableSettings.settingsBuilder() private ClusterDiscoveryConfiguration discoveryConfig;
.put("gateway.type", "local")
.put("discovery.type", "zen") // <-- To override the local setting if set externally
.put("discovery.zen.fd.ping_timeout", "1s") // <-- for hitting simulated network failures quickly @Override
.put("discovery.zen.fd.ping_retries", "1") // <-- for hitting simulated network failures quickly protected Settings nodeSettings(int nodeOrdinal) {
.put(DiscoverySettings.PUBLISH_TIMEOUT, "1s") // <-- for hitting simulated network failures quickly return discoveryConfig.node(nodeOrdinal);
.put("discovery.zen.minimum_master_nodes", 2) }
.put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName())
.build(); @Before
public void clearConfig() {
discoveryConfig = null;
}
@Override @Override
protected int numberOfShards() { protected int numberOfShards() {
@ -88,6 +90,31 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
return 1; return 1;
} }
private List<String> startCluster(int numberOfNodes) throws ExecutionException, InterruptedException {
Settings settings = ImmutableSettings.builder()
// TODO: this is a temporary solution so that nodes will not base their reaction to a partition based on previous successful results
.put("discovery.zen.ping_timeout", "0.5s")
// end of temporary solution
.put("discovery.zen.fd.ping_timeout", "1s") // <-- for hitting simulated network failures quickly
.put("discovery.zen.fd.ping_retries", "1") // <-- for hitting simulated network failures quickly
.put(DiscoverySettings.PUBLISH_TIMEOUT, "1s") // <-- for hitting simulated network failures quickly
.put("http.enabled", false) // just to make test quicker
.put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName())
.put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES, numberOfNodes / 2 + 1).build();
if (discoveryConfig == null) {
if (randomBoolean()) {
discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(numberOfNodes, numberOfNodes, settings);
} else {
discoveryConfig = new ClusterDiscoveryConfiguration(numberOfNodes, settings);
}
}
List<String> nodes = internalCluster().startNodesAsync(numberOfNodes).get();
ensureStableCluster(numberOfNodes);
return nodes;
}
/** /**
* Test that no split brain occurs under partial network partition. See https://github.com/elasticsearch/elasticsearch/issues/2488 * Test that no split brain occurs under partial network partition. See https://github.com/elasticsearch/elasticsearch/issues/2488
* *
@ -96,10 +123,7 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
@Test @Test
public void failWithMinimumMasterNodesConfigured() throws Exception { public void failWithMinimumMasterNodesConfigured() throws Exception {
List<String> nodes = internalCluster().startNodesAsync(3, nodeSettings).get(); List<String> nodes = startCluster(3);
// Wait until 3 nodes are part of the cluster
ensureStableCluster(3);
// Figure out what is the elected master node // Figure out what is the elected master node
final String masterNode = internalCluster().getMasterName(); final String masterNode = internalCluster().getMasterName();
@ -154,9 +178,7 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
@Test @Test
@TestLogging(value = "cluster.service:TRACE,indices.recovery:TRACE") @TestLogging(value = "cluster.service:TRACE,indices.recovery:TRACE")
public void testVerifyApiBlocksDuringPartition() throws Exception { public void testVerifyApiBlocksDuringPartition() throws Exception {
internalCluster().startNodesAsync(3, nodeSettings).get(); startCluster(3);
// Wait until a 3 nodes are part of the cluster
ensureStableCluster(3);
// Makes sure that the get request can be executed on each node locally: // Makes sure that the get request can be executed on each node locally:
assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder() assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder()
@ -276,8 +298,7 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
@Test @Test
@TestLogging("discovery.zen:TRACE,action:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE") @TestLogging("discovery.zen:TRACE,action:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE")
public void testIsolateMasterAndVerifyClusterStateConsensus() throws Exception { public void testIsolateMasterAndVerifyClusterStateConsensus() throws Exception {
final List<String> nodes = internalCluster().startNodesAsync(3, nodeSettings).get(); final List<String> nodes = startCluster(3);
ensureStableCluster(3);
assertAcked(prepareCreate("test") assertAcked(prepareCreate("test")
.setSettings(ImmutableSettings.builder() .setSettings(ImmutableSettings.builder()
@ -340,8 +361,7 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
@LuceneTestCase.AwaitsFix(bugUrl = "needs some more work to stabilize") @LuceneTestCase.AwaitsFix(bugUrl = "needs some more work to stabilize")
@TestLogging("action.index:TRACE,action.get:TRACE,discovery:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE") @TestLogging("action.index:TRACE,action.get:TRACE,discovery:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE")
public void testAckedIndexing() throws Exception { public void testAckedIndexing() throws Exception {
final List<String> nodes = internalCluster().startNodesAsync(3, nodeSettings).get(); final List<String> nodes = startCluster(3);
ensureStableCluster(3);
assertAcked(prepareCreate("test") assertAcked(prepareCreate("test")
.setSettings(ImmutableSettings.builder() .setSettings(ImmutableSettings.builder()
@ -478,8 +498,7 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationT
@Test @Test
@TestLogging("discovery.zen:TRACE,action:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE") @TestLogging("discovery.zen:TRACE,action:TRACE,cluster.service:TRACE,indices.recovery:TRACE,indices.cluster:TRACE")
public void testRejoinDocumentExistsInAllShardCopies() throws Exception { public void testRejoinDocumentExistsInAllShardCopies() throws Exception {
List<String> nodes = internalCluster().startNodesAsync(3, nodeSettings).get(); List<String> nodes = startCluster(3);
ensureStableCluster(3);
assertAcked(prepareCreate("test") assertAcked(prepareCreate("test")
.setSettings(ImmutableSettings.builder() .setSettings(ImmutableSettings.builder()

View File

@ -26,7 +26,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; import org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import org.elasticsearch.transport.local.LocalTransport;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -38,46 +37,24 @@ import static org.hamcrest.Matchers.equalTo;
@ClusterScope(scope = Scope.TEST, numDataNodes = 0) @ClusterScope(scope = Scope.TEST, numDataNodes = 0)
public class ZenUnicastDiscoveryTests extends ElasticsearchIntegrationTest { public class ZenUnicastDiscoveryTests extends ElasticsearchIntegrationTest {
private static int currentNumNodes = -1; private ClusterDiscoveryConfiguration discoveryConfig;
static int currentBaseHttpPort = -1;
static int currentNumOfUnicastHosts = -1;
@Before
public void setUP() throws Exception {
ElasticsearchIntegrationTest.beforeClass();
currentNumNodes = randomIntBetween(3, 5);
currentNumOfUnicastHosts = randomIntBetween(1, currentNumNodes);
currentBaseHttpPort = 25000 + randomInt(100);
}
@Override @Override
protected Settings nodeSettings(int nodeOrdinal) { protected Settings nodeSettings(int nodeOrdinal) {
ImmutableSettings.Builder builder = ImmutableSettings.settingsBuilder() return discoveryConfig.node(nodeOrdinal);
.put("discovery.type", "zen") }
.put("discovery.zen.ping.multicast.enabled", false)
.put("http.enabled", false) // just to make test quicker
.put(super.nodeSettings(nodeOrdinal));
String[] unicastHosts = new String[currentNumOfUnicastHosts]; @Before
if (internalCluster().getDefaultSettings().get("node.mode").equals("local")) { public void clearConfig() {
builder.put(LocalTransport.TRANSPORT_LOCAL_ADDRESS, "unicast_test_" + nodeOrdinal); discoveryConfig = null;
for (int i = 0; i < unicastHosts.length; i++) {
unicastHosts[i] = "unicast_test_" + i;
}
} else {
// we need to pin the node ports so we'd know where to point things
builder.put("transport.tcp.port", currentBaseHttpPort + nodeOrdinal);
for (int i = 0; i < unicastHosts.length; i++) {
unicastHosts[i] = "localhost:" + (currentBaseHttpPort + i);
}
}
builder.putArray("discovery.zen.ping.unicast.hosts", unicastHosts);
return builder.build();
} }
@Test @Test
public void testNormalClusterForming() throws ExecutionException, InterruptedException { public void testNormalClusterForming() throws ExecutionException, InterruptedException {
int currentNumNodes = randomIntBetween(3, 5);
int currentNumOfUnicastHosts = randomIntBetween(1, currentNumNodes);
discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(currentNumNodes, currentNumOfUnicastHosts);
internalCluster().startNodesAsync(currentNumNodes).get(); internalCluster().startNodesAsync(currentNumNodes).get();
if (client().admin().cluster().prepareHealth().setWaitForNodes("" + currentNumNodes).get().isTimedOut()) { if (client().admin().cluster().prepareHealth().setWaitForNodes("" + currentNumNodes).get().isTimedOut()) {
@ -91,9 +68,12 @@ public class ZenUnicastDiscoveryTests extends ElasticsearchIntegrationTest {
// test fails, because 2 nodes elect themselves as master and the health request times out b/c waiting_for_nodes=N // test fails, because 2 nodes elect themselves as master and the health request times out b/c waiting_for_nodes=N
// can't be satisfied. // can't be satisfied.
public void testMinimumMasterNodes() throws Exception { public void testMinimumMasterNodes() throws Exception {
int currentNumNodes = randomIntBetween(3, 5);
int currentNumOfUnicastHosts = randomIntBetween(1, currentNumNodes);
final Settings settings = ImmutableSettings.settingsBuilder().put("discovery.zen.minimum_master_nodes", currentNumNodes / 2 + 1).build(); final Settings settings = ImmutableSettings.settingsBuilder().put("discovery.zen.minimum_master_nodes", currentNumNodes / 2 + 1).build();
discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(currentNumNodes, currentNumOfUnicastHosts, settings);
List<String> nodes = internalCluster().startNodesAsync(currentNumNodes, settings).get(); List<String> nodes = internalCluster().startNodesAsync(currentNumNodes).get();
ensureGreen(); ensureGreen();

View File

@ -107,8 +107,8 @@ import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
import static org.elasticsearch.test.ElasticsearchTestCase.assertBusy; import static org.elasticsearch.test.ElasticsearchTestCase.assertBusy;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
/** /**
* InternalTestCluster manages a set of JVM private nodes and allows convenient access to them. * InternalTestCluster manages a set of JVM private nodes and allows convenient access to them.
@ -155,7 +155,7 @@ public final class InternalTestCluster extends TestCluster {
static final boolean DEFAULT_ENABLE_RANDOM_BENCH_NODES = true; static final boolean DEFAULT_ENABLE_RANDOM_BENCH_NODES = true;
static final String NODE_MODE = nodeMode(); public static final String NODE_MODE = nodeMode();
/* sorted map to make traverse order reproducible, concurrent since we do checks on it not within a sync block */ /* sorted map to make traverse order reproducible, concurrent since we do checks on it not within a sync block */
private final NavigableMap<String, NodeAndClient> nodes = new TreeMap<>(); private final NavigableMap<String, NodeAndClient> nodes = new TreeMap<>();

View File

@ -20,7 +20,7 @@ package org.elasticsearch.test;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
abstract class SettingsSource { public abstract class SettingsSource {
public static final SettingsSource EMPTY = new SettingsSource() { public static final SettingsSource EMPTY = new SettingsSource() {
@Override @Override
@ -35,7 +35,7 @@ abstract class SettingsSource {
}; };
/** /**
* @return the settings for the node represented by the given ordinal, or {@code null} if there are no settings defined * @return the settings for the node represented by the given ordinal, or {@code null} if there are no settings defined
*/ */
public abstract Settings node(int nodeOrdinal); public abstract Settings node(int nodeOrdinal);