Tests: check node ports availability when using unicast discovery

Some tests failures are seen when a node attempts to use a port that is already bound
by some other process on the test machine. This commit adds a bind to test port availability
and iterates over the port range until an available port is found. This reduces the likelihood
of a test node failing to start up due to the port already being bound.
This commit is contained in:
jaymode 2015-02-25 13:26:21 -05:00
parent ce53e20351
commit efbda318d0
3 changed files with 67 additions and 35 deletions

View File

@ -185,9 +185,9 @@ public class DiscoveryWithServiceDisruptions extends ElasticsearchIntegrationTes
if (discoveryConfig == null) { if (discoveryConfig == null) {
if (unicastHostsOrdinals == null) { if (unicastHostsOrdinals == null) {
discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(numberOfNodes, nodeSettings, Scope.TEST); discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(numberOfNodes, nodeSettings);
} else { } else {
discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(numberOfNodes, nodeSettings, unicastHostsOrdinals, Scope.TEST); discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(numberOfNodes, nodeSettings, unicastHostsOrdinals);
} }
} }
} }

View File

@ -59,7 +59,7 @@ public class ZenUnicastDiscoveryTests extends ElasticsearchIntegrationTest {
for (int i = 0; i < unicastHostOrdinals.length; i++) { for (int i = 0; i < unicastHostOrdinals.length; i++) {
unicastHostOrdinals[i] = i; unicastHostOrdinals[i] = i;
} }
discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(currentNumNodes, unicastHostOrdinals, Scope.TEST); discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(currentNumNodes, unicastHostOrdinals);
// start the unicast hosts // start the unicast hosts
internalCluster().startNodesAsync(unicastHostOrdinals.length).get(); internalCluster().startNodesAsync(unicastHostOrdinals.length).get();
@ -81,7 +81,7 @@ public class ZenUnicastDiscoveryTests extends ElasticsearchIntegrationTest {
int currentNumNodes = randomIntBetween(3, 5); int currentNumNodes = randomIntBetween(3, 5);
int currentNumOfUnicastHosts = randomIntBetween(1, currentNumNodes); int currentNumOfUnicastHosts = randomIntBetween(1, currentNumNodes);
final Settings settings = ImmutableSettings.settingsBuilder().put("discovery.zen.minimum_master_nodes", currentNumNodes / 2 + 1).build(); final Settings settings = ImmutableSettings.settingsBuilder().put("discovery.zen.minimum_master_nodes", currentNumNodes / 2 + 1).build();
discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(currentNumNodes, currentNumOfUnicastHosts, settings, Scope.TEST); discoveryConfig = new ClusterDiscoveryConfiguration.UnicastZen(currentNumNodes, currentNumOfUnicastHosts, settings);
List<String> nodes = internalCluster().startNodesAsync(currentNumNodes).get(); List<String> nodes = internalCluster().startNodesAsync(currentNumNodes).get();

View File

@ -20,6 +20,7 @@ package org.elasticsearch.test.discovery;
import com.carrotsearch.randomizedtesting.RandomizedTest; import com.carrotsearch.randomizedtesting.RandomizedTest;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest;
@ -27,9 +28,10 @@ import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.SettingsSource; import org.elasticsearch.test.SettingsSource;
import org.elasticsearch.transport.local.LocalTransport; import org.elasticsearch.transport.local.LocalTransport;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
public class ClusterDiscoveryConfiguration extends SettingsSource { public class ClusterDiscoveryConfiguration extends SettingsSource {
@ -61,24 +63,26 @@ public class ClusterDiscoveryConfiguration extends SettingsSource {
public static class UnicastZen extends ClusterDiscoveryConfiguration { public static class UnicastZen extends ClusterDiscoveryConfiguration {
private static final AtomicInteger portCounter = new AtomicInteger(); // this variable is incremented on each bind attempt and will maintain the next port that should be tried
private static int nextPort = calcBasePort();
private final int[] unicastHostOrdinals; private final int[] unicastHostOrdinals;
private final int basePort; private final int[] unicastHostPorts;
private final boolean localMode;
public UnicastZen(int numOfNodes, ElasticsearchIntegrationTest.Scope scope) { public UnicastZen(int numOfNodes) {
this(numOfNodes, numOfNodes, scope); this(numOfNodes, numOfNodes);
} }
public UnicastZen(int numOfNodes, Settings extraSettings, ElasticsearchIntegrationTest.Scope scope) { public UnicastZen(int numOfNodes, Settings extraSettings) {
this(numOfNodes, numOfNodes, extraSettings, scope); this(numOfNodes, numOfNodes, extraSettings);
} }
public UnicastZen(int numOfNodes, int numOfUnicastHosts, ElasticsearchIntegrationTest.Scope scope) { public UnicastZen(int numOfNodes, int numOfUnicastHosts) {
this(numOfNodes, numOfUnicastHosts, ImmutableSettings.EMPTY, scope); this(numOfNodes, numOfUnicastHosts, ImmutableSettings.EMPTY);
} }
public UnicastZen(int numOfNodes, int numOfUnicastHosts, Settings extraSettings, ElasticsearchIntegrationTest.Scope scope) { public UnicastZen(int numOfNodes, int numOfUnicastHosts, Settings extraSettings) {
super(numOfNodes, extraSettings); super(numOfNodes, extraSettings);
if (numOfUnicastHosts == numOfNodes) { if (numOfUnicastHosts == numOfNodes) {
unicastHostOrdinals = new int[numOfNodes]; unicastHostOrdinals = new int[numOfNodes];
@ -92,32 +96,27 @@ public class ClusterDiscoveryConfiguration extends SettingsSource {
} }
unicastHostOrdinals = Ints.toArray(ordinals); unicastHostOrdinals = Ints.toArray(ordinals);
} }
this.basePort = calcBasePort(scope); this.localMode = nodeSettings.get("node.mode", InternalTestCluster.NODE_MODE).equals("local");
this.unicastHostPorts = localMode ? new int[0] : unicastHostPorts(numOfNodes);
assert localMode || unicastHostOrdinals.length <= unicastHostPorts.length;
} }
public UnicastZen(int numOfNodes, int[] unicastHostOrdinals, ElasticsearchIntegrationTest.Scope scope) { public UnicastZen(int numOfNodes, int[] unicastHostOrdinals) {
this(numOfNodes, ImmutableSettings.EMPTY, unicastHostOrdinals, scope); this(numOfNodes, ImmutableSettings.EMPTY, unicastHostOrdinals);
} }
public UnicastZen(int numOfNodes, Settings extraSettings, int[] unicastHostOrdinals, ElasticsearchIntegrationTest.Scope scope) { public UnicastZen(int numOfNodes, Settings extraSettings, int[] unicastHostOrdinals) {
super(numOfNodes, extraSettings); super(numOfNodes, extraSettings);
this.unicastHostOrdinals = unicastHostOrdinals; this.unicastHostOrdinals = unicastHostOrdinals;
this.basePort = calcBasePort(scope); this.localMode = nodeSettings.get("node.mode", InternalTestCluster.NODE_MODE).equals("local");
this.unicastHostPorts = localMode ? new int[0] : unicastHostPorts(numOfNodes);
assert localMode || unicastHostOrdinals.length <= unicastHostPorts.length;
} }
private static int calcBasePort(ElasticsearchIntegrationTest.Scope scope) { private static int calcBasePort() {
// note that this has properly co-exist with the port logic at InternalTestCluster's constructor // note that this has properly co-exist with the port logic at InternalTestCluster's constructor
return 30000 + return 30000 +
1000 * (ElasticsearchIntegrationTest.CHILD_JVM_ID) + // up to 30 jvms 1000 * (ElasticsearchIntegrationTest.CHILD_JVM_ID);
//up to 100 nodes per cluster
100 * scopeId(scope);
}
private static int scopeId(ElasticsearchIntegrationTest.Scope scope) {
//ports can be reused as suite or test clusters are never run concurrently
//we don't reuse the same port immediately though but leave some time to make sure ports are freed
//prevent conflicts between jvms by never going above 9
return portCounter.incrementAndGet() % 9;
} }
@Override @Override
@ -126,22 +125,55 @@ public class ClusterDiscoveryConfiguration extends SettingsSource {
.put("discovery.zen.ping.multicast.enabled", false); .put("discovery.zen.ping.multicast.enabled", false);
String[] unicastHosts = new String[unicastHostOrdinals.length]; String[] unicastHosts = new String[unicastHostOrdinals.length];
String mode = nodeSettings.get("node.mode", InternalTestCluster.NODE_MODE); if (localMode) {
if (mode.equals("local")) {
builder.put(LocalTransport.TRANSPORT_LOCAL_ADDRESS, "node_" + nodeOrdinal); builder.put(LocalTransport.TRANSPORT_LOCAL_ADDRESS, "node_" + nodeOrdinal);
for (int i = 0; i < unicastHosts.length; i++) { for (int i = 0; i < unicastHosts.length; i++) {
unicastHosts[i] = "node_" + unicastHostOrdinals[i]; unicastHosts[i] = "node_" + unicastHostOrdinals[i];
} }
} else if (nodeOrdinal >= unicastHostPorts.length) {
throw new ElasticsearchException("nodeOrdinal [" + nodeOrdinal + "] is greater than the number unicast ports [" + unicastHostPorts.length + "]");
} else { } else {
// we need to pin the node port & host so we'd know where to point things // we need to pin the node port & host so we'd know where to point things
builder.put("transport.tcp.port", basePort + nodeOrdinal); builder.put("transport.tcp.port", unicastHostPorts[nodeOrdinal]);
builder.put("transport.host", "localhost"); builder.put("transport.host", "localhost");
for (int i = 0; i < unicastHosts.length; i++) { for (int i = 0; i < unicastHostOrdinals.length; i++) {
unicastHosts[i] = "localhost:" + (basePort + unicastHostOrdinals[i]); unicastHosts[i] = "localhost:" + (unicastHostPorts[unicastHostOrdinals[i]]);
} }
} }
builder.putArray("discovery.zen.ping.unicast.hosts", unicastHosts); builder.putArray("discovery.zen.ping.unicast.hosts", unicastHosts);
return builder.put(super.node(nodeOrdinal)).build(); return builder.put(super.node(nodeOrdinal)).build();
} }
protected synchronized static int[] unicastHostPorts(int numHosts) {
int[] unicastHostPorts = new int[numHosts];
final int basePort = calcBasePort();
final int maxPort = basePort + 1000;
int tries = 0;
for (int i = 0; i < unicastHostPorts.length; i++) {
boolean foundPortInRange = false;
while (tries < 1000 && !foundPortInRange) {
try (ServerSocket socket = new ServerSocket(nextPort)) {
// bind was a success
foundPortInRange = true;
unicastHostPorts[i] = nextPort;
} catch (IOException e) {
// Do nothing
}
nextPort++;
if (nextPort >= maxPort) {
// Roll back to the beginning of the range and do not go into another JVM's port range
nextPort = basePort;
}
tries++;
}
if (!foundPortInRange) {
throw new ElasticsearchException("could not find enough open ports in range [" + basePort + "-" + maxPort + "]. required [" + unicastHostPorts.length + "] ports");
}
}
return unicastHostPorts;
}
} }
} }