From c4c0270c5223e63834a6eca8f5bd61b1bdcc175d Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 8 Jul 2014 19:29:19 +0200 Subject: [PATCH] [Tests] Enhance ZenUnicastDiscoveryTest This started out as a simple correction to a missing setting problem, but go bigger into more general work on the ZenUnicastDiscoveryTets suite. It now works with both network and local mode. I also merge the different ZenUnicast test suites into a single place. Closes #6835 --- .../transport/local/LocalTransport.java | 13 ++- .../discovery/ZenUnicastDiscoveryTests.java | 89 +++++++++++++++---- ...icastDiscoveryTestsMinimumMasterNodes.java | 68 -------------- ...ZenUnicastDiscoveryTestsSpecificNodes.java | 76 ---------------- 4 files changed, 81 insertions(+), 165 deletions(-) delete mode 100644 src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryTestsMinimumMasterNodes.java delete mode 100644 src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryTestsSpecificNodes.java diff --git a/src/main/java/org/elasticsearch/transport/local/LocalTransport.java b/src/main/java/org/elasticsearch/transport/local/LocalTransport.java index bdf1e8b70bb..b9ee0525e29 100644 --- a/src/main/java/org/elasticsearch/transport/local/LocalTransport.java +++ b/src/main/java/org/elasticsearch/transport/local/LocalTransport.java @@ -58,6 +58,8 @@ public class LocalTransport extends AbstractLifecycleComponent implem private static final AtomicLong transportAddressIdGenerator = new AtomicLong(); private final ConcurrentMap connectedNodes = newConcurrentMap(); + public static final String TRANSPORT_LOCAL_ADDRESS = "transport.local_address"; + @Inject public LocalTransport(Settings settings, ThreadPool threadPool, Version version) { super(settings); @@ -77,8 +79,15 @@ public class LocalTransport extends AbstractLifecycleComponent implem @Override protected void doStart() throws ElasticsearchException { - localAddress = new LocalTransportAddress(Long.toString(transportAddressIdGenerator.incrementAndGet())); - transports.put(localAddress, this); + String address = settings.get(TRANSPORT_LOCAL_ADDRESS); + if (address == null) { + address = Long.toString(transportAddressIdGenerator.incrementAndGet()); + } + localAddress = new LocalTransportAddress(address); + LocalTransport previous = transports.put(localAddress, this); + if (previous != null) { + throw new ElasticsearchException("local address [" + address + "] is already bound"); + } boundAddress = new BoundTransportAddress(localAddress, localAddress); } diff --git a/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryTests.java b/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryTests.java index f5bc259aff8..5f3f6cf978e 100644 --- a/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryTests.java +++ b/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.discovery; -import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.ImmutableSettings; @@ -27,34 +26,86 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; import org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; +import org.elasticsearch.transport.local.LocalTransport; +import org.junit.Before; import org.junit.Test; +import java.util.List; +import java.util.concurrent.ExecutionException; + import static org.hamcrest.Matchers.equalTo; -@ClusterScope(scope=Scope.TEST, numDataNodes =2) +@ClusterScope(scope = Scope.TEST, numDataNodes = 0) public class ZenUnicastDiscoveryTests extends ElasticsearchIntegrationTest { + private static int currentNumNodes = -1; + + static int currentBaseHttpPort = -1; + static int currentNumOfUnicastHosts = -1; + + @Before + public void setUP() throws Exception { + ElasticsearchIntegrationTest.beforeClass(); + currentNumNodes = randomIntBetween(3, 5); + currentNumOfUnicastHosts = randomIntBetween(1, currentNumNodes); + currentBaseHttpPort = 25000 + randomInt(100); + } + @Override protected Settings nodeSettings(int nodeOrdinal) { - return ImmutableSettings.settingsBuilder() + ImmutableSettings.Builder builder = ImmutableSettings.settingsBuilder() + .put("discovery.type", "zen") .put("discovery.zen.ping.multicast.enabled", false) - .put("discovery.zen.ping.unicast.hosts", "localhost") - .put("transport.tcp.port", "25300-25400") // Need to use custom tcp port range otherwise we collide with the shared cluster - .put(super.nodeSettings(nodeOrdinal)).build(); - } - - @Test - public void testUnicastDiscovery() { - for (Client client : clients()) { - ClusterState state = client.admin().cluster().prepareState().execute().actionGet().getState(); - //client nodes might be added randomly - int dataNodes = 0; - for (DiscoveryNode discoveryNode : state.nodes()) { - if (discoveryNode.isDataNode()) { - dataNodes++; - } + .put("http.enabled", false) // just to make test quicker + .put(super.nodeSettings(nodeOrdinal)); + + String[] unicastHosts = new String[currentNumOfUnicastHosts]; + if (internalCluster().getDefaultSettings().get("node.mode").equals("local")) { + builder.put(LocalTransport.TRANSPORT_LOCAL_ADDRESS, "unicast_test_" + nodeOrdinal); + for (int i = 0; i < unicastHosts.length; i++) { + unicastHosts[i] = "unicast_test_" + i; + } + } else { + // we need to pin the node ports so we'd know where to point things + builder.put("transport.tcp.port", currentBaseHttpPort + nodeOrdinal); + for (int i = 0; i < unicastHosts.length; i++) { + unicastHosts[i] = "localhost:" + (currentBaseHttpPort + i); + } + } + builder.putArray("discovery.zen.ping.unicast.hosts", unicastHosts); + return builder.build(); + } + + @Test + public void testNormalClusterForming() throws ExecutionException, InterruptedException { + internalCluster().startNodesAsync(currentNumNodes).get(); + + if (client().admin().cluster().prepareHealth().setWaitForNodes("" + currentNumNodes).get().isTimedOut()) { + logger.info("cluster forming timed out, cluster state:\n{}", client().admin().cluster().prepareState().get().getState().prettyPrint()); + fail("timed out waiting for cluster to form with [" + currentNumNodes + "] nodes"); + } + } + + @Test + // Without the 'include temporalResponses responses to nodesToConnect' improvement in UnicastZenPing#sendPings this + // test fails, because 2 nodes elect themselves as master and the health request times out b/c waiting_for_nodes=N + // can't be satisfied. + public void testMinimumMasterNodes() throws Exception { + final Settings settings = ImmutableSettings.settingsBuilder().put("discovery.zen.minimum_master_nodes", currentNumNodes / 2 + 1).build(); + + List nodes = internalCluster().startNodesAsync(currentNumNodes, settings).get(); + + ensureGreen(); + + DiscoveryNode masterDiscoNode = null; + for (String node : nodes) { + ClusterState state = internalCluster().client(node).admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); + assertThat(state.nodes().size(), equalTo(currentNumNodes)); + if (masterDiscoNode == null) { + masterDiscoNode = state.nodes().masterNode(); + } else { + assertThat(masterDiscoNode.equals(state.nodes().masterNode()), equalTo(true)); } - assertThat(dataNodes, equalTo(2)); } } } diff --git a/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryTestsMinimumMasterNodes.java b/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryTestsMinimumMasterNodes.java deleted file mode 100644 index 2b6da15d263..00000000000 --- a/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryTestsMinimumMasterNodes.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.discovery; - -import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.Priority; -import org.elasticsearch.common.settings.ImmutableSettings; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.test.ElasticsearchIntegrationTest; -import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; -import org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; -import org.junit.Test; - -import java.util.List; - -import static org.hamcrest.Matchers.equalTo; - -@ClusterScope(scope=Scope.TEST, numDataNodes =0) -public class ZenUnicastDiscoveryTestsMinimumMasterNodes extends ElasticsearchIntegrationTest { - - @Test - // Without the 'include temporalResponses responses to nodesToConnect' improvement in UnicastZenPing#sendPings this - // test fails, because 2 nodes elect themselves as master and the health request times out b/c waiting_for_nodes=3 - // can't be satisfied. - public void testUnicastDiscovery() throws Exception { - final Settings settings = ImmutableSettings.settingsBuilder() - .put("discovery.zen.ping.multicast.enabled", false) - .put("discovery.zen.minimum_master_nodes", 2) - .put("discovery.zen.ping.unicast.hosts", "localhost") - .put("transport.tcp.port", "25400-25500") // Need to use custom tcp port range otherwise we collide with the shared cluster - .build(); - - List nodes = internalCluster().startNodesAsync(3, settings).get(); - - ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("3").execute().actionGet(); - assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); - - DiscoveryNode masterDiscoNode = null; - for (String node : nodes.toArray(new String[3])) { - ClusterState state = internalCluster().client(node).admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); - assertThat(state.nodes().size(), equalTo(3)); - if (masterDiscoNode == null) { - masterDiscoNode = state.nodes().masterNode(); - } else { - assertThat(masterDiscoNode.equals(state.nodes().masterNode()), equalTo(true)); - } - } - } -} \ No newline at end of file diff --git a/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryTestsSpecificNodes.java b/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryTestsSpecificNodes.java deleted file mode 100644 index 394200843a8..00000000000 --- a/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryTestsSpecificNodes.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.discovery; - -import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.common.Priority; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.test.ElasticsearchIntegrationTest; -import org.elasticsearch.test.junit.annotations.TestLogging; -import org.junit.Test; - -import java.util.List; - -import static org.apache.lucene.util.LuceneTestCase.Slow; -import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; -import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; -import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; -import static org.hamcrest.Matchers.equalTo; - -/** - */ -@Slow -@ClusterScope(scope = Scope.TEST, numDataNodes = 0) -public class ZenUnicastDiscoveryTestsSpecificNodes extends ElasticsearchIntegrationTest { - - @Test - @TestLogging("discovery.zen:TRACE") - // The bug zen unicast ping override bug, may rarely manifest itself, it is very timing dependant. - // Without the fix in UnicastZenPing, this test fails roughly 1 out of 10 runs from the command line. - public void testMasterElectionNotMissed() throws Exception { - final Settings settings = settingsBuilder() - // Failure only manifests if multicast ping is disabled! - .put("discovery.zen.ping.multicast.ping.enabled", false) - .put("discovery.zen.minimum_master_nodes", 2) - // Can't use this, b/c at the moment all node will only ping localhost:9300 -// .put("discovery.zen.ping.unicast.hosts", "localhost") - .put("discovery.zen.ping.unicast.hosts", "localhost:15300,localhost:15301,localhost:15302") - .put("transport.tcp.port", "15300-15400") - .build(); - List nodes = internalCluster().startNodesAsync(3, settings).get(); - - ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("3").execute().actionGet(); - assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); - - DiscoveryNode masterDiscoNode = null; - for (String node : nodes.toArray(new String[3])) { - ClusterState state = internalCluster().client(node).admin().cluster().prepareState().setLocal(true).execute().actionGet().getState(); - assertThat(state.nodes().size(), equalTo(3)); - if (masterDiscoNode == null) { - masterDiscoNode = state.nodes().masterNode(); - } else { - assertThat(masterDiscoNode.equals(state.nodes().masterNode()), equalTo(true)); - } - } - } - -}