Test: ClusterDiscoveryConfiguration.UnicastZen should allow for port ranges
The cluster configuration allows to setup a cluster for use with unicast discovery. This means that nodes have to have pre-calculated known addresses which can be used to poplulate the unicast hosts setting. Despite of repeated attempts to select unused ports, we still see test failures where the node can not bind to it's assigned port due to it already being in use (most on CentOS). This commit changes it to allow each node to have a pre-set mutual exclusive range of ports and add all those ports to the unicast hosts list. That's OK because we know the node will only bind to one of those.
This commit is contained in:
parent
ef236e3f0f
commit
43fae91ab9
|
@ -30,6 +30,7 @@ import org.elasticsearch.transport.local.LocalTransport;
|
|||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.ServerSocket;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -62,8 +63,12 @@ public class ClusterDiscoveryConfiguration extends SettingsSource {
|
|||
// this variable is incremented on each bind attempt and will maintain the next port that should be tried
|
||||
private static int nextPort = calcBasePort();
|
||||
|
||||
private final int[] unicastHostOrdinals;
|
||||
private final int[] unicastHostPorts;
|
||||
// since we run multiple test iterations, we need some flexibility in the choice of ports a node can have (port may
|
||||
// stay in use by previous iterations on some OSes - read CentOs). This controls the amount of ports each node
|
||||
// is assigned. All ports in range will be added to the unicast hosts, which is OK because we know only one will be used.
|
||||
private static final int NUM_PORTS_PER_NODE = 3;
|
||||
|
||||
private final String[] unicastHosts;
|
||||
private final boolean localMode;
|
||||
|
||||
public UnicastZen(int numOfNodes) {
|
||||
|
@ -80,6 +85,7 @@ public class ClusterDiscoveryConfiguration extends SettingsSource {
|
|||
|
||||
public UnicastZen(int numOfNodes, int numOfUnicastHosts, Settings extraSettings) {
|
||||
super(numOfNodes, extraSettings);
|
||||
int[] unicastHostOrdinals;
|
||||
if (numOfUnicastHosts == numOfNodes) {
|
||||
unicastHostOrdinals = new int[numOfNodes];
|
||||
for (int i = 0; i < numOfNodes; i++) {
|
||||
|
@ -93,8 +99,7 @@ public class ClusterDiscoveryConfiguration extends SettingsSource {
|
|||
unicastHostOrdinals = Ints.toArray(ordinals);
|
||||
}
|
||||
this.localMode = nodeSettings.get("node.mode", InternalTestCluster.NODE_MODE).equals("local");
|
||||
this.unicastHostPorts = localMode ? new int[0] : unicastHostPorts(numOfNodes);
|
||||
assert localMode || unicastHostOrdinals.length <= unicastHostPorts.length;
|
||||
this.unicastHosts = buildUnicastHostSetting(unicastHostOrdinals, localMode);
|
||||
}
|
||||
|
||||
public UnicastZen(int numOfNodes, int[] unicastHostOrdinals) {
|
||||
|
@ -103,73 +108,59 @@ public class ClusterDiscoveryConfiguration extends SettingsSource {
|
|||
|
||||
public UnicastZen(int numOfNodes, Settings extraSettings, int[] unicastHostOrdinals) {
|
||||
super(numOfNodes, extraSettings);
|
||||
this.unicastHostOrdinals = unicastHostOrdinals;
|
||||
this.localMode = nodeSettings.get("node.mode", InternalTestCluster.NODE_MODE).equals("local");
|
||||
this.unicastHostPorts = localMode ? new int[0] : unicastHostPorts(numOfNodes);
|
||||
assert localMode || unicastHostOrdinals.length <= unicastHostPorts.length;
|
||||
this.unicastHosts = buildUnicastHostSetting(unicastHostOrdinals, localMode);
|
||||
}
|
||||
|
||||
private static int calcBasePort() {
|
||||
return 30000 + InternalTestCluster.BASE_PORT;
|
||||
}
|
||||
|
||||
private static String[] buildUnicastHostSetting(int[] unicastHostOrdinals, boolean localMode) {
|
||||
ArrayList<String> unicastHosts = new ArrayList<>();
|
||||
for (int i = 0; i < unicastHostOrdinals.length; i++) {
|
||||
final int hostOrdinal = unicastHostOrdinals[i];
|
||||
if (localMode) {
|
||||
unicastHosts.add("node_" + hostOrdinal);
|
||||
} else {
|
||||
// we need to pin the node port & host so we'd know where to point things
|
||||
final int[] ports = nodePorts(hostOrdinal);
|
||||
for (int port : ports) {
|
||||
unicastHosts.add("localhost:" + port);
|
||||
}
|
||||
}
|
||||
}
|
||||
return unicastHosts.toArray(new String[unicastHosts.size()]);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Settings node(int nodeOrdinal) {
|
||||
Settings.Builder builder = Settings.builder()
|
||||
.put("discovery.zen.ping.multicast.enabled", false);
|
||||
|
||||
String[] unicastHosts = new String[unicastHostOrdinals.length];
|
||||
if (localMode) {
|
||||
builder.put(LocalTransport.TRANSPORT_LOCAL_ADDRESS, "node_" + nodeOrdinal);
|
||||
for (int i = 0; i < unicastHosts.length; i++) {
|
||||
unicastHosts[i] = "node_" + unicastHostOrdinals[i];
|
||||
}
|
||||
} else if (nodeOrdinal >= unicastHostPorts.length) {
|
||||
throw new ElasticsearchException("nodeOrdinal [" + nodeOrdinal + "] is greater than the number unicast ports [" + unicastHostPorts.length + "]");
|
||||
} else {
|
||||
// we need to pin the node port & host so we'd know where to point things
|
||||
builder.put("transport.tcp.port", unicastHostPorts[nodeOrdinal]);
|
||||
builder.put("transport.host", "localhost");
|
||||
for (int i = 0; i < unicastHostOrdinals.length; i++) {
|
||||
unicastHosts[i] = "localhost:" + (unicastHostPorts[unicastHostOrdinals[i]]);
|
||||
String ports = "";
|
||||
for (int port : nodePorts(nodeOrdinal)) {
|
||||
ports += "," + port;
|
||||
}
|
||||
builder.put("transport.tcp.port", ports.substring(1));
|
||||
builder.put("transport.host", "localhost");
|
||||
}
|
||||
builder.putArray("discovery.zen.ping.unicast.hosts", unicastHosts);
|
||||
return builder.put(super.node(nodeOrdinal)).build();
|
||||
}
|
||||
|
||||
protected synchronized static int[] unicastHostPorts(int numHosts) {
|
||||
int[] unicastHostPorts = new int[numHosts];
|
||||
protected static int[] nodePorts(int nodeOridnal) {
|
||||
int[] unicastHostPorts = new int[NUM_PORTS_PER_NODE];
|
||||
|
||||
final int basePort = calcBasePort();
|
||||
final int maxPort = basePort + InternalTestCluster.PORTS_PER_JVM;
|
||||
int tries = 0;
|
||||
final int basePort = calcBasePort() + nodeOridnal * NUM_PORTS_PER_NODE;
|
||||
for (int i = 0; i < unicastHostPorts.length; i++) {
|
||||
boolean foundPortInRange = false;
|
||||
while (tries < InternalTestCluster.PORTS_PER_JVM && !foundPortInRange) {
|
||||
try (ServerSocket serverSocket = new ServerSocket()) {
|
||||
// Set SO_REUSEADDR as we may bind here and not be able to reuse the address immediately without it.
|
||||
serverSocket.setReuseAddress(NetworkUtils.defaultReuseAddress());
|
||||
serverSocket.bind(new InetSocketAddress("127.0.0.1", nextPort));
|
||||
// bind was a success
|
||||
foundPortInRange = true;
|
||||
unicastHostPorts[i] = nextPort;
|
||||
} catch (IOException e) {
|
||||
// Do nothing
|
||||
}
|
||||
|
||||
nextPort++;
|
||||
if (nextPort >= maxPort) {
|
||||
// Roll back to the beginning of the range and do not go into another JVM's port range
|
||||
nextPort = basePort;
|
||||
}
|
||||
tries++;
|
||||
}
|
||||
|
||||
if (!foundPortInRange) {
|
||||
throw new ElasticsearchException("could not find enough open ports in range [" + basePort + "-" + maxPort + "]. required [" + unicastHostPorts.length + "] ports");
|
||||
}
|
||||
unicastHostPorts[i] = basePort + i;
|
||||
}
|
||||
|
||||
return unicastHostPorts;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue