[Tests] Enhance ZenUnicastDiscoveryTest

This started out as a simple correction to a missing setting problem, but go bigger into more general work on the ZenUnicastDiscoveryTets suite. It now works with both network and local mode. I also merge the different ZenUnicast test suites into a single place.

Closes #6835
This commit is contained in:
Boaz Leskes 2014-07-08 19:29:19 +02:00
parent 6c78147f5f
commit c4c0270c52
4 changed files with 81 additions and 165 deletions

View File

@ -58,6 +58,8 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
private static final AtomicLong transportAddressIdGenerator = new AtomicLong();
private final ConcurrentMap<DiscoveryNode, LocalTransport> connectedNodes = newConcurrentMap();
public static final String TRANSPORT_LOCAL_ADDRESS = "transport.local_address";
@Inject
public LocalTransport(Settings settings, ThreadPool threadPool, Version version) {
super(settings);
@ -77,8 +79,15 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
@Override
protected void doStart() throws ElasticsearchException {
localAddress = new LocalTransportAddress(Long.toString(transportAddressIdGenerator.incrementAndGet()));
transports.put(localAddress, this);
String address = settings.get(TRANSPORT_LOCAL_ADDRESS);
if (address == null) {
address = Long.toString(transportAddressIdGenerator.incrementAndGet());
}
localAddress = new LocalTransportAddress(address);
LocalTransport previous = transports.put(localAddress, this);
if (previous != null) {
throw new ElasticsearchException("local address [" + address + "] is already bound");
}
boundAddress = new BoundTransportAddress(localAddress, localAddress);
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.discovery;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.ImmutableSettings;
@ -27,34 +26,86 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import org.elasticsearch.transport.local.LocalTransport;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.ExecutionException;
import static org.hamcrest.Matchers.equalTo;
@ClusterScope(scope=Scope.TEST, numDataNodes =2)
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
public class ZenUnicastDiscoveryTests extends ElasticsearchIntegrationTest {
private static int currentNumNodes = -1;
static int currentBaseHttpPort = -1;
static int currentNumOfUnicastHosts = -1;
@Before
public void setUP() throws Exception {
ElasticsearchIntegrationTest.beforeClass();
currentNumNodes = randomIntBetween(3, 5);
currentNumOfUnicastHosts = randomIntBetween(1, currentNumNodes);
currentBaseHttpPort = 25000 + randomInt(100);
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return ImmutableSettings.settingsBuilder()
ImmutableSettings.Builder builder = ImmutableSettings.settingsBuilder()
.put("discovery.type", "zen")
.put("discovery.zen.ping.multicast.enabled", false)
.put("discovery.zen.ping.unicast.hosts", "localhost")
.put("transport.tcp.port", "25300-25400") // Need to use custom tcp port range otherwise we collide with the shared cluster
.put(super.nodeSettings(nodeOrdinal)).build();
}
@Test
public void testUnicastDiscovery() {
for (Client client : clients()) {
ClusterState state = client.admin().cluster().prepareState().execute().actionGet().getState();
//client nodes might be added randomly
int dataNodes = 0;
for (DiscoveryNode discoveryNode : state.nodes()) {
if (discoveryNode.isDataNode()) {
dataNodes++;
}
.put("http.enabled", false) // just to make test quicker
.put(super.nodeSettings(nodeOrdinal));
String[] unicastHosts = new String[currentNumOfUnicastHosts];
if (internalCluster().getDefaultSettings().get("node.mode").equals("local")) {
builder.put(LocalTransport.TRANSPORT_LOCAL_ADDRESS, "unicast_test_" + nodeOrdinal);
for (int i = 0; i < unicastHosts.length; i++) {
unicastHosts[i] = "unicast_test_" + i;
}
} else {
// we need to pin the node ports so we'd know where to point things
builder.put("transport.tcp.port", currentBaseHttpPort + nodeOrdinal);
for (int i = 0; i < unicastHosts.length; i++) {
unicastHosts[i] = "localhost:" + (currentBaseHttpPort + i);
}
}
builder.putArray("discovery.zen.ping.unicast.hosts", unicastHosts);
return builder.build();
}
@Test
public void testNormalClusterForming() throws ExecutionException, InterruptedException {
internalCluster().startNodesAsync(currentNumNodes).get();
if (client().admin().cluster().prepareHealth().setWaitForNodes("" + currentNumNodes).get().isTimedOut()) {
logger.info("cluster forming timed out, cluster state:\n{}", client().admin().cluster().prepareState().get().getState().prettyPrint());
fail("timed out waiting for cluster to form with [" + currentNumNodes + "] nodes");
}
}
@Test
// Without the 'include temporalResponses responses to nodesToConnect' improvement in UnicastZenPing#sendPings this
// test fails, because 2 nodes elect themselves as master and the health request times out b/c waiting_for_nodes=N
// can't be satisfied.
public void testMinimumMasterNodes() throws Exception {
final Settings settings = ImmutableSettings.settingsBuilder().put("discovery.zen.minimum_master_nodes", currentNumNodes / 2 + 1).build();
List<String> nodes = internalCluster().startNodesAsync(currentNumNodes, settings).get();
ensureGreen();
DiscoveryNode masterDiscoNode = null;
for (String node : nodes) {
ClusterState state = internalCluster().client(node).admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.nodes().size(), equalTo(currentNumNodes));
if (masterDiscoNode == null) {
masterDiscoNode = state.nodes().masterNode();
} else {
assertThat(masterDiscoNode.equals(state.nodes().masterNode()), equalTo(true));
}
assertThat(dataNodes, equalTo(2));
}
}
}

View File

@ -1,68 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import org.junit.Test;
import java.util.List;
import static org.hamcrest.Matchers.equalTo;
@ClusterScope(scope=Scope.TEST, numDataNodes =0)
public class ZenUnicastDiscoveryTestsMinimumMasterNodes extends ElasticsearchIntegrationTest {
@Test
// Without the 'include temporalResponses responses to nodesToConnect' improvement in UnicastZenPing#sendPings this
// test fails, because 2 nodes elect themselves as master and the health request times out b/c waiting_for_nodes=3
// can't be satisfied.
public void testUnicastDiscovery() throws Exception {
final Settings settings = ImmutableSettings.settingsBuilder()
.put("discovery.zen.ping.multicast.enabled", false)
.put("discovery.zen.minimum_master_nodes", 2)
.put("discovery.zen.ping.unicast.hosts", "localhost")
.put("transport.tcp.port", "25400-25500") // Need to use custom tcp port range otherwise we collide with the shared cluster
.build();
List<String> nodes = internalCluster().startNodesAsync(3, settings).get();
ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("3").execute().actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
DiscoveryNode masterDiscoNode = null;
for (String node : nodes.toArray(new String[3])) {
ClusterState state = internalCluster().client(node).admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.nodes().size(), equalTo(3));
if (masterDiscoNode == null) {
masterDiscoNode = state.nodes().masterNode();
} else {
assertThat(masterDiscoNode.equals(state.nodes().masterNode()), equalTo(true));
}
}
}
}

View File

@ -1,76 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.discovery;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.Test;
import java.util.List;
import static org.apache.lucene.util.LuceneTestCase.Slow;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.hamcrest.Matchers.equalTo;
/**
*/
@Slow
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
public class ZenUnicastDiscoveryTestsSpecificNodes extends ElasticsearchIntegrationTest {
@Test
@TestLogging("discovery.zen:TRACE")
// The bug zen unicast ping override bug, may rarely manifest itself, it is very timing dependant.
// Without the fix in UnicastZenPing, this test fails roughly 1 out of 10 runs from the command line.
public void testMasterElectionNotMissed() throws Exception {
final Settings settings = settingsBuilder()
// Failure only manifests if multicast ping is disabled!
.put("discovery.zen.ping.multicast.ping.enabled", false)
.put("discovery.zen.minimum_master_nodes", 2)
// Can't use this, b/c at the moment all node will only ping localhost:9300
// .put("discovery.zen.ping.unicast.hosts", "localhost")
.put("discovery.zen.ping.unicast.hosts", "localhost:15300,localhost:15301,localhost:15302")
.put("transport.tcp.port", "15300-15400")
.build();
List<String> nodes = internalCluster().startNodesAsync(3, settings).get();
ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("3").execute().actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
DiscoveryNode masterDiscoNode = null;
for (String node : nodes.toArray(new String[3])) {
ClusterState state = internalCluster().client(node).admin().cluster().prepareState().setLocal(true).execute().actionGet().getState();
assertThat(state.nodes().size(), equalTo(3));
if (masterDiscoNode == null) {
masterDiscoNode = state.nodes().masterNode();
} else {
assertThat(masterDiscoNode.equals(state.nodes().masterNode()), equalTo(true));
}
}
}
}