From efbda318d061867c098be5112561f8099227b9d4 Mon Sep 17 00:00:00 2001 From: jaymode Date: Wed, 25 Feb 2015 13:26:21 -0500 Subject: [PATCH] Tests: check node ports availability when using unicast discovery Some tests failures are seen when a node attempts to use a port that is already bound by some other process on the test machine. This commit adds a bind to test port availability and iterates over the port range until an available port is found. This reduces the likelihood of a test node failing to start up due to the port already being bound. --- .../DiscoveryWithServiceDisruptions.java | 4 +- .../discovery/ZenUnicastDiscoveryTests.java | 4 +- .../ClusterDiscoveryConfiguration.java | 94 +++++++++++++------ 3 files changed, 67 insertions(+), 35 deletions(-) diff --git a/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptions.java b/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptions.java index 82c1c252a1d..25dca2a70be 100644 --- a/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptions.java +++ b/src/test/java/org/elasticsearch/discovery/DiscoveryWithServiceDisruptions.java @@ -185,9 +185,9 @@ public class DiscoveryWithServiceDisruptions extends ElasticsearchIntegrationTes if (discoveryConfig == null) { if (unicastHostsOrdinals == null) { - discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(numberOfNodes, nodeSettings, Scope.TEST); + discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(numberOfNodes, nodeSettings); } else { - discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(numberOfNodes, nodeSettings, unicastHostsOrdinals, Scope.TEST); + discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(numberOfNodes, nodeSettings, unicastHostsOrdinals); } } } diff --git a/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryTests.java b/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryTests.java index 5f846ce7b0c..f37f138c9e5 100644 --- a/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryTests.java +++ b/src/test/java/org/elasticsearch/discovery/ZenUnicastDiscoveryTests.java @@ -59,7 +59,7 @@ public class ZenUnicastDiscoveryTests extends ElasticsearchIntegrationTest { for (int i = 0; i < unicastHostOrdinals.length; i++) { unicastHostOrdinals[i] = i; } - discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(currentNumNodes, unicastHostOrdinals, Scope.TEST); + discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(currentNumNodes, unicastHostOrdinals); // start the unicast hosts internalCluster().startNodesAsync(unicastHostOrdinals.length).get(); @@ -81,7 +81,7 @@ public class ZenUnicastDiscoveryTests extends ElasticsearchIntegrationTest { int currentNumNodes = randomIntBetween(3, 5); int currentNumOfUnicastHosts = randomIntBetween(1, currentNumNodes); final Settings settings = ImmutableSettings.settingsBuilder().put("discovery.zen.minimum_master_nodes", currentNumNodes / 2 + 1).build(); - discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(currentNumNodes, currentNumOfUnicastHosts, settings, Scope.TEST); + discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(currentNumNodes, currentNumOfUnicastHosts, settings); List nodes = internalCluster().startNodesAsync(currentNumNodes).get(); diff --git a/src/test/java/org/elasticsearch/test/discovery/ClusterDiscoveryConfiguration.java b/src/test/java/org/elasticsearch/test/discovery/ClusterDiscoveryConfiguration.java index 5bfc6c1ad62..30063c56b8f 100644 --- a/src/test/java/org/elasticsearch/test/discovery/ClusterDiscoveryConfiguration.java +++ b/src/test/java/org/elasticsearch/test/discovery/ClusterDiscoveryConfiguration.java @@ -20,6 +20,7 @@ package org.elasticsearch.test.discovery; import com.carrotsearch.randomizedtesting.RandomizedTest; import com.google.common.primitives.Ints; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ElasticsearchIntegrationTest; @@ -27,9 +28,10 @@ import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.SettingsSource; import org.elasticsearch.transport.local.LocalTransport; +import java.io.IOException; +import java.net.ServerSocket; import java.util.HashSet; import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; public class ClusterDiscoveryConfiguration extends SettingsSource { @@ -61,24 +63,26 @@ public class ClusterDiscoveryConfiguration extends SettingsSource { public static class UnicastZen extends ClusterDiscoveryConfiguration { - private static final AtomicInteger portCounter = new AtomicInteger(); + // this variable is incremented on each bind attempt and will maintain the next port that should be tried + private static int nextPort = calcBasePort(); private final int[] unicastHostOrdinals; - private final int basePort; + private final int[] unicastHostPorts; + private final boolean localMode; - public UnicastZen(int numOfNodes, ElasticsearchIntegrationTest.Scope scope) { - this(numOfNodes, numOfNodes, scope); + public UnicastZen(int numOfNodes) { + this(numOfNodes, numOfNodes); } - public UnicastZen(int numOfNodes, Settings extraSettings, ElasticsearchIntegrationTest.Scope scope) { - this(numOfNodes, numOfNodes, extraSettings, scope); + public UnicastZen(int numOfNodes, Settings extraSettings) { + this(numOfNodes, numOfNodes, extraSettings); } - public UnicastZen(int numOfNodes, int numOfUnicastHosts, ElasticsearchIntegrationTest.Scope scope) { - this(numOfNodes, numOfUnicastHosts, ImmutableSettings.EMPTY, scope); + public UnicastZen(int numOfNodes, int numOfUnicastHosts) { + this(numOfNodes, numOfUnicastHosts, ImmutableSettings.EMPTY); } - public UnicastZen(int numOfNodes, int numOfUnicastHosts, Settings extraSettings, ElasticsearchIntegrationTest.Scope scope) { + public UnicastZen(int numOfNodes, int numOfUnicastHosts, Settings extraSettings) { super(numOfNodes, extraSettings); if (numOfUnicastHosts == numOfNodes) { unicastHostOrdinals = new int[numOfNodes]; @@ -92,32 +96,27 @@ public class ClusterDiscoveryConfiguration extends SettingsSource { } unicastHostOrdinals = Ints.toArray(ordinals); } - this.basePort = calcBasePort(scope); + this.localMode = nodeSettings.get("node.mode", InternalTestCluster.NODE_MODE).equals("local"); + this.unicastHostPorts = localMode ? new int[0] : unicastHostPorts(numOfNodes); + assert localMode || unicastHostOrdinals.length <= unicastHostPorts.length; } - public UnicastZen(int numOfNodes, int[] unicastHostOrdinals, ElasticsearchIntegrationTest.Scope scope) { - this(numOfNodes, ImmutableSettings.EMPTY, unicastHostOrdinals, scope); + public UnicastZen(int numOfNodes, int[] unicastHostOrdinals) { + this(numOfNodes, ImmutableSettings.EMPTY, unicastHostOrdinals); } - public UnicastZen(int numOfNodes, Settings extraSettings, int[] unicastHostOrdinals, ElasticsearchIntegrationTest.Scope scope) { + public UnicastZen(int numOfNodes, Settings extraSettings, int[] unicastHostOrdinals) { super(numOfNodes, extraSettings); this.unicastHostOrdinals = unicastHostOrdinals; - this.basePort = calcBasePort(scope); + this.localMode = nodeSettings.get("node.mode", InternalTestCluster.NODE_MODE).equals("local"); + this.unicastHostPorts = localMode ? new int[0] : unicastHostPorts(numOfNodes); + assert localMode || unicastHostOrdinals.length <= unicastHostPorts.length; } - private static int calcBasePort(ElasticsearchIntegrationTest.Scope scope) { + private static int calcBasePort() { // note that this has properly co-exist with the port logic at InternalTestCluster's constructor return 30000 + - 1000 * (ElasticsearchIntegrationTest.CHILD_JVM_ID) + // up to 30 jvms - //up to 100 nodes per cluster - 100 * scopeId(scope); - } - - private static int scopeId(ElasticsearchIntegrationTest.Scope scope) { - //ports can be reused as suite or test clusters are never run concurrently - //we don't reuse the same port immediately though but leave some time to make sure ports are freed - //prevent conflicts between jvms by never going above 9 - return portCounter.incrementAndGet() % 9; + 1000 * (ElasticsearchIntegrationTest.CHILD_JVM_ID); } @Override @@ -126,22 +125,55 @@ public class ClusterDiscoveryConfiguration extends SettingsSource { .put("discovery.zen.ping.multicast.enabled", false); String[] unicastHosts = new String[unicastHostOrdinals.length]; - String mode = nodeSettings.get("node.mode", InternalTestCluster.NODE_MODE); - if (mode.equals("local")) { + if (localMode) { builder.put(LocalTransport.TRANSPORT_LOCAL_ADDRESS, "node_" + nodeOrdinal); for (int i = 0; i < unicastHosts.length; i++) { unicastHosts[i] = "node_" + unicastHostOrdinals[i]; } + } else if (nodeOrdinal >= unicastHostPorts.length) { + throw new ElasticsearchException("nodeOrdinal [" + nodeOrdinal + "] is greater than the number unicast ports [" + unicastHostPorts.length + "]"); } else { // we need to pin the node port & host so we'd know where to point things - builder.put("transport.tcp.port", basePort + nodeOrdinal); + builder.put("transport.tcp.port", unicastHostPorts[nodeOrdinal]); builder.put("transport.host", "localhost"); - for (int i = 0; i < unicastHosts.length; i++) { - unicastHosts[i] = "localhost:" + (basePort + unicastHostOrdinals[i]); + for (int i = 0; i < unicastHostOrdinals.length; i++) { + unicastHosts[i] = "localhost:" + (unicastHostPorts[unicastHostOrdinals[i]]); } } builder.putArray("discovery.zen.ping.unicast.hosts", unicastHosts); return builder.put(super.node(nodeOrdinal)).build(); } + + protected synchronized static int[] unicastHostPorts(int numHosts) { + int[] unicastHostPorts = new int[numHosts]; + + final int basePort = calcBasePort(); + final int maxPort = basePort + 1000; + int tries = 0; + for (int i = 0; i < unicastHostPorts.length; i++) { + boolean foundPortInRange = false; + while (tries < 1000 && !foundPortInRange) { + try (ServerSocket socket = new ServerSocket(nextPort)) { + // bind was a success + foundPortInRange = true; + unicastHostPorts[i] = nextPort; + } catch (IOException e) { + // Do nothing + } + + nextPort++; + if (nextPort >= maxPort) { + // Roll back to the beginning of the range and do not go into another JVM's port range + nextPort = basePort; + } + tries++; + } + + if (!foundPortInRange) { + throw new ElasticsearchException("could not find enough open ports in range [" + basePort + "-" + maxPort + "]. required [" + unicastHostPorts.length + "] ports"); + } + } + return unicastHostPorts; + } } }