From 43fae91ab918f199160790382130532c1a9015ef Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 19 Aug 2015 10:01:38 +0200 Subject: [PATCH] Test: ClusterDiscoveryConfiguration.UnicastZen should allow for port ranges The cluster configuration allows to setup a cluster for use with unicast discovery. This means that nodes have to have pre-calculated known addresses which can be used to poplulate the unicast hosts setting. Despite of repeated attempts to select unused ports, we still see test failures where the node can not bind to it's assigned port due to it already being in use (most on CentOS). This commit changes it to allow each node to have a pre-set mutual exclusive range of ports and add all those ports to the unicast hosts list. That's OK because we know the node will only bind to one of those. --- .../ClusterDiscoveryConfiguration.java | 83 +++++++++---------- 1 file changed, 37 insertions(+), 46 deletions(-) diff --git a/core/src/test/java/org/elasticsearch/test/discovery/ClusterDiscoveryConfiguration.java b/core/src/test/java/org/elasticsearch/test/discovery/ClusterDiscoveryConfiguration.java index a170cd128be..79f51594daf 100644 --- a/core/src/test/java/org/elasticsearch/test/discovery/ClusterDiscoveryConfiguration.java +++ b/core/src/test/java/org/elasticsearch/test/discovery/ClusterDiscoveryConfiguration.java @@ -30,6 +30,7 @@ import org.elasticsearch.transport.local.LocalTransport; import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; +import java.util.ArrayList; import java.util.HashSet; import java.util.Set; @@ -62,8 +63,12 @@ public class ClusterDiscoveryConfiguration extends SettingsSource { // this variable is incremented on each bind attempt and will maintain the next port that should be tried private static int nextPort = calcBasePort(); - private final int[] unicastHostOrdinals; - private final int[] unicastHostPorts; + // since we run multiple test iterations, we need some flexibility in the choice of ports a node can have (port may + // stay in use by previous iterations on some OSes - read CentOs). This controls the amount of ports each node + // is assigned. All ports in range will be added to the unicast hosts, which is OK because we know only one will be used. + private static final int NUM_PORTS_PER_NODE = 3; + + private final String[] unicastHosts; private final boolean localMode; public UnicastZen(int numOfNodes) { @@ -80,6 +85,7 @@ public class ClusterDiscoveryConfiguration extends SettingsSource { public UnicastZen(int numOfNodes, int numOfUnicastHosts, Settings extraSettings) { super(numOfNodes, extraSettings); + int[] unicastHostOrdinals; if (numOfUnicastHosts == numOfNodes) { unicastHostOrdinals = new int[numOfNodes]; for (int i = 0; i < numOfNodes; i++) { @@ -93,8 +99,7 @@ public class ClusterDiscoveryConfiguration extends SettingsSource { unicastHostOrdinals = Ints.toArray(ordinals); } this.localMode = nodeSettings.get("node.mode", InternalTestCluster.NODE_MODE).equals("local"); - this.unicastHostPorts = localMode ? new int[0] : unicastHostPorts(numOfNodes); - assert localMode || unicastHostOrdinals.length <= unicastHostPorts.length; + this.unicastHosts = buildUnicastHostSetting(unicastHostOrdinals, localMode); } public UnicastZen(int numOfNodes, int[] unicastHostOrdinals) { @@ -103,73 +108,59 @@ public class ClusterDiscoveryConfiguration extends SettingsSource { public UnicastZen(int numOfNodes, Settings extraSettings, int[] unicastHostOrdinals) { super(numOfNodes, extraSettings); - this.unicastHostOrdinals = unicastHostOrdinals; this.localMode = nodeSettings.get("node.mode", InternalTestCluster.NODE_MODE).equals("local"); - this.unicastHostPorts = localMode ? new int[0] : unicastHostPorts(numOfNodes); - assert localMode || unicastHostOrdinals.length <= unicastHostPorts.length; + this.unicastHosts = buildUnicastHostSetting(unicastHostOrdinals, localMode); } private static int calcBasePort() { return 30000 + InternalTestCluster.BASE_PORT; } + private static String[] buildUnicastHostSetting(int[] unicastHostOrdinals, boolean localMode) { + ArrayList unicastHosts = new ArrayList<>(); + for (int i = 0; i < unicastHostOrdinals.length; i++) { + final int hostOrdinal = unicastHostOrdinals[i]; + if (localMode) { + unicastHosts.add("node_" + hostOrdinal); + } else { + // we need to pin the node port & host so we'd know where to point things + final int[] ports = nodePorts(hostOrdinal); + for (int port : ports) { + unicastHosts.add("localhost:" + port); + } + } + } + return unicastHosts.toArray(new String[unicastHosts.size()]); + } + @Override public Settings node(int nodeOrdinal) { Settings.Builder builder = Settings.builder() .put("discovery.zen.ping.multicast.enabled", false); - String[] unicastHosts = new String[unicastHostOrdinals.length]; if (localMode) { builder.put(LocalTransport.TRANSPORT_LOCAL_ADDRESS, "node_" + nodeOrdinal); - for (int i = 0; i < unicastHosts.length; i++) { - unicastHosts[i] = "node_" + unicastHostOrdinals[i]; - } - } else if (nodeOrdinal >= unicastHostPorts.length) { - throw new ElasticsearchException("nodeOrdinal [" + nodeOrdinal + "] is greater than the number unicast ports [" + unicastHostPorts.length + "]"); } else { // we need to pin the node port & host so we'd know where to point things - builder.put("transport.tcp.port", unicastHostPorts[nodeOrdinal]); - builder.put("transport.host", "localhost"); - for (int i = 0; i < unicastHostOrdinals.length; i++) { - unicastHosts[i] = "localhost:" + (unicastHostPorts[unicastHostOrdinals[i]]); + String ports = ""; + for (int port : nodePorts(nodeOrdinal)) { + ports += "," + port; } + builder.put("transport.tcp.port", ports.substring(1)); + builder.put("transport.host", "localhost"); } builder.putArray("discovery.zen.ping.unicast.hosts", unicastHosts); return builder.put(super.node(nodeOrdinal)).build(); } - protected synchronized static int[] unicastHostPorts(int numHosts) { - int[] unicastHostPorts = new int[numHosts]; + protected static int[] nodePorts(int nodeOridnal) { + int[] unicastHostPorts = new int[NUM_PORTS_PER_NODE]; - final int basePort = calcBasePort(); - final int maxPort = basePort + InternalTestCluster.PORTS_PER_JVM; - int tries = 0; + final int basePort = calcBasePort() + nodeOridnal * NUM_PORTS_PER_NODE; for (int i = 0; i < unicastHostPorts.length; i++) { - boolean foundPortInRange = false; - while (tries < InternalTestCluster.PORTS_PER_JVM && !foundPortInRange) { - try (ServerSocket serverSocket = new ServerSocket()) { - // Set SO_REUSEADDR as we may bind here and not be able to reuse the address immediately without it. - serverSocket.setReuseAddress(NetworkUtils.defaultReuseAddress()); - serverSocket.bind(new InetSocketAddress("127.0.0.1", nextPort)); - // bind was a success - foundPortInRange = true; - unicastHostPorts[i] = nextPort; - } catch (IOException e) { - // Do nothing - } - - nextPort++; - if (nextPort >= maxPort) { - // Roll back to the beginning of the range and do not go into another JVM's port range - nextPort = basePort; - } - tries++; - } - - if (!foundPortInRange) { - throw new ElasticsearchException("could not find enough open ports in range [" + basePort + "-" + maxPort + "]. required [" + unicastHostPorts.length + "] ports"); - } + unicastHostPorts[i] = basePort + i; } + return unicastHostPorts; } }